log "github.com/sirupsen/logrus"
"github.com/vapor/consensus"
- "github.com/vapor/errors"
+ dbm "github.com/vapor/database/leveldb"
"github.com/vapor/netsync/peers"
"github.com/vapor/p2p/security"
"github.com/vapor/protocol/bc"
)
var (
- syncTimeout = 30 * time.Second
-
- errRequestTimeout = errors.New("request timeout")
- errPeerDropped = errors.New("Peer dropped")
+ maxNumOfBlocksPerMsg = uint64(1000)
+ maxNumOfHeadersPerMsg = uint64(1000)
+ maxNumOfBlocksRegularSync = uint64(128)
)
type FastSync interface {
- locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error)
- locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error)
process() error
setSyncPeer(peer *peers.Peer)
}
quit chan struct{}
}
-func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
- msgFetcher := newMsgFetcher(peers)
+func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
+ storage := newStorage(fastSyncDB)
+ msgFetcher := newMsgFetcher(storage, peers)
return &blockKeeper{
chain: chain,
- fastSync: newFastSync(chain, msgFetcher, peers),
+ fastSync: newFastSync(chain, msgFetcher, storage, peers),
msgFetcher: msgFetcher,
peers: peers,
quit: make(chan struct{}),
}
func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
- return bk.fastSync.locateBlocks(locator, stopHash)
+ headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
+ if err != nil {
+ return nil, err
+ }
+
+ blocks := []*types.Block{}
+ for _, header := range headers {
+ headerHash := header.Hash()
+ block, err := bk.chain.GetBlockByHash(&headerHash)
+ if err != nil {
+ return nil, err
+ }
+
+ blocks = append(blocks, block)
+ }
+ return blocks, nil
}
func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
- return bk.fastSync.locateHeaders(locator, stopHash, skip, maxNum)
+ startHeader, err := bk.chain.GetHeaderByHeight(0)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, hash := range locator {
+ header, err := bk.chain.GetHeaderByHash(hash)
+ if err == nil && bk.chain.InMainChain(header.Hash()) {
+ startHeader = header
+ break
+ }
+ }
+
+ headers := make([]*types.BlockHeader, 0)
+ stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
+ if err != nil {
+ return headers, err
+ }
+
+ if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
+ return headers, nil
+ }
+
+ headers = append(headers, startHeader)
+ if stopHeader.Height == startHeader.Height {
+ return headers, nil
+ }
+
+ for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
+ index += skip + 1
+ if index >= stopHeader.Height {
+ headers = append(headers, stopHeader)
+ break
+ }
+
+ header, err := bk.chain.GetHeaderByHeight(index)
+ if err != nil {
+ return nil, err
+ }
+
+ headers = append(headers, header)
+ }
+
+ return headers, nil
}
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
func (bk *blockKeeper) regularBlockSync() error {
peerHeight := bk.syncPeer.Height()
bestHeight := bk.chain.BestBlockHeight()
+ targetHeight := bestHeight + maxNumOfBlocksRegularSync
+ if targetHeight > peerHeight {
+ targetHeight = peerHeight
+ }
+
i := bestHeight + 1
- for i <= peerHeight {
+ for i <= targetHeight {
block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
if err != nil {
- bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelConnException, err)
+ bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
return err
}
isOrphan, err := bk.chain.ProcessBlock(block)
if err != nil {
- bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelMsgIllegal, err)
+ bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
return err
}
}
func (bk *blockKeeper) checkSyncType() int {
- peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
- if peer == nil {
- log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer")
- return noNeedSync
- }
-
bestHeight := bk.chain.BestBlockHeight()
-
- if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
- bk.fastSync.setSyncPeer(peer)
- return fastSyncType
+ peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
+ if peer != nil {
+ if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
+ bk.fastSync.setSyncPeer(peer)
+ return fastSyncType
+ }
}
peer = bk.peers.BestPeer(consensus.SFFullNode)
return noNeedSync
}
- peerHeight := peer.Height()
- if peerHeight > bestHeight {
+ if peer.Height() > bestHeight {
bk.syncPeer = peer
return regularSyncType
}
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
return false
}
+ default:
+ return false
}
return true