import (
"runtime"
"sync"
- "sync/atomic"
"github.com/bytom/vapor/application/mov/common"
"github.com/bytom/vapor/application/mov/database"
type matchCollector struct {
engine *match.Engine
tradePairIterator *database.TradePairIterator
- workerNum int32
- workerNumChan chan int32
- processCh chan *matchTxResult
- tradePairCh chan *common.TradePair
- closeCh chan struct{}
gasLeft int64
isTimeout func() bool
+
+ workerNum int32
+ workerNumCh chan int32
+ processCh chan *matchTxResult
+ tradePairCh chan *common.TradePair
+ closeCh chan struct{}
}
type matchTxResult struct {
engine: engine,
tradePairIterator: iterator,
workerNum: int32(workerNum),
- workerNumChan: make(chan int32, workerNum),
+ workerNumCh: make(chan int32, workerNum),
processCh: make(chan *matchTxResult, 32),
tradePairCh: make(chan *common.TradePair, workerNum),
closeCh: make(chan struct{}),
defer close(m.closeCh)
var matchedTxs []*types.Tx
- for {
- if m.isTimeout() {
- return matchedTxs, nil
- }
-
+ var completed int32
+ for !m.isTimeout() {
select {
case data := <-m.processCh:
if data.err != nil {
return nil, data.err
}
- if data.matchedTx != nil {
- gasUsed := calcMatchedTxGasUsed(data.matchedTx)
- if m.gasLeft-gasUsed >= 0 {
- matchedTxs = append(matchedTxs, data.matchedTx)
- m.gasLeft -= gasUsed
- } else {
- return matchedTxs, nil
- }
+ gasUsed := calcMatchedTxGasUsed(data.matchedTx)
+ if m.gasLeft -= gasUsed; m.gasLeft >= 0 {
+ matchedTxs = append(matchedTxs, data.matchedTx)
+ } else {
+ return matchedTxs, nil
}
- case remainingWorker := <-m.workerNumChan:
- if remainingWorker == 0 {
+ case <-m.workerNumCh:
+ if completed++; completed == m.workerNum {
return matchedTxs, nil
}
}
}
+ return matchedTxs, nil
}
func (m *matchCollector) tradePairProducer(wg *sync.WaitGroup) {
}
}
- defer wg.Done()
+ defer func() {
+ m.workerNumCh <- 1
+ wg.Done()
+ }()
for {
select {
case <-m.closeCh:
return
case tradePair := <-m.tradePairCh:
if tradePair == nil {
- atomic.AddInt32(&m.workerNum, -1)
- m.workerNumChan <- m.workerNum
return
}
for m.engine.HasMatchedTx(tradePair, tradePair.Reverse()) {