OSDN Git Service

033194e3a0ee6608057552b71a8921501fff1021
[bytom/vapor.git] / application / mov / match_collector.go
1 package mov
2
3 import (
4         "runtime"
5         "sync"
6
7         "github.com/bytom/vapor/application/mov/common"
8         "github.com/bytom/vapor/application/mov/database"
9         "github.com/bytom/vapor/application/mov/match"
10         "github.com/bytom/vapor/protocol/bc/types"
11 )
12
13 type matchCollector struct {
14         engine            *match.Engine
15         tradePairIterator *database.TradePairIterator
16         gasLeft           int64
17         isTimeout         func() bool
18
19         workerNum   int
20         workerNumCh chan int
21         processCh   chan *matchTxResult
22         tradePairCh chan *common.TradePair
23         closeCh     chan struct{}
24 }
25
26 type matchTxResult struct {
27         matchedTx *types.Tx
28         err       error
29 }
30
31 func newMatchTxCollector(engine *match.Engine, iterator *database.TradePairIterator, gasLeft int64, isTimeout func() bool) *matchCollector {
32         workerNum := runtime.NumCPU()
33         return &matchCollector{
34                 engine:            engine,
35                 tradePairIterator: iterator,
36                 workerNum:         workerNum,
37                 workerNumCh:       make(chan int, workerNum),
38                 processCh:         make(chan *matchTxResult, 32),
39                 tradePairCh:       make(chan *common.TradePair, workerNum),
40                 closeCh:           make(chan struct{}),
41                 gasLeft:           gasLeft,
42                 isTimeout:         isTimeout,
43         }
44 }
45
46 func (m *matchCollector) result() ([]*types.Tx, error) {
47         var wg sync.WaitGroup
48         for i := 0; i < int(m.workerNum); i++ {
49                 wg.Add(1)
50                 go m.matchTxWorker(&wg)
51         }
52
53         wg.Add(1)
54         go m.tradePairProducer(&wg)
55
56         matchedTxs, err := m.collect()
57         // wait for all goroutine release
58         wg.Wait()
59         return matchedTxs, err
60 }
61
62 func (m *matchCollector) collect() ([]*types.Tx, error) {
63         defer close(m.closeCh)
64
65         var matchedTxs []*types.Tx
66         completed := 0
67         for !m.isTimeout() {
68                 select {
69                 case data := <-m.processCh:
70                         if data.err != nil {
71                                 return nil, data.err
72                         }
73
74                         gasUsed := calcMatchedTxGasUsed(data.matchedTx)
75                         if m.gasLeft -= gasUsed; m.gasLeft >= 0 {
76                                 matchedTxs = append(matchedTxs, data.matchedTx)
77                         } else {
78                                 return matchedTxs, nil
79                         }
80                 case <-m.workerNumCh:
81                         if completed++; completed == m.workerNum {
82                                 return matchedTxs, nil
83                         }
84                 }
85         }
86         return matchedTxs, nil
87 }
88
89 func (m *matchCollector) tradePairProducer(wg *sync.WaitGroup) {
90         defer func() {
91                 close(m.tradePairCh)
92                 wg.Done()
93         }()
94
95         tradePairMap := make(map[string]bool)
96
97         for m.tradePairIterator.HasNext() {
98                 tradePair := m.tradePairIterator.Next()
99                 if tradePairMap[tradePair.Key()] {
100                         continue
101                 }
102
103                 tradePairMap[tradePair.Key()] = true
104                 tradePairMap[tradePair.Reverse().Key()] = true
105
106                 select {
107                 case <-m.closeCh:
108                         return
109                 case m.tradePairCh <- tradePair:
110                 }
111         }
112 }
113
114 func (m *matchCollector) matchTxWorker(wg *sync.WaitGroup) {
115         dispatchData := func(data *matchTxResult) bool {
116                 select {
117                 case <-m.closeCh:
118                         return true
119                 case m.processCh <- data:
120                         if data.err != nil {
121                                 return true
122                         }
123                         return false
124                 }
125         }
126
127         defer func() {
128                 m.workerNumCh <- 1
129                 wg.Done()
130         }()
131         for {
132                 select {
133                 case <-m.closeCh:
134                         return
135                 case tradePair := <-m.tradePairCh:
136                         if tradePair == nil {
137                                 return
138                         }
139                         for m.engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
140                                 matchedTx, err := m.engine.NextMatchedTx(tradePair, tradePair.Reverse())
141                                 if done := dispatchData(&matchTxResult{matchedTx: matchedTx, err: err}); done {
142                                         return
143                                 }
144                         }
145                 }
146
147         }
148 }