6 log "github.com/sirupsen/logrus"
8 "github.com/vapor/consensus"
9 "github.com/vapor/errors"
10 "github.com/vapor/netsync/peers"
11 "github.com/vapor/p2p/security"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
// syncCycle is the period of the ticker that syncWorker creates.
17 syncCycle = 5 * time.Second
// syncTimeout is a 30 second duration.
// NOTE(review): its consumer is not visible in this excerpt - confirm where it is applied.
25 syncTimeout = 30 * time.Second
// errRequestTimeout is a package-level error value with the message shown below.
// NOTE(review): the code that returns it is not visible in this excerpt.
27 errRequestTimeout = errors.New("request timeout")
// errPeerDropped is a package-level error value with the message shown below.
// NOTE(review): Go convention is lowercase error strings; this message starts with
// a capital letter. It is runtime text, so it is left unchanged here.
28 errPeerDropped = errors.New("Peer dropped")
// FastSync lists operations that blockKeeper invokes on its fastSync field:
// looking up blocks and headers from a locator, and setting the sync peer.
// NOTE(review): blockKeeper.startSync also calls fastSync.process(); that method
// is presumably declared on a line not visible in this excerpt - confirm.
31 type FastSync interface {
32 locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error)
33 locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error)
35 setSyncPeer(peer *peers.Peer)
// Fetcher lists the operations that blockKeeper invokes on its msgFetcher field:
// handing over a block, a batch of blocks or a batch of headers received from a
// peer (identified by peerID), and requesting the block at a given height from a peer.
38 type Fetcher interface {
39 processBlock(peerID string, block *types.Block)
40 processBlocks(peerID string, blocks []*types.Block)
41 processHeaders(peerID string, headers []*types.BlockHeader)
42 requireBlock(peerID string, height uint64) (*types.Block, error)
// blockMsg is a message struct.
// NOTE(review): its fields are not visible in this excerpt; presumably a single
// block plus the sending peer ID - confirm against the full file.
45 type blockMsg struct {
// blocksMsg is a message struct.
// NOTE(review): its fields are not visible in this excerpt; presumably a batch of
// blocks plus the sending peer ID - confirm against the full file.
50 type blocksMsg struct {
// headersMsg is a message struct that carries a slice of block headers.
// NOTE(review): any further fields are not visible in this excerpt.
55 type headersMsg struct {
56 headers []*types.BlockHeader
// blockKeeper drives block synchronisation with peers. Its methods use the fields
// chain, fastSync, msgFetcher, peers and syncPeer, and the constructor also sets
// a quit channel.
// NOTE(review): the field declarations themselves are not visible in this excerpt.
60 type blockKeeper struct {
// newBlockKeeper builds a blockKeeper for the given chain and peer set.
// It creates one message fetcher from peers and shares it between the keeper
// and the fast-sync component created by newFastSync. The quit channel is
// created unbuffered.
// NOTE(review): other fields of the literal are on lines not visible here.
70 func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
71 msgFetcher := newMsgFetcher(peers)
74 fastSync: newFastSync(chain, msgFetcher, peers),
75 msgFetcher: msgFetcher,
77 quit: make(chan struct{}),
// locateBlocks forwards the locator and stop hash to the fast-sync component
// and returns its result unchanged.
81 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
82 return bk.fastSync.locateBlocks(locator, stopHash)
// locateHeaders forwards the locator, stop hash, skip and maxNum arguments to
// the fast-sync component and returns its result unchanged.
85 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
86 return bk.fastSync.locateHeaders(locator, stopHash, skip, maxNum)
// processBlock hands a block received from the peer peerID to the message fetcher.
89 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
90 bk.msgFetcher.processBlock(peerID, block)
// processBlocks hands a batch of blocks received from the peer peerID to the
// message fetcher.
93 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
94 bk.msgFetcher.processBlocks(peerID, blocks)
// processHeaders hands a batch of headers received from the peer peerID to the
// message fetcher.
97 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
98 bk.msgFetcher.processHeaders(peerID, headers)
// regularBlockSync fetches blocks one height at a time from the current sync
// peer and feeds them to the chain, looping while the height counter i does not
// exceed the height the peer reported when the function started.
// It returns an error; the return statements are on lines not visible here.
// NOTE(review): the initialisation of i and the conditions guarding the two
// ErrorHandler calls are not visible in this excerpt - presumably i starts just
// above bestHeight and the handlers run only when err is non-nil; confirm.
101 func (bk *blockKeeper) regularBlockSync() error {
102 peerHeight := bk.syncPeer.Height()
103 bestHeight := bk.chain.BestBlockHeight()
105 for i <= peerHeight {
// Ask the sync peer for the block at height i.
106 block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
// A fetch problem is reported against the peer at the connection-exception level.
108 bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelConnException, err)
112 isOrphan, err := bk.chain.ProcessBlock(block)
// A processing problem is reported against the peer at the illegal-message level.
114 bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelMsgIllegal, err)
// Continue from the chain's current best height rather than simply incrementing i.
122 i = bk.chain.BestBlockHeight() + 1
124 log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
// start begins the keeper's work.
// NOTE(review): the body is not visible in this excerpt; presumably it launches
// syncWorker in a goroutine - confirm against the full file.
128 func (bk *blockKeeper) start() {
// checkSyncType decides which kind of synchronisation to run and returns it as
// an int code. It first looks for the best irreversible peer that advertises
// both the full-node and fast-sync service flags. If that peer's irreversible
// height is at least minGapStartFastSync above the local best height, the peer
// is handed to the fast-sync component. Otherwise it looks for the best
// full-node peer and returns regularSyncType when that peer is higher than the
// local chain.
// NOTE(review): the nil checks around the two Debug logs, the fast-sync and
// no-sync return values, and any assignment of bk.syncPeer are on lines not
// visible in this excerpt - confirm.
132 func (bk *blockKeeper) checkSyncType() int {
133 peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
135 log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer")
139 bestHeight := bk.chain.BestBlockHeight()
// Fast sync is chosen only when the peer's irreversible height is far enough ahead.
141 if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
142 bk.fastSync.setSyncPeer(peer)
// Fall back to any full node for regular sync.
146 peer = bk.peers.BestPeer(consensus.SFFullNode)
148 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
152 peerHeight := peer.Height()
153 if peerHeight > bestHeight {
155 return regularSyncType
// startSync runs one synchronisation attempt chosen by checkSyncType: either
// the fast-sync process or regularBlockSync. A failure in either path is
// logged as a warning. The bool result is read by syncWorker as an update flag.
// NOTE(review): the case label for the fast-sync branch and all return
// statements are on lines not visible in this excerpt - confirm which outcomes
// yield true.
161 func (bk *blockKeeper) startSync() bool {
162 switch bk.checkSyncType() {
164 if err := bk.fastSync.process(); err != nil {
165 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
168 case regularSyncType:
169 if err := bk.regularBlockSync(); err != nil {
170 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
// stop ends the keeper's work.
// NOTE(review): the body is not visible in this excerpt; presumably it closes
// the quit channel created in newBlockKeeper - confirm against the full file.
178 func (bk *blockKeeper) stop() {
// syncWorker creates a ticker with period syncCycle (stopped when the function
// returns), calls startSync, and broadcasts the local best block header and
// last irreversible header to peers. A broadcast failure is logged as an error.
// NOTE(review): the enclosing loop/select and the body of the not-updated branch
// are on lines not visible in this excerpt; presumably the broadcast is skipped
// when startSync reports no update and the loop exits on the quit channel - confirm.
182 func (bk *blockKeeper) syncWorker() {
183 syncTicker := time.NewTicker(syncCycle)
184 defer syncTicker.Stop()
189 if update := bk.startSync(); !update {
193 if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
194 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")