7 "github.com/bytom/vapor/application/mov/common"
8 "github.com/bytom/vapor/application/mov/database"
9 "github.com/bytom/vapor/application/mov/match"
10 "github.com/bytom/vapor/protocol/bc/types"
// matchCollector holds the state shared by the trade pair producer, the match
// workers and the collector: the trade pair iterator and the channels that
// connect those goroutines.
// NOTE(review): only some of the fields are visible in this excerpt.
13 type matchCollector struct {
// tradePairIterator is the source of trade pairs read by tradePairProducer.
15 tradePairIterator *database.TradePairIterator
// processCh carries match results from matchTxWorker to collect.
21 processCh chan *matchTxResult
// tradePairCh carries trade pairs from tradePairProducer to matchTxWorker.
22 tradePairCh chan *common.TradePair
// matchTxResult is the message a worker sends on processCh: it is built with a
// matchedTx and the err returned alongside it by the match engine.
26 type matchTxResult struct {
// newMatchTxCollector builds a matchCollector around the given trade pair
// iterator and allocates the channels used by its goroutines.
// NOTE(review): engine, gasLeft and isTimeout are accepted here, but the lines
// that store them are not visible in this excerpt.
31 func newMatchTxCollector(engine *match.Engine, iterator *database.TradePairIterator, gasLeft int64, isTimeout func() bool) *matchCollector {
// The CPU count reported by the runtime sizes the workerNumCh and tradePairCh buffers below.
32 workerNum := runtime.NumCPU()
33 return &matchCollector{
35 tradePairIterator: iterator,
37 workerNumCh: make(chan int, workerNum),
// Results channel is buffered with a fixed capacity of 32.
38 processCh: make(chan *matchTxResult, 32),
39 tradePairCh: make(chan *common.TradePair, workerNum),
// closeCh is unbuffered; collect closes it when it returns.
40 closeCh: make(chan struct{}),
// result starts the match worker goroutines and the trade pair producer
// goroutine, gathers the matched transactions through collect and returns
// them together with collect's error.
46 func (m *matchCollector) result() ([]*types.Tx, error) {
// Start m.workerNum workers; each one is handed the shared WaitGroup.
48 for i := 0; i < int(m.workerNum); i++ {
50 go m.matchTxWorker(&wg)
// A single producer goroutine feeds the workers.
54 go m.tradePairProducer(&wg)
56 matchedTxs, err := m.collect()
57 // wait for all goroutines to be released
59 return matchedTxs, err
// collect receives results from processCh and accumulates the matched
// transactions for as long as the remaining gas budget covers them. It also
// returns once the completed counter reaches m.workerNum.
62 func (m *matchCollector) collect() ([]*types.Tx, error) {
// closeCh is closed on every return path.
// NOTE(review): presumably this tells the producer and workers to stop — confirm against their select statements.
63 defer close(m.closeCh)
65 var matchedTxs []*types.Tx
69 case data := <-m.processCh:
// Charge the transaction's gas against the remaining budget before deciding whether to keep it.
74 gasUsed := calcMatchedTxGasUsed(data.matchedTx)
// The transaction is kept only when the budget is not overdrawn after the deduction.
75 if m.gasLeft -= gasUsed; m.gasLeft >= 0 {
76 matchedTxs = append(matchedTxs, data.matchedTx)
// NOTE(review): presumably the out-of-gas branch — returns what has been collected so far.
78 return matchedTxs, nil
// Once the completed count equals the number of workers, collection ends.
81 if completed++; completed == m.workerNum {
82 return matchedTxs, nil
86 return matchedTxs, nil
// tradePairProducer walks tradePairIterator and sends trade pairs to the
// workers on tradePairCh, recording each pair and its reverse as seen.
89 func (m *matchCollector) tradePairProducer(wg *sync.WaitGroup) {
// tradePairMap records the keys of trade pairs that have already been seen.
95 tradePairMap := make(map[string]bool)
97 for m.tradePairIterator.HasNext() {
98 tradePair := m.tradePairIterator.Next()
// The key is already recorded, either directly or as the reverse of an earlier pair.
// NOTE(review): presumably such pairs are skipped — the branch body is not visible here.
99 if tradePairMap[tradePair.Key()] {
// Record both directions, so the reversed pair is recognised when the iterator yields it.
103 tradePairMap[tradePair.Key()] = true
104 tradePairMap[tradePair.Reverse().Key()] = true
// Hand the trade pair to a worker.
109 case m.tradePairCh <- tradePair:
// matchTxWorker receives trade pairs from tradePairCh and, for each one, sends
// the matched transactions produced by the engine on processCh.
// NOTE(review): the function continues past the end of this excerpt.
114 func (m *matchCollector) matchTxWorker(wg *sync.WaitGroup) {
// dispatchData sends one result on processCh; the caller reads its boolean result as "done".
115 dispatchData := func(data *matchTxResult) bool {
119 case m.processCh <- data:
135 case tradePair := <-m.tradePairCh:
// A nil trade pair is handled separately.
// NOTE(review): presumably it marks the end of input — the branch body is not visible here.
136 if tradePair == nil {
// Keep producing matched transactions for the pair and its reverse while the engine reports more.
139 for m.engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
140 matchedTx, err := m.engine.NextMatchedTx(tradePair, tradePair.Reverse())
// The transaction and its error travel together; error handling is left to the receiver of processCh.
141 if done := dispatchData(&matchTxResult{matchedTx: matchedTx, err: err}); done {