--- /dev/null
+package mov
+
+import (
+ "runtime"
+ "sync"
+
+ "github.com/bytom/vapor/application/mov/common"
+ "github.com/bytom/vapor/application/mov/database"
+ "github.com/bytom/vapor/application/mov/match"
+ "github.com/bytom/vapor/protocol/bc/types"
+)
+
+type matchCollector struct {
+ engine *match.Engine
+ tradePairIterator *database.TradePairIterator
+ gasLeft int64
+ isTimeout func() bool
+
+ workerNum int
+ endWorkCh chan int
+ tradePairCh chan *common.TradePair
+ matchResultCh chan *matchTxResult
+ closeCh chan struct{}
+}
+
+type matchTxResult struct {
+ matchedTx *types.Tx
+ err error
+}
+
+func newMatchTxCollector(engine *match.Engine, iterator *database.TradePairIterator, gasLeft int64, isTimeout func() bool) *matchCollector {
+ workerNum := runtime.NumCPU()
+ return &matchCollector{
+ engine: engine,
+ tradePairIterator: iterator,
+ gasLeft: gasLeft,
+ isTimeout: isTimeout,
+ workerNum: workerNum,
+ endWorkCh: make(chan int, workerNum),
+ tradePairCh: make(chan *common.TradePair, workerNum),
+ matchResultCh: make(chan *matchTxResult),
+ closeCh: make(chan struct{}),
+ }
+}
+
+func (m *matchCollector) result() ([]*types.Tx, error) {
+ var wg sync.WaitGroup
+ for i := 0; i < int(m.workerNum); i++ {
+ wg.Add(1)
+ go m.matchTxWorker(&wg)
+ }
+
+ wg.Add(1)
+ go m.tradePairProducer(&wg)
+
+ matchedTxs, err := m.collect()
+ // wait for all goroutine release
+ wg.Wait()
+ return matchedTxs, err
+}
+
+func (m *matchCollector) collect() ([]*types.Tx, error) {
+ defer close(m.closeCh)
+
+ var matchedTxs []*types.Tx
+ for completed := 0; !m.isTimeout(); {
+ select {
+ case data := <-m.matchResultCh:
+ if data.err != nil {
+ return nil, data.err
+ }
+
+ gasUsed := calcMatchedTxGasUsed(data.matchedTx)
+ if m.gasLeft -= gasUsed; m.gasLeft >= 0 {
+ matchedTxs = append(matchedTxs, data.matchedTx)
+ } else {
+ return matchedTxs, nil
+ }
+ case <-m.endWorkCh:
+ if completed++; completed == m.workerNum {
+ return matchedTxs, nil
+ }
+ }
+ }
+ return matchedTxs, nil
+}
+
+func (m *matchCollector) tradePairProducer(wg *sync.WaitGroup) {
+ defer func() {
+ close(m.tradePairCh)
+ wg.Done()
+ }()
+
+ tradePairMap := make(map[string]bool)
+
+ for m.tradePairIterator.HasNext() {
+ tradePair := m.tradePairIterator.Next()
+ if tradePairMap[tradePair.Key()] {
+ continue
+ }
+
+ tradePairMap[tradePair.Key()] = true
+ tradePairMap[tradePair.Reverse().Key()] = true
+
+ select {
+ case <-m.closeCh:
+ return
+ case m.tradePairCh <- tradePair:
+ }
+ }
+}
+
+func (m *matchCollector) matchTxWorker(wg *sync.WaitGroup) {
+ defer func() {
+ m.endWorkCh <- 1
+ wg.Done()
+ }()
+
+ for {
+ select {
+ case <-m.closeCh:
+ return
+ case tradePair := <-m.tradePairCh:
+ // end worker due to all trade pair has been matched
+ if tradePair == nil {
+ return
+ }
+
+ for m.engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
+ matchedTx, err := m.engine.NextMatchedTx(tradePair, tradePair.Reverse())
+ select {
+ case <-m.closeCh:
+ return
+ case m.matchResultCh <- &matchTxResult{matchedTx: matchedTx, err: err}:
+ if err != nil {
+ return
+ }
+ }
+ }
+ }
+ }
+}
+
+func calcMatchedTxGasUsed(tx *types.Tx) int64 {
+ return int64(len(tx.Inputs))*150 + int64(tx.SerializedSize)
+}