OSDN Git Service

version 1.1.9 (#594)
[bytom/vapor.git] / application / mov / match_collector.go
1 package mov
2
3 import (
4         "runtime"
5         "sync"
6
7         "github.com/bytom/vapor/application/mov/common"
8         "github.com/bytom/vapor/application/mov/database"
9         "github.com/bytom/vapor/application/mov/match"
10         "github.com/bytom/vapor/protocol/bc/types"
11 )
12
13 type matchCollector struct {
14         engine            *match.Engine
15         tradePairIterator *database.TradePairIterator
16         gasLeft           int64
17         isTimeout         func() bool
18
19         workerNum     int
20         endWorkCh     chan int
21         tradePairCh   chan *common.TradePair
22         matchResultCh chan *matchTxResult
23         closeCh       chan struct{}
24 }
25
26 type matchTxResult struct {
27         matchedTx *types.Tx
28         err       error
29 }
30
31 func newMatchTxCollector(engine *match.Engine, iterator *database.TradePairIterator, gasLeft int64, isTimeout func() bool) *matchCollector {
32         workerNum := runtime.NumCPU()
33         return &matchCollector{
34                 engine:            engine,
35                 tradePairIterator: iterator,
36                 gasLeft:           gasLeft,
37                 isTimeout:         isTimeout,
38                 workerNum:         workerNum,
39                 endWorkCh:         make(chan int, workerNum),
40                 tradePairCh:       make(chan *common.TradePair, workerNum),
41                 matchResultCh:     make(chan *matchTxResult),
42                 closeCh:           make(chan struct{}),
43         }
44 }
45
46 func (m *matchCollector) result() ([]*types.Tx, error) {
47         var wg sync.WaitGroup
48         for i := 0; i < int(m.workerNum); i++ {
49                 wg.Add(1)
50                 go m.matchTxWorker(&wg)
51         }
52
53         wg.Add(1)
54         go m.tradePairProducer(&wg)
55
56         matchedTxs, err := m.collect()
57         // wait for all goroutine release
58         wg.Wait()
59         return matchedTxs, err
60 }
61
62 func (m *matchCollector) collect() ([]*types.Tx, error) {
63         defer close(m.closeCh)
64
65         var matchedTxs []*types.Tx
66         for completed := 0; !m.isTimeout(); {
67                 select {
68                 case data := <-m.matchResultCh:
69                         if data.err != nil {
70                                 return nil, data.err
71                         }
72
73                         gasUsed := calcMatchedTxGasUsed(data.matchedTx)
74                         if m.gasLeft -= gasUsed; m.gasLeft >= 0 {
75                                 matchedTxs = append(matchedTxs, data.matchedTx)
76                         } else {
77                                 return matchedTxs, nil
78                         }
79                 case <-m.endWorkCh:
80                         if completed++; completed == m.workerNum {
81                                 return matchedTxs, nil
82                         }
83                 }
84         }
85         return matchedTxs, nil
86 }
87
88 func (m *matchCollector) tradePairProducer(wg *sync.WaitGroup) {
89         defer func() {
90                 close(m.tradePairCh)
91                 wg.Done()
92         }()
93
94         tradePairMap := make(map[string]bool)
95
96         for m.tradePairIterator.HasNext() {
97                 tradePair := m.tradePairIterator.Next()
98                 if tradePairMap[tradePair.Key()] {
99                         continue
100                 }
101
102                 tradePairMap[tradePair.Key()] = true
103                 tradePairMap[tradePair.Reverse().Key()] = true
104
105                 select {
106                 case <-m.closeCh:
107                         return
108                 case m.tradePairCh <- tradePair:
109                 }
110         }
111 }
112
113 func (m *matchCollector) matchTxWorker(wg *sync.WaitGroup) {
114         defer func() {
115                 m.endWorkCh <- 1
116                 wg.Done()
117         }()
118
119         for {
120                 select {
121                 case <-m.closeCh:
122                         return
123                 case tradePair := <-m.tradePairCh:
124                         // end worker due to all trade pair has been matched
125                         if tradePair == nil {
126                                 return
127                         }
128
129                         for m.engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
130                                 matchedTx, err := m.engine.NextMatchedTx(tradePair, tradePair.Reverse())
131                                 select {
132                                 case <-m.closeCh:
133                                         return
134                                 case m.matchResultCh <- &matchTxResult{matchedTx: matchedTx, err: err}:
135                                         if err != nil {
136                                                 return
137                                         }
138                                 }
139                         }
140                 }
141         }
142 }
143
144 func calcMatchedTxGasUsed(tx *types.Tx) int64 {
145         return int64(len(tx.Inputs))*150 + int64(tx.SerializedSize)
146 }