7 log "github.com/sirupsen/logrus"
9 "github.com/vapor/consensus"
10 "github.com/vapor/errors"
11 "github.com/vapor/netsync/peers"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
// Tuning values and sentinel errors used by the block keeper.
// NOTE(review): the enclosing const/var declaration lines are not visible in this excerpt.
// syncCycle is the period of the ticker created in syncWorker.
17 syncCycle = 5 * time.Second
// Buffer sizes of the three processing channels allocated in newBlockKeeper.
18 blockProcessChSize = 1024
19 blocksProcessChSize = 128
20 headersProcessChSize = 1024
// maxBlockPerMsg is compared against the loop index in locateBlocks and bounds
// the height window startSync hands to regularBlockSync.
24 maxBlockPerMsg = uint64(128)
// maxBlockHeadersPerMsg is the upper bound on headers returned by locateHeaders.
25 maxBlockHeadersPerMsg = uint64(2048)
// syncTimeout is the duration of the timer created by each require* request.
26 syncTimeout = 30 * time.Second
// errAppendHeaders is returned by appendHeaderList when a header's
// PreviousBlockHash does not equal the hash of the current list tail.
28 errAppendHeaders = errors.New("fail to append list due to order dismatch")
// errRequestTimeout is wrapped and returned by requireBlock, requireBlocks and requireHeaders.
29 errRequestTimeout = errors.New("request timeout")
// errPeerDropped is returned by the require* methods when SendMsg to the sync peer reports failure.
30 errPeerDropped = errors.New("Peer dropped")
// blockMsg is the element type of blockProcessCh; processBlock builds it from a
// block and a peer ID (fields block and peerID; the field lines are not visible here).
33 type blockMsg struct {
// blocksMsg is the element type of blocksProcessCh; processBlocks builds it from a
// slice of blocks and a peer ID (fields blocks and peerID; the field lines are not visible here).
38 type blocksMsg struct {
// headersMsg is the element type of headersProcessCh; processHeaders builds it from
// a slice of headers and a peer ID (the peerID field line is not visible here).
43 type headersMsg struct {
// headers carries the block headers passed to processHeaders.
44 headers []*types.BlockHeader
// blockKeeper holds the state used to sync blocks and headers from peers.
// NOTE(review): its methods also use chain, peers, syncPeerID and headerList
// fields whose declaration lines are not visible in this excerpt.
48 type blockKeeper struct {
// Written by processBlock; read by requireBlock.
53 blockProcessCh chan *blockMsg
// Written by processBlocks; read by requireBlocks.
54 blocksProcessCh chan *blocksMsg
// Written by processHeaders; read by requireHeaders.
55 headersProcessCh chan *headersMsg
// newBlockKeeper builds a blockKeeper for the given chain and peer set, with
// buffered processing channels and a fresh header list.
// NOTE(review): the remaining initialisers, any start-up calls and the return
// statement are on lines not visible in this excerpt.
60 func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
// Channel capacities come from the *ProcessChSize values.
64 blockProcessCh: make(chan *blockMsg, blockProcessChSize),
65 blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
66 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
67 headerList: list.New(),
// appendHeaderList pushes the given headers, in order, onto the back of bk.headerList.
// It returns errAppendHeaders as soon as a header's PreviousBlockHash differs from
// the hash of the current list tail; no rollback of headers pushed earlier in the
// same call is visible in this excerpt.
// The list is expected to be non-empty on entry: the tail element is dereferenced
// without a nil check (assuming headerList is a container/list List, Back() is nil
// for an empty list).
74 func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
75 for _, header := range headers {
// The tail is re-read every iteration, so each header is checked against the one pushed just before it.
76 prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
77 if prevHeader.Hash() != header.PreviousBlockHash {
78 return errAppendHeaders
80 bk.headerList.PushBack(header)
// blockLocator builds a slice of block-hash pointers that starts at the chain's
// best block header and walks back toward height 0 through GetHeaderByHeight.
// NOTE(review): the loop header, the declaration/update of step and err, and the
// return statement are on lines not visible in this excerpt.
85 func (bk *blockKeeper) blockLocator() []*bc.Hash {
86 header := bk.chain.BestBlockHeader()
87 locator := []*bc.Hash{}
// Copy the hash into a local so its address can be stored in the locator.
91 headerHash := header.Hash()
92 locator = append(locator, &headerHash)
// The header at height 0 has just been appended; presumably the walk ends here (branch body not visible).
93 if header.Height == 0 {
// Go straight to height 0 when fewer than step blocks remain; this also keeps
// the unsigned subtraction below from wrapping.
98 if header.Height < step {
99 header, err = bk.chain.GetHeaderByHeight(0)
101 header, err = bk.chain.GetHeaderByHeight(header.Height - step)
// Lookup failures are logged at error level; what happens next is not visible here.
104 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
// Once the locator holds 9 or more hashes a different branch runs; presumably
// step is enlarged there (body not visible) -- TODO confirm.
108 if len(locator) >= 9 {
// fastBlockSync syncs the chain up to checkPoint in two phases:
//  1. extend bk.headerList with headers requested from the sync peer until the
//     hash of the list tail equals checkPoint.Hash;
//  2. request blocks and pass each to chain.ProcessBlock until the chain's best
//     height reaches checkPoint.Height.
// It returns an error wrapping peers.ErrPeerMisbehave when the tail reaches the
// checkpoint height without matching its hash, or when the peer answers with an
// empty header or block list.
// NOTE(review): the error checks after requireHeaders/requireBlocks, any
// comparison of a block against fastHeader, and the final return are on lines
// not visible in this excerpt.
115 func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
116 bk.resetHeaderState()
// Dereferences the list tail right after the reset; relies on resetHeaderState
// leaving at least one element -- TODO confirm (its visible PushBack is conditional).
117 lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
// Phase 1: the tail is re-read after every round of appended headers.
118 for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
// At or above the checkpoint height with a different hash: the header chain cannot reach the checkpoint.
119 if lastHeader.Height >= checkPoint.Height {
120 return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
// Ask for the headers that follow the current tail, up to the checkpoint hash.
123 lastHash := lastHeader.Hash()
124 headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
// An empty reply would make no progress and keep this loop spinning.
129 if len(headers) == 0 {
130 return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list")
133 if err := bk.appendHeaderList(headers); err != nil {
// Phase 2: fastHeader starts at the list front and is advanced once per received block.
138 fastHeader := bk.headerList.Front()
139 for bk.chain.BestBlockHeight() < checkPoint.Height {
140 locator := bk.blockLocator()
141 blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
146 if len(blocks) == 0 {
147 return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list")
150 for _, block := range blocks {
// Running off the end of the header list means more blocks arrived than headers were collected.
151 if fastHeader = fastHeader.Next(); fastHeader == nil {
152 return errors.New("get block than is higher than checkpoint")
155 if _, err = bk.chain.ProcessBlock(block); err != nil {
156 return errors.Wrap(err, "fail on fastBlockSync process block")
// locateBlocks resolves the headers selected by locateHeaders(locator, stopHash)
// into full blocks, loading each block from the chain by its header hash.
// NOTE(review): the error checks and the return statement are on lines not
// visible in this excerpt.
163 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
164 headers, err := bk.locateHeaders(locator, stopHash)
169 blocks := []*types.Block{}
170 for i, header := range headers {
// Index reached maxBlockPerMsg; presumably the loop stops here (branch body not visible).
171 if uint64(i) >= maxBlockPerMsg {
// Hash() result is stored in a local so its address can be passed to GetBlockByHash.
175 headerHash := header.Hash()
176 block, err := bk.chain.GetBlockByHash(&headerHash)
181 blocks = append(blocks, block)
// locateHeaders collects consecutive headers by height, beginning one above a
// start header and ending at the height of the header identified by stopHash,
// limited to maxBlockHeadersPerMsg entries.
// The start header is initialised to the header at height 0; the locator loop
// looks for a hash that resolves to a header in the main chain (presumably the
// first match replaces the start header -- loop body not visible).
// NOTE(review): the error checks and the return statement are on lines not
// visible in this excerpt.
186 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
187 stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
192 startHeader, err := bk.chain.GetHeaderByHeight(0)
197 for _, hash := range locator {
198 header, err := bk.chain.GetHeaderByHash(hash)
// Only hashes that are known locally and lie on the main chain qualify.
199 if err == nil && bk.chain.InMainChain(header.Hash()) {
// NOTE(review): this is an unsigned subtraction. If stopHeader were below
// startHeader it would wrap to a very large value and then be clamped to
// maxBlockHeadersPerMsg -- confirm that case cannot occur or is acceptable.
205 totalHeaders := stopHeader.Height - startHeader.Height
206 if totalHeaders > maxBlockHeadersPerMsg {
207 totalHeaders = maxBlockHeadersPerMsg
210 headers := []*types.BlockHeader{}
// i starts at 1, so the start header itself is not included in the result.
211 for i := uint64(1); i <= totalHeaders; i++ {
212 header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
217 headers = append(headers, header)
// nextCheckpoint selects a checkpoint from consensus.ActiveNetParams.Checkpoints
// relative to the height of the chain's best block header.
// When no checkpoints are configured, or the best height is already at or above
// the last checkpoint's height, the early branch runs (presumably returning nil:
// callers in this file compare the result against nil; branch body not visible).
// Otherwise the candidate starts as the last checkpoint and is moved to earlier
// entries while scanning downward; the scan hits a separate branch at the first
// checkpoint whose height is at or below the best height (presumably a break;
// body not visible).
// The returned pointer refers to an element of the checkpoints slice itself.
222 func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
223 height := bk.chain.BestBlockHeader().Height
224 checkpoints := consensus.ActiveNetParams.Checkpoints
225 if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
229 nextCheckpoint := &checkpoints[len(checkpoints)-1]
// Walk from the second-to-last checkpoint toward the first.
230 for i := len(checkpoints) - 2; i >= 0; i-- {
231 if height >= checkpoints[i].Height {
234 nextCheckpoint = &checkpoints[i]
236 return nextCheckpoint
// processBlock wraps block and peerID in a blockMsg and sends it on
// bk.blockProcessCh. It is a plain channel send, so it blocks while the
// channel's buffer is full.
239 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
240 bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
// processBlocks wraps blocks and peerID in a blocksMsg and sends it on
// bk.blocksProcessCh. It is a plain channel send, so it blocks while the
// channel's buffer is full.
243 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
244 bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
// processHeaders wraps headers and peerID in a headersMsg and sends it on
// bk.headersProcessCh. It is a plain channel send, so it blocks while the
// channel's buffer is full.
247 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
248 bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
// regularBlockSync fetches blocks one height at a time from the sync peer,
// starting at the chain's best height + 1, and passes each to chain.ProcessBlock
// until the requested height exceeds wantHeight.
// NOTE(review): the error checks, the handling of isOrphan and the return
// statement are on lines not visible in this excerpt.
251 func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
252 i := bk.chain.BestBlockHeight() + 1
253 for i <= wantHeight {
254 block, err := bk.requireBlock(i)
259 isOrphan, err := bk.chain.ProcessBlock(block)
// The next height is recomputed from the chain's best height instead of being incremented.
268 i = bk.chain.BestBlockHeight() + 1
// requireBlock sends a get-block message for the given height to the sync peer
// and then reads replies from bk.blockProcessCh.
// It returns errPeerDropped when SendMsg reports failure, the received block
// when a message passes the peer and height checks, and errRequestTimeout
// wrapped with "requireBlock" on the remaining path (presumably timer expiry).
// NOTE(review): the loop/select structure, the bodies of the two mismatch
// branches and any timer Stop call are on lines not visible in this excerpt.
273 func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
// The request is made by height; the second argument is an all-zero 32-byte array.
275 msg := struct{ BlockchainMessage }{NewGetBlockMessage(height, [32]byte{})}
276 if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
277 return nil, errPeerDropped
280 timeout := time.NewTimer(syncTimeout)
285 case msg := <-bk.blockProcessCh:
// Message came from a peer other than the current sync peer.
286 if msg.peerID != bk.syncPeerID {
// Message carries a block at a height other than the one requested.
289 if msg.block.Height != height {
292 return msg.block, nil
294 return nil, errors.Wrap(errRequestTimeout, "requireBlock")
// requireBlocks sends a get-blocks message built from locator and stopHash to
// the sync peer and then reads replies from bk.blocksProcessCh.
// It returns errPeerDropped when SendMsg reports failure, the received blocks
// when a message passes the peer check, and errRequestTimeout wrapped with
// "requireBlocks" on the remaining path (presumably timer expiry).
// NOTE(review): the loop/select structure, the body of the peer-mismatch branch
// and any timer Stop call are on lines not visible in this excerpt.
299 func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
300 //send get blocks msg
301 msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
302 if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
303 return nil, errPeerDropped
306 timeout := time.NewTimer(syncTimeout)
311 case msg := <-bk.blocksProcessCh:
// Message came from a peer other than the current sync peer.
312 if msg.peerID != bk.syncPeerID {
315 return msg.blocks, nil
317 return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
// requireHeaders sends a get-headers message built from locator and stopHash to
// the sync peer and then reads replies from bk.headersProcessCh.
// It returns errPeerDropped when SendMsg reports failure, the received headers
// when a message passes the peer check, and errRequestTimeout wrapped with
// "requireHeaders" on the remaining path (presumably timer expiry).
// NOTE(review): the loop/select structure, the body of the peer-mismatch branch
// and any timer Stop call are on lines not visible in this excerpt.
322 func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
323 //send get headers msg
324 msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
325 if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
326 return nil, errPeerDropped
329 timeout := time.NewTimer(syncTimeout)
334 case msg := <-bk.headersProcessCh:
// Message came from a peer other than the current sync peer.
335 if msg.peerID != bk.syncPeerID {
338 return msg.headers, nil
340 return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
345 // resetHeaderState sets the headers-first mode state to values appropriate for
346 // syncing from a new peer.
// In the visible code it reads the chain's best block header and pushes it onto
// bk.headerList only when nextCheckpoint() returns a non-nil checkpoint.
// NOTE(review): one line between the header lookup and the checkpoint test is
// not visible in this excerpt; whether the list is cleared first cannot be
// confirmed from here.
347 func (bk *blockKeeper) resetHeaderState() {
348 header := bk.chain.BestBlockHeader()
350 if bk.nextCheckpoint() != nil {
351 bk.headerList.PushBack(header)
// startSync performs one sync attempt and returns a bool that syncWorker reads
// as "update".
// Fast path: when BestPeerInfo (queried with SFFastSync|SFFullNode) yields a
// peer, a checkpoint is still pending and that peer's height is at or above the
// checkpoint height, the peer becomes the sync peer and fastBlockSync runs.
// Regular path: when BestPeerInfo (queried with SFFullNode) yields a peer whose
// height is above the local best height, it becomes the sync peer and
// regularBlockSync runs toward a bounded target height.
// In both paths a sync error is logged as a warning and passed to peers.ErrorHandler.
// NOTE(review): the return statements, and therefore whether the regular path
// still runs after the fast path, are on lines not visible in this excerpt.
355 func (bk *blockKeeper) startSync() bool {
356 checkPoint := bk.nextCheckpoint()
357 bestPeerID, bestPeerHeight := bk.peers.BestPeerInfo(consensus.SFFastSync | consensus.SFFullNode)
358 if bestPeerID != "" && checkPoint != nil && bestPeerHeight >= checkPoint.Height {
359 bk.syncPeerID = bestPeerID
360 if err := bk.fastBlockSync(checkPoint); err != nil {
361 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
362 bk.peers.ErrorHandler(bestPeerID, err)
368 blockHeight := bk.chain.BestBlockHeight()
369 bestPeerID, bestPeerHeight = bk.peers.BestPeerInfo(consensus.SFFullNode)
370 if bestPeerID != "" && bestPeerHeight > blockHeight {
371 bk.syncPeerID = bestPeerID
// Target is at most maxBlockPerMsg above the local height and never above the peer's height.
372 targetHeight := blockHeight + maxBlockPerMsg
373 if targetHeight > bestPeerHeight {
374 targetHeight = bestPeerHeight
377 if err := bk.regularBlockSync(targetHeight); err != nil {
378 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
379 bk.peers.ErrorHandler(bestPeerID, err)
// syncWorker creates a ticker with period syncCycle (stopped when the function
// returns) and calls startSync. When startSync reports an update, the visible
// code loads the block at the chain's best height, builds a status broadcast
// message from its header and broadcasts it to peers; failures to load the block
// or to broadcast are logged at error level.
// NOTE(review): the loop structure, the body of the !update branch and the end
// of the function are on lines not visible in this excerpt.
387 func (bk *blockKeeper) syncWorker() {
388 syncTicker := time.NewTicker(syncCycle)
389 defer syncTicker.Stop()
393 if update := bk.startSync(); !update {
397 block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
399 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
// NOTE(review): the error returned by newStatusBroadcastMsg is discarded here.
401 headMsg, _ := newStatusBroadcastMsg(&block.BlockHeader, BlockchainChannel)
403 if err = bk.peers.BroadcastMsg(headMsg); err != nil {
404 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")