OSDN Git Service

netsync add test case (#365)
[bytom/vapor.git] / netsync / chainmgr / block_keeper.go
1 package chainmgr
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/consensus"
9         dbm "github.com/vapor/database/leveldb"
10         "github.com/vapor/netsync/peers"
11         "github.com/vapor/p2p/security"
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
const (
	// syncCycle is the interval at which syncWorker attempts a new sync round.
	syncCycle = 5 * time.Second

	// Sync-type tags returned by checkSyncType and switched on in startSync.
	// NOTE(review): iota is 1 here, not 0, because syncCycle occupies the
	// first spec of this const block — so noNeedSync == 1, fastSyncType == 2,
	// regularSyncType == 3. The values only need to be distinct, so this is
	// harmless, but keep it in mind before reordering the block.
	noNeedSync = iota
	fastSyncType
	regularSyncType
)
23
// Per-message and per-round size limits. Declared as vars rather than consts,
// presumably so tests can override them with smaller values — verify against
// the package tests before converting to const.
var (
	maxNumOfBlocksPerMsg      = uint64(1000) // cap on blocks returned by locateBlocks
	maxNumOfHeadersPerMsg     = uint64(1000) // cap on headers per headers message
	maxNumOfBlocksRegularSync = uint64(128)  // max height advance per regularBlockSync call
)
29
// FastSync is the strategy interface for the fast (checkpoint-based) sync
// path. process runs one fast-sync round; setSyncPeer selects the peer to
// download from before process is called.
type FastSync interface {
	process() error
	setSyncPeer(peer *peers.Peer)
}
34
// Fetcher abstracts the message fetcher that blockKeeper delegates network
// payloads to. The process* methods hand off blocks/headers received from a
// peer; requireBlock synchronously fetches the block at the given height
// from the given peer.
type Fetcher interface {
	processBlock(peerID string, block *types.Block)
	processBlocks(peerID string, blocks []*types.Block)
	processHeaders(peerID string, headers []*types.BlockHeader)
	requireBlock(peerID string, height uint64) (*types.Block, error)
}
41
// blockMsg pairs a single received block with the ID of the peer that sent it.
type blockMsg struct {
	block  *types.Block
	peerID string
}
46
// blocksMsg pairs a batch of received blocks with the ID of the sending peer.
type blocksMsg struct {
	blocks []*types.Block
	peerID string
}
51
// headersMsg pairs a batch of received headers with the ID of the sending peer.
type headersMsg struct {
	headers []*types.BlockHeader
	peerID  string
}
56
// blockKeeper coordinates block synchronization for the chain manager. Each
// syncCycle it picks a strategy — fast sync when a peer's irreversible height
// is far enough ahead, otherwise regular one-by-one sync — and drives it.
type blockKeeper struct {
	chain      Chain
	fastSync   FastSync
	msgFetcher Fetcher
	peers      *peers.PeerSet
	syncPeer   *peers.Peer // peer chosen for the current regular-sync round

	quit chan struct{} // closed by stop() to terminate syncWorker
}
66
67 func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
68         storage := newStorage(fastSyncDB)
69         msgFetcher := newMsgFetcher(storage, peers)
70         return &blockKeeper{
71                 chain:      chain,
72                 fastSync:   newFastSync(chain, msgFetcher, storage, peers),
73                 msgFetcher: msgFetcher,
74                 peers:      peers,
75                 quit:       make(chan struct{}),
76         }
77 }
78
79 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
80         headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
81         if err != nil {
82                 return nil, err
83         }
84
85         blocks := []*types.Block{}
86         for _, header := range headers {
87                 headerHash := header.Hash()
88                 block, err := bk.chain.GetBlockByHash(&headerHash)
89                 if err != nil {
90                         return nil, err
91                 }
92
93                 blocks = append(blocks, block)
94         }
95         return blocks, nil
96 }
97
98 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
99         startHeader, err := bk.chain.GetHeaderByHeight(0)
100         if err != nil {
101                 return nil, err
102         }
103
104         for _, hash := range locator {
105                 header, err := bk.chain.GetHeaderByHash(hash)
106                 if err == nil && bk.chain.InMainChain(header.Hash()) {
107                         startHeader = header
108                         break
109                 }
110         }
111
112         headers := make([]*types.BlockHeader, 0)
113         stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
114         if err != nil {
115                 return headers, err
116         }
117
118         if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
119                 return headers, nil
120         }
121
122         headers = append(headers, startHeader)
123         if stopHeader.Height == startHeader.Height {
124                 return headers, nil
125         }
126
127         for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
128                 index += skip + 1
129                 if index >= stopHeader.Height {
130                         headers = append(headers, stopHeader)
131                         break
132                 }
133
134                 header, err := bk.chain.GetHeaderByHeight(index)
135                 if err != nil {
136                         return nil, err
137                 }
138
139                 headers = append(headers, header)
140         }
141
142         return headers, nil
143 }
144
// processBlock forwards a single block received from peerID to the fetcher.
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
	bk.msgFetcher.processBlock(peerID, block)
}
148
// processBlocks forwards a batch of blocks received from peerID to the fetcher.
func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
	bk.msgFetcher.processBlocks(peerID, blocks)
}
152
// processHeaders forwards a batch of headers received from peerID to the fetcher.
func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
	bk.msgFetcher.processHeaders(peerID, headers)
}
156
// regularBlockSync downloads blocks one at a time from bk.syncPeer, advancing
// the local best height by at most maxNumOfBlocksRegularSync per call (never
// past the peer's reported height).
//
// Punishment policy: a peer that fails to deliver a requested block is flagged
// as a connection exception; a peer whose block fails validation is flagged
// for sending an illegal message. Either way the error aborts the round.
func (bk *blockKeeper) regularBlockSync() error {
	peerHeight := bk.syncPeer.Height()
	bestHeight := bk.chain.BestBlockHeight()
	targetHeight := bestHeight + maxNumOfBlocksRegularSync
	if targetHeight > peerHeight {
		targetHeight = peerHeight
	}

	i := bestHeight + 1
	for i <= targetHeight {
		block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
		if err != nil {
			bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
			return err
		}

		isOrphan, err := bk.chain.ProcessBlock(block)
		if err != nil {
			bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
			return err
		}

		if isOrphan {
			// Missing parent: step the cursor back one height to fetch it first.
			i--
			continue
		}
		// Re-read the best height rather than i+1 — presumably ProcessBlock can
		// connect previously-queued orphans and jump ahead; verify against Chain.
		i = bk.chain.BestBlockHeight() + 1
	}
	log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
	return nil
}
188
// start launches the background sync loop; it returns immediately.
func (bk *blockKeeper) start() {
	go bk.syncWorker()
}
192
193 func (bk *blockKeeper) checkSyncType() int {
194         bestHeight := bk.chain.BestBlockHeight()
195         peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
196         if peer != nil {
197                 if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
198                         bk.fastSync.setSyncPeer(peer)
199                         return fastSyncType
200                 }
201         }
202
203         peer = bk.peers.BestPeer(consensus.SFFullNode)
204         if peer == nil {
205                 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
206                 return noNeedSync
207         }
208
209         if peer.Height() > bestHeight {
210                 bk.syncPeer = peer
211                 return regularSyncType
212         }
213
214         return noNeedSync
215 }
216
217 func (bk *blockKeeper) startSync() bool {
218         switch bk.checkSyncType() {
219         case fastSyncType:
220                 if err := bk.fastSync.process(); err != nil {
221                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
222                         return false
223                 }
224         case regularSyncType:
225                 if err := bk.regularBlockSync(); err != nil {
226                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
227                         return false
228                 }
229         default:
230                 return false
231         }
232
233         return true
234 }
235
// stop signals syncWorker to exit by closing the quit channel. Call at most
// once: closing an already-closed channel panics.
func (bk *blockKeeper) stop() {
	close(bk.quit)
}
239
240 func (bk *blockKeeper) syncWorker() {
241         syncTicker := time.NewTicker(syncCycle)
242         defer syncTicker.Stop()
243
244         for {
245                 select {
246                 case <-syncTicker.C:
247                         if update := bk.startSync(); !update {
248                                 continue
249                         }
250
251                         if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
252                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
253                         }
254                 case <-bk.quit:
255                         return
256                 }
257         }
258 }