OSDN Git Service

netsync add test case (#365)
[bytom/vapor.git] / netsync / chainmgr / block_keeper.go
index 24b4ef3..80c1277 100644 (file)
@@ -6,7 +6,7 @@ import (
        log "github.com/sirupsen/logrus"
 
        "github.com/vapor/consensus"
-       "github.com/vapor/errors"
+       dbm "github.com/vapor/database/leveldb"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p/security"
        "github.com/vapor/protocol/bc"
@@ -22,15 +22,12 @@ const (
 )
 
 var (
-       syncTimeout = 30 * time.Second
-
-       errRequestTimeout = errors.New("request timeout")
-       errPeerDropped    = errors.New("Peer dropped")
+       maxNumOfBlocksPerMsg      = uint64(1000)
+       maxNumOfHeadersPerMsg     = uint64(1000)
+       maxNumOfBlocksRegularSync = uint64(128)
 )
 
 type FastSync interface {
-       locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error)
-       locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error)
        process() error
        setSyncPeer(peer *peers.Peer)
 }
@@ -67,11 +64,12 @@ type blockKeeper struct {
        quit chan struct{}
 }
 
-func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
-       msgFetcher := newMsgFetcher(peers)
+func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
+       storage := newStorage(fastSyncDB)
+       msgFetcher := newMsgFetcher(storage, peers)
        return &blockKeeper{
                chain:      chain,
-               fastSync:   newFastSync(chain, msgFetcher, peers),
+               fastSync:   newFastSync(chain, msgFetcher, storage, peers),
                msgFetcher: msgFetcher,
                peers:      peers,
                quit:       make(chan struct{}),
@@ -79,11 +77,69 @@ func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
 }
 
 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
-       return bk.fastSync.locateBlocks(locator, stopHash)
+       headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
+       if err != nil {
+               return nil, err
+       }
+
+       blocks := []*types.Block{}
+       for _, header := range headers {
+               headerHash := header.Hash()
+               block, err := bk.chain.GetBlockByHash(&headerHash)
+               if err != nil {
+                       return nil, err
+               }
+
+               blocks = append(blocks, block)
+       }
+       return blocks, nil
 }
 
 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
-       return bk.fastSync.locateHeaders(locator, stopHash, skip, maxNum)
+       startHeader, err := bk.chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
+
+       for _, hash := range locator {
+               header, err := bk.chain.GetHeaderByHash(hash)
+               if err == nil && bk.chain.InMainChain(header.Hash()) {
+                       startHeader = header
+                       break
+               }
+       }
+
+       headers := make([]*types.BlockHeader, 0)
+       stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
+       if err != nil {
+               return headers, err
+       }
+
+       if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
+               return headers, nil
+       }
+
+       headers = append(headers, startHeader)
+       if stopHeader.Height == startHeader.Height {
+               return headers, nil
+       }
+
+       for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
+               index += skip + 1
+               if index >= stopHeader.Height {
+                       headers = append(headers, stopHeader)
+                       break
+               }
+
+               header, err := bk.chain.GetHeaderByHeight(index)
+               if err != nil {
+                       return nil, err
+               }
+
+               headers = append(headers, header)
+       }
+
+       return headers, nil
 }
 
 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
@@ -101,17 +157,22 @@ func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeade
 func (bk *blockKeeper) regularBlockSync() error {
        peerHeight := bk.syncPeer.Height()
        bestHeight := bk.chain.BestBlockHeight()
+       targetHeight := bestHeight + maxNumOfBlocksRegularSync
+       if targetHeight > peerHeight {
+               targetHeight = peerHeight
+       }
+
        i := bestHeight + 1
-       for i <= peerHeight {
+       for i <= targetHeight {
                block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
                if err != nil {
-                       bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelConnException, err)
+                       bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
                        return err
                }
 
                isOrphan, err := bk.chain.ProcessBlock(block)
                if err != nil {
-                       bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelMsgIllegal, err)
+                       bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
                        return err
                }
 
@@ -130,17 +191,13 @@ func (bk *blockKeeper) start() {
 }
 
 func (bk *blockKeeper) checkSyncType() int {
-       peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
-       if peer == nil {
-               log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer")
-               return noNeedSync
-       }
-
        bestHeight := bk.chain.BestBlockHeight()
-
-       if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
-               bk.fastSync.setSyncPeer(peer)
-               return fastSyncType
+       peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
+       if peer != nil {
+               if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
+                       bk.fastSync.setSyncPeer(peer)
+                       return fastSyncType
+               }
        }
 
        peer = bk.peers.BestPeer(consensus.SFFullNode)
@@ -149,8 +206,7 @@ func (bk *blockKeeper) checkSyncType() int {
                return noNeedSync
        }
 
-       peerHeight := peer.Height()
-       if peerHeight > bestHeight {
+       if peer.Height() > bestHeight {
                bk.syncPeer = peer
                return regularSyncType
        }
@@ -170,6 +226,8 @@ func (bk *blockKeeper) startSync() bool {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
                        return false
                }
+       default:
+               return false
        }
 
        return true