
mark tx before validation so it won't be sent to source again (#278)
[bytom/vapor.git] / netsync / chainmgr / block_keeper.go
package chainmgr

import (
        "time"

        log "github.com/sirupsen/logrus"

        "github.com/vapor/consensus"
        dbm "github.com/vapor/database/leveldb"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p/security"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
)

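// syncCycle is how often syncWorker wakes up to check whether a sync round is needed.
// noNeedSync, fastSyncType and regularSyncType identify the strategy chosen by checkSyncType.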
const (
        syncCycle = 5 * time.Second

        noNeedSync = iota
        fastSyncType
        regularSyncType
)

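// maxNumOfBlocksPerMsg and maxNumOfHeadersPerMsg cap how many blocks or headers
// may be returned in a single response message.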
var (
        maxNumOfBlocksPerMsg  = uint64(1000)
        maxNumOfHeadersPerMsg = uint64(1000)
)

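// FastSync is the interface the fast-sync engine exposes to blockKeeper.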
type FastSync interface {
        process() error
        setSyncPeer(peer *peers.Peer)
}

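// Fetcher abstracts the component that handles blocks and headers received from peers
// and that requests individual blocks during regular sync.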
type Fetcher interface {
        processBlock(peerID string, block *types.Block)
        processBlocks(peerID string, blocks []*types.Block)
        processHeaders(peerID string, headers []*types.BlockHeader)
        requireBlock(peerID string, height uint64) (*types.Block, error)
}

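// blockMsg, blocksMsg and headersMsg pair a received payload with the ID of the peer that sent it.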
type blockMsg struct {
        block  *types.Block
        peerID string
}

type blocksMsg struct {
        blocks []*types.Block
        peerID string
}

type headersMsg struct {
        headers []*types.BlockHeader
        peerID  string
}

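// blockKeeper drives block synchronization: it selects a sync peer, decides between
// fast sync and regular sync, and serves block/header locator requests from other peers.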
type blockKeeper struct {
        chain      Chain
        fastSync   FastSync
        msgFetcher Fetcher
        peers      *peers.PeerSet
        syncPeer   *peers.Peer

        quit chan struct{}
}

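// newBlockKeeper creates a blockKeeper whose fast-sync engine and fetcher share a
// storage layer backed by fastSyncDB.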
func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
        storage := newStorage(fastSyncDB)
        msgFetcher := newMsgFetcher(storage, peers)
        return &blockKeeper{
                chain:      chain,
                fastSync:   newFastSync(chain, msgFetcher, storage, peers),
                msgFetcher: msgFetcher,
                peers:      peers,
                quit:       make(chan struct{}),
        }
}

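// locateBlocks returns the full blocks for the headers selected by locateHeaders,
// capped at maxNumOfBlocksPerMsg.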
func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
        headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
        if err != nil {
                return nil, err
        }

        blocks := []*types.Block{}
        for _, header := range headers {
                headerHash := header.Hash()
                block, err := bk.chain.GetBlockByHash(&headerHash)
                if err != nil {
                        return nil, err
                }

                blocks = append(blocks, block)
        }
        return blocks, nil
}

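// locateHeaders walks the main chain from the first locator hash that is on the main chain
// (falling back to the genesis header) toward stopHash, taking every (skip+1)-th header and
// returning at most maxNum headers. An unknown or non-main-chain stopHash yields an empty slice.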
func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
        startHeader, err := bk.chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
        }

        for _, hash := range locator {
                header, err := bk.chain.GetHeaderByHash(hash)
                if err == nil && bk.chain.InMainChain(header.Hash()) {
                        startHeader = header
                        break
                }
        }

        headers := make([]*types.BlockHeader, 0)
        stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
        if err != nil {
                return headers, nil
        }

        if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
                return headers, nil
        }

        headers = append(headers, startHeader)
        if stopHeader.Height == startHeader.Height {
                return headers, nil
        }

        for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
                index += skip + 1
                if index >= stopHeader.Height {
                        headers = append(headers, stopHeader)
                        break
                }

                header, err := bk.chain.GetHeaderByHeight(index)
                if err != nil {
                        return nil, err
                }

                headers = append(headers, header)
        }

        return headers, nil
}

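// processBlock, processBlocks and processHeaders hand incoming messages straight to the fetcher.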
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
        bk.msgFetcher.processBlock(peerID, block)
}

func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
        bk.msgFetcher.processBlocks(peerID, blocks)
}

func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
        bk.msgFetcher.processHeaders(peerID, headers)
}

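// regularBlockSync requests blocks from the sync peer one height at a time until the local
// best height reaches the peer's height. Peers that fail to deliver or deliver invalid blocks
// are reported to the peer set; an orphan block makes the loop step back one height to fetch
// the missing parent.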
func (bk *blockKeeper) regularBlockSync() error {
        peerHeight := bk.syncPeer.Height()
        bestHeight := bk.chain.BestBlockHeight()
        i := bestHeight + 1
        for i <= peerHeight {
                block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
                if err != nil {
                        bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
                        return err
                }

                isOrphan, err := bk.chain.ProcessBlock(block)
                if err != nil {
                        bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
                        return err
                }

                if isOrphan {
                        i--
                        continue
                }
                i = bk.chain.BestBlockHeight() + 1
        }
        log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
        return nil
}

func (bk *blockKeeper) start() {
        go bk.syncWorker()
}

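// checkSyncType picks the sync strategy: fast sync when the best fast-sync peer's irreversible
// height is at least minGapStartFastSync above the local best height, regular sync when the best
// full-node peer is simply ahead, and no sync otherwise.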
func (bk *blockKeeper) checkSyncType() int {
        peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
        if peer == nil {
                log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer")
                return noNeedSync
        }

        bestHeight := bk.chain.BestBlockHeight()
        if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
                bk.fastSync.setSyncPeer(peer)
                return fastSyncType
        }

        peer = bk.peers.BestPeer(consensus.SFFullNode)
        if peer == nil {
                log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
                return noNeedSync
        }

        peerHeight := peer.Height()
        if peerHeight > bestHeight {
                bk.syncPeer = peer
                return regularSyncType
        }

        return noNeedSync
}

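// startSync runs one sync round with the chosen strategy and returns false only when that
// strategy failed.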
func (bk *blockKeeper) startSync() bool {
        switch bk.checkSyncType() {
        case fastSyncType:
                if err := bk.fastSync.process(); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
                        return false
                }
        case regularSyncType:
                if err := bk.regularBlockSync(); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
                        return false
                }
        }

        return true
}

func (bk *blockKeeper) stop() {
        close(bk.quit)
}

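// syncWorker runs a sync round every syncCycle and, after a successful update, broadcasts the
// new best and last-irreversible headers to peers. It exits when stop closes the quit channel.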
func (bk *blockKeeper) syncWorker() {
        syncTicker := time.NewTicker(syncCycle)
        defer syncTicker.Stop()

        for {
                select {
                case <-syncTicker.C:
                        if update := bk.startSync(); !update {
                                continue
                        }

                        if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
                                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
                        }
                case <-bk.quit:
                        return
                }
        }
}