OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / netsync / chainmgr / block_keeper.go
diff --git a/netsync/chainmgr/block_keeper.go b/netsync/chainmgr/block_keeper.go
new file mode 100644 (file)
index 0000000..112fd50
--- /dev/null
@@ -0,0 +1,401 @@
+package chainmgr
+
+import (
+       "container/list"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/consensus"
+       "github.com/vapor/errors"
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/p2p/security"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+const (
+       syncCycle            = 5 * time.Second
+       blockProcessChSize   = 1024
+       blocksProcessChSize  = 128
+       headersProcessChSize = 1024
+)
+
+var (
+       maxBlockPerMsg        = uint64(128)
+       maxBlockHeadersPerMsg = uint64(2048)
+       syncTimeout           = 30 * time.Second
+
+       errAppendHeaders  = errors.New("fail to append list due to order dismatch")
+       errRequestTimeout = errors.New("request timeout")
+       errPeerDropped    = errors.New("Peer dropped")
+)
+
+type blockMsg struct {
+       block  *types.Block
+       peerID string
+}
+
+type blocksMsg struct {
+       blocks []*types.Block
+       peerID string
+}
+
+type headersMsg struct {
+       headers []*types.BlockHeader
+       peerID  string
+}
+
+type blockKeeper struct {
+       chain Chain
+       peers *peers.PeerSet
+
+       syncPeer         *peers.Peer
+       blockProcessCh   chan *blockMsg
+       blocksProcessCh  chan *blocksMsg
+       headersProcessCh chan *headersMsg
+
+       headerList *list.List
+}
+
+func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
+       bk := &blockKeeper{
+               chain:            chain,
+               peers:            peers,
+               blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
+               blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
+               headersProcessCh: make(chan *headersMsg, headersProcessChSize),
+               headerList:       list.New(),
+       }
+       bk.resetHeaderState()
+       go bk.syncWorker()
+       return bk
+}
+
+func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
+       for _, header := range headers {
+               prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+               if prevHeader.Hash() != header.PreviousBlockHash {
+                       return errAppendHeaders
+               }
+               bk.headerList.PushBack(header)
+       }
+       return nil
+}
+
+func (bk *blockKeeper) blockLocator() []*bc.Hash {
+       header := bk.chain.BestBlockHeader()
+       locator := []*bc.Hash{}
+
+       step := uint64(1)
+       for header != nil {
+               headerHash := header.Hash()
+               locator = append(locator, &headerHash)
+               if header.Height == 0 {
+                       break
+               }
+
+               var err error
+               if header.Height < step {
+                       header, err = bk.chain.GetHeaderByHeight(0)
+               } else {
+                       header, err = bk.chain.GetHeaderByHeight(header.Height - step)
+               }
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
+                       break
+               }
+
+               if len(locator) >= 9 {
+                       step *= 2
+               }
+       }
+       return locator
+}
+
+func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
+       bk.resetHeaderState()
+       lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+       for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
+               if lastHeader.Height >= checkPoint.Height {
+                       return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
+               }
+
+               lastHash := lastHeader.Hash()
+               headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
+               if err != nil {
+                       return err
+               }
+
+               if len(headers) == 0 {
+                       return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list")
+               }
+
+               if err := bk.appendHeaderList(headers); err != nil {
+                       return err
+               }
+       }
+
+       fastHeader := bk.headerList.Front()
+       for bk.chain.BestBlockHeight() < checkPoint.Height {
+               locator := bk.blockLocator()
+               blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
+               if err != nil {
+                       return err
+               }
+
+               if len(blocks) == 0 {
+                       return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list")
+               }
+
+               for _, block := range blocks {
+                       if fastHeader = fastHeader.Next(); fastHeader == nil {
+                               return errors.New("get block than is higher than checkpoint")
+                       }
+
+                       if _, err = bk.chain.ProcessBlock(block); err != nil {
+                               return errors.Wrap(err, "fail on fastBlockSync process block")
+                       }
+               }
+       }
+       return nil
+}
+
+func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+       headers, err := bk.locateHeaders(locator, stopHash)
+       if err != nil {
+               return nil, err
+       }
+
+       blocks := []*types.Block{}
+       for i, header := range headers {
+               if uint64(i) >= maxBlockPerMsg {
+                       break
+               }
+
+               headerHash := header.Hash()
+               block, err := bk.chain.GetBlockByHash(&headerHash)
+               if err != nil {
+                       return nil, err
+               }
+
+               blocks = append(blocks, block)
+       }
+       return blocks, nil
+}
+
+func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+       stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
+       if err != nil {
+               return nil, err
+       }
+
+       startHeader, err := bk.chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
+
+       for _, hash := range locator {
+               header, err := bk.chain.GetHeaderByHash(hash)
+               if err == nil && bk.chain.InMainChain(header.Hash()) {
+                       startHeader = header
+                       break
+               }
+       }
+
+       totalHeaders := stopHeader.Height - startHeader.Height
+       if totalHeaders > maxBlockHeadersPerMsg {
+               totalHeaders = maxBlockHeadersPerMsg
+       }
+
+       headers := []*types.BlockHeader{}
+       for i := uint64(1); i <= totalHeaders; i++ {
+               header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
+               if err != nil {
+                       return nil, err
+               }
+
+               headers = append(headers, header)
+       }
+       return headers, nil
+}
+
+func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
+       height := bk.chain.BestBlockHeader().Height
+       checkpoints := consensus.ActiveNetParams.Checkpoints
+       if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
+               return nil
+       }
+
+       nextCheckpoint := &checkpoints[len(checkpoints)-1]
+       for i := len(checkpoints) - 2; i >= 0; i-- {
+               if height >= checkpoints[i].Height {
+                       break
+               }
+               nextCheckpoint = &checkpoints[i]
+       }
+       return nextCheckpoint
+}
+
+func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
+       bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
+}
+
+func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
+       bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
+}
+
+func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
+       bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
+}
+
+func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
+       i := bk.chain.BestBlockHeight() + 1
+       for i <= wantHeight {
+               block, err := bk.requireBlock(i)
+               if err != nil {
+                       return err
+               }
+
+               isOrphan, err := bk.chain.ProcessBlock(block)
+               if err != nil {
+                       return err
+               }
+
+               if isOrphan {
+                       i--
+                       continue
+               }
+               i = bk.chain.BestBlockHeight() + 1
+       }
+       return nil
+}
+
+func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
+       if ok := bk.syncPeer.GetBlockByHeight(height); !ok {
+               return nil, errPeerDropped
+       }
+
+       timeout := time.NewTimer(syncTimeout)
+       defer timeout.Stop()
+
+       for {
+               select {
+               case msg := <-bk.blockProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
+                               continue
+                       }
+                       if msg.block.Height != height {
+                               continue
+                       }
+                       return msg.block, nil
+               case <-timeout.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireBlock")
+               }
+       }
+}
+
+func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+       if ok := bk.syncPeer.GetBlocks(locator, stopHash); !ok {
+               return nil, errPeerDropped
+       }
+
+       timeout := time.NewTimer(syncTimeout)
+       defer timeout.Stop()
+
+       for {
+               select {
+               case msg := <-bk.blocksProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
+                               continue
+                       }
+                       return msg.blocks, nil
+               case <-timeout.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
+               }
+       }
+}
+
+func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+       if ok := bk.syncPeer.GetHeaders(locator, stopHash); !ok {
+               return nil, errPeerDropped
+       }
+
+       timeout := time.NewTimer(syncTimeout)
+       defer timeout.Stop()
+
+       for {
+               select {
+               case msg := <-bk.headersProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
+                               continue
+                       }
+                       return msg.headers, nil
+               case <-timeout.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
+               }
+       }
+}
+
+// resetHeaderState sets the headers-first mode state to values appropriate for
+// syncing from a new peer.
+func (bk *blockKeeper) resetHeaderState() {
+       header := bk.chain.BestBlockHeader()
+       bk.headerList.Init()
+       if bk.nextCheckpoint() != nil {
+               bk.headerList.PushBack(header)
+       }
+}
+
+func (bk *blockKeeper) startSync() bool {
+       checkPoint := bk.nextCheckpoint()
+       peer := bk.peers.BestPeer(consensus.SFFastSync | consensus.SFFullNode)
+       if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
+               bk.syncPeer = peer
+               if err := bk.fastBlockSync(checkPoint); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
+                       bk.peers.ErrorHandler(peer.ID(), security.LevelMsgIllegal, err)
+                       return false
+               }
+               return true
+       }
+
+       blockHeight := bk.chain.BestBlockHeight()
+       peer = bk.peers.BestPeer(consensus.SFFullNode)
+       if peer != nil && peer.Height() > blockHeight {
+               bk.syncPeer = peer
+               targetHeight := blockHeight + maxBlockPerMsg
+               if targetHeight > peer.Height() {
+                       targetHeight = peer.Height()
+               }
+
+               if err := bk.regularBlockSync(targetHeight); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
+                       bk.peers.ErrorHandler(peer.ID(),security.LevelMsgIllegal, err)
+                       return false
+               }
+               return true
+       }
+       return false
+}
+
+func (bk *blockKeeper) syncWorker() {
+       syncTicker := time.NewTicker(syncCycle)
+       defer syncTicker.Stop()
+
+       for {
+               <-syncTicker.C
+               if update := bk.startSync(); !update {
+                       continue
+               }
+
+               block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
+               }
+
+               if err = bk.peers.BroadcastNewStatus(block); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
+               }
+       }
+}