7 "github.com/bytom/vapor/application/mov/common"
8 "github.com/bytom/vapor/application/mov/database"
9 "github.com/bytom/vapor/application/mov/match"
10 "github.com/bytom/vapor/protocol/bc/types"
// matchCollector holds the state shared by the trade pair producer, the match
// workers and the collecting loop: an iterator that supplies trade pairs and
// the channels that connect the goroutines.
// NOTE(review): only some fields are visible here; the methods also use
// engine, gasLeft, isTimeout, workerNum, endWorkCh and closeCh.
13 type matchCollector struct {
// tradePairIterator is walked with HasNext/Next by tradePairProducer.
15 tradePairIterator *database.TradePairIterator
// tradePairCh carries trade pairs from tradePairProducer to matchTxWorker.
21 tradePairCh chan *common.TradePair
// matchResultCh carries match results from matchTxWorker to collect.
22 matchResultCh chan *matchTxResult
// matchTxResult is the message sent on matchResultCh. matchTxWorker fills in
// its matchedTx and err fields from a single engine.NextMatchedTx call.
26 type matchTxResult struct {
// newMatchTxCollector builds a matchCollector that reads trade pairs from
// iterator. endWorkCh and tradePairCh are buffered to runtime.NumCPU()
// entries, while matchResultCh and closeCh are unbuffered.
// NOTE(review): engine, gasLeft and isTimeout are presumably stored on the
// collector too (the methods read m.engine, m.gasLeft and m.isTimeout), but
// those initialisers are not visible here — confirm.
31 func newMatchTxCollector(engine *match.Engine, iterator *database.TradePairIterator, gasLeft int64, isTimeout func() bool) *matchCollector {
// one slot per CPU; used below as the buffer size of the buffered channels
32 workerNum := runtime.NumCPU()
33 return &matchCollector{
35 tradePairIterator: iterator,
39 endWorkCh: make(chan int, workerNum),
40 tradePairCh: make(chan *common.TradePair, workerNum),
41 matchResultCh: make(chan *matchTxResult),
42 closeCh: make(chan struct{}),
// result starts m.workerNum matchTxWorker goroutines and one
// tradePairProducer goroutine, all sharing the same WaitGroup, and returns
// the matched transactions and error produced by collect.
46 func (m *matchCollector) result() ([]*types.Tx, error) {
48 for i := 0; i < int(m.workerNum); i++ {
50 go m.matchTxWorker(&wg)
54 go m.tradePairProducer(&wg)
// collect runs on the calling goroutine while the workers and producer run
56 matchedTxs, err := m.collect()
57 // wait for all goroutines to be released
59 return matchedTxs, err
// collect receives match results from matchResultCh and accumulates the
// matched transactions for as long as isTimeout() reports false. It also
// returns once the completed counter reaches m.workerNum. closeCh is closed
// when collect returns, whichever path is taken.
// NOTE(review): the handling of data.err and the select case that triggers
// the completed counter are not visible here — confirm against the full file.
62 func (m *matchCollector) collect() ([]*types.Tx, error) {
// closing closeCh on every return path lets other goroutines observe that
// collection has finished
63 defer close(m.closeCh)
65 var matchedTxs []*types.Tx
66 for completed := 0; !m.isTimeout(); {
68 case data := <-m.matchResultCh:
73 gasUsed := calcMatchedTxGasUsed(data.matchedTx)
// gas is deducted before the check, so a tx that pushes gasLeft below zero
// is not appended
74 if m.gasLeft -= gasUsed; m.gasLeft >= 0 {
75 matchedTxs = append(matchedTxs, data.matchedTx)
77 return matchedTxs, nil
// count one more completion; stop once the count equals workerNum
80 if completed++; completed == m.workerNum {
81 return matchedTxs, nil
// reached when the loop condition fails, i.e. isTimeout() returned true
85 return matchedTxs, nil
// tradePairProducer walks tradePairIterator and sends trade pairs on
// tradePairCh. tradePairMap records the key of each handled pair and the key
// of its reverse, and is consulted before a pair is handled.
// NOTE(review): the body of the already-seen branch and the other select
// cases are not visible here; presumably seen pairs are skipped — confirm.
88 func (m *matchCollector) tradePairProducer(wg *sync.WaitGroup) {
// keys of trade pairs already handled, in either direction
94 tradePairMap := make(map[string]bool)
96 for m.tradePairIterator.HasNext() {
97 tradePair := m.tradePairIterator.Next()
// true when this pair, or its reverse, was recorded by an earlier iteration
98 if tradePairMap[tradePair.Key()] {
// record both directions so the reversed pair also counts as seen later
102 tradePairMap[tradePair.Key()] = true
103 tradePairMap[tradePair.Reverse().Key()] = true
108 case m.tradePairCh <- tradePair:
// matchTxWorker receives trade pairs from tradePairCh. For each pair it calls
// engine.NextMatchedTx for the pair and its reverse while engine.HasMatchedTx
// reports true, sending every outcome (matched tx and error together) on
// matchResultCh. Receiving a nil trade pair is treated as the end-of-work
// signal.
// NOTE(review): the surrounding loop, the other select cases and the use of
// wg are not visible here.
113 func (m *matchCollector) matchTxWorker(wg *sync.WaitGroup) {
123 case tradePair := <-m.tradePairCh:
124 // end the worker because all trade pairs have been matched
125 if tradePair == nil {
129 for m.engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
130 matchedTx, err := m.engine.NextMatchedTx(tradePair, tradePair.Reverse())
// the error is forwarded together with the tx rather than handled here
134 case m.matchResultCh <- &matchTxResult{matchedTx: matchedTx, err: err}:
// calcMatchedTxGasUsed returns the gas charged for tx: 150 for each input
// plus tx.SerializedSize, as an int64.
144 func calcMatchedTxGasUsed(tx *types.Tx) int64 {
145 return int64(len(tx.Inputs))*150 + int64(tx.SerializedSize)