OSDN Git Service

modify import path (#1805)
[bytom/bytom.git] / netsync / block_keeper.go
index d0951e5..05b9aec 100644 (file)
 package netsync
 
 import (
-       "strings"
+       "container/list"
        "time"
 
        log "github.com/sirupsen/logrus"
 
-       "github.com/bytom/errors"
-       "github.com/bytom/p2p"
-       "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc/types"
+       "github.com/bytom/bytom/consensus"
+       "github.com/bytom/bytom/errors"
+       "github.com/bytom/bytom/mining/tensority"
+       "github.com/bytom/bytom/p2p/security"
+       "github.com/bytom/bytom/protocol/bc"
+       "github.com/bytom/bytom/protocol/bc/types"
 )
 
const (
	// syncCycle is the interval at which syncWorker attempts a new sync round.
	syncCycle = 5 * time.Second

	// Buffer sizes for the keeper's inbound message channels (single blocks,
	// block batches, and header batches respectively).
	blockProcessChSize   = 1024
	blocksProcessChSize  = 128
	headersProcessChSize = 1024
)
 
var (
	// maxBlockPerMsg bounds how many full blocks are served per request.
	maxBlockPerMsg = uint64(128)
	// maxBlockHeadersPerMsg bounds how many headers are served per request.
	maxBlockHeadersPerMsg = uint64(2048)
	// syncTimeout is how long the require* helpers wait for a matching reply.
	syncTimeout = 30 * time.Second

	// errAppendHeaders is returned when received headers do not chain onto
	// the tail of headerList.
	errAppendHeaders  = errors.New("fail to append list due to order dismatch")
	errRequestTimeout = errors.New("request timeout")
	errPeerDropped    = errors.New("Peer dropped")
	// NOTE(review): errPeerMisbehave and ErrPeerMisbehave are two distinct
	// sentinel values with identical messages, so an identity comparison
	// against one will not match the other. Presumably the exported one is
	// for callers outside this package — consider unifying them
	// (e.g. ErrPeerMisbehave = errPeerMisbehave); verify call sites first.
	errPeerMisbehave = errors.New("peer is misbehave")
	ErrPeerMisbehave = errors.New("peer is misbehave")
)
 
-//TODO: add retry mechanism
// blockMsg couples a single received block with the ID of the peer that sent
// it, for delivery on blockProcessCh.
type blockMsg struct {
	block  *types.Block
	peerID string
}
+
// blocksMsg couples a batch of received blocks with the ID of the peer that
// sent them, for delivery on blocksProcessCh.
type blocksMsg struct {
	blocks []*types.Block
	peerID string
}
+
// headersMsg couples a batch of received block headers with the ID of the
// peer that sent them, for delivery on headersProcessCh.
type headersMsg struct {
	headers []*types.BlockHeader
	peerID  string
}
+
// blockKeeper drives block synchronization: it selects a sync peer, pulls
// headers/blocks from it, and feeds them into the chain.
type blockKeeper struct {
	chain Chain
	peers *peerSet

	// syncPeer is the peer currently being synced from; the require*
	// helpers only accept messages originating from this peer.
	syncPeer         *peer
	blockProcessCh   chan *blockMsg
	blocksProcessCh  chan *blocksMsg
	headersProcessCh chan *headersMsg

	// headerList holds the headers-first chain (*types.BlockHeader values)
	// used during fast checkpoint sync; see resetHeaderState.
	headerList *list.List
}
 
-func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
+func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
        bk := &blockKeeper{
                chain:            chain,
-               sw:               sw,
                peers:            peers,
-               pendingProcessCh: make(chan *blockPending, maxBlocksPending),
-               txsProcessCh:     make(chan *txsNotify, maxtxsPending),
-               quitReqBlockCh:   quitReqBlockCh,
+               blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
+               blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
+               headersProcessCh: make(chan *headersMsg, headersProcessChSize),
+               headerList:       list.New(),
        }
-       go bk.txsProcessWorker()
+       bk.resetHeaderState()
+       go bk.syncWorker()
        return bk
 }
 
-func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
-       bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
+func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
+       for _, header := range headers {
+               prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+               if prevHeader.Hash() != header.PreviousBlockHash {
+                       return errAppendHeaders
+               }
+               bk.headerList.PushBack(header)
+       }
+       return nil
 }
 
-func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
-       bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
-}
+func (bk *blockKeeper) blockLocator() []*bc.Hash {
+       header := bk.chain.BestBlockHeader()
+       locator := []*bc.Hash{}
 
-func (bk *blockKeeper) IsCaughtUp() bool {
-       _, height := bk.peers.BestPeer()
-       return bk.chain.Height() < height
-}
+       step := uint64(1)
+       for header != nil {
+               headerHash := header.Hash()
+               locator = append(locator, &headerHash)
+               if header.Height == 0 {
+                       break
+               }
 
-func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
-       num := bk.chain.Height() + 1
-       orphanNum := uint64(0)
-       reqNum := uint64(0)
-       isOrphan := false
-       for num <= maxPeerHeight && num > 0 {
-               if isOrphan {
-                       reqNum = orphanNum
+               var err error
+               if header.Height < step {
+                       header, err = bk.chain.GetHeaderByHeight(0)
                } else {
-                       reqNum = num
-               }
-               block, err := bk.BlockRequest(peerID, reqNum)
-               if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
-                       log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
-                       peer, ok := bk.peers.peers[peerID]
-                       if !ok {
-                               return errNotRegistered
-                       }
-                       bk.sw.StopPeerGracefully(peer.Peer)
-                       return errCommAbnorm
+                       header, err = bk.chain.GetHeaderByHeight(header.Height - step)
                }
-               isOrphan, err = bk.chain.ProcessBlock(block)
                if err != nil {
-                       bk.sw.AddScamPeer(bk.peers.Peer(peerID).getPeer())
-                       log.WithField("hash: ", block.Hash()).Errorf("blockKeeper fail process block %v", err)
-                       return errScamPeer
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
+                       break
                }
-               if isOrphan {
-                       orphanNum = block.Height - 1
-                       continue
+
+               if len(locator) >= 9 {
+                       step *= 2
+               }
+       }
+       return locator
+}
+
// fastBlockSync performs headers-first sync up to a configured checkpoint:
// it first downloads and chain-links headers from the current best header up
// to the checkpoint hash, then downloads the corresponding blocks and
// processes them. Before each ProcessBlock it seeds the tensority AIHash
// cache with a zero hash for the block — presumably so the expensive PoW
// computation is skipped, which is acceptable only because the block was
// matched against the checkpoint-verified header chain (verify against
// the chain's validation logic).
func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
	bk.resetHeaderState()
	lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
	for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
		// Reaching checkpoint height without hitting its hash means the
		// sync peer is on a branch that does not contain the checkpoint.
		if lastHeader.Height >= checkPoint.Height {
			return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
		}

		lastHash := lastHeader.Hash()
		headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(headers) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
		}

		// appendHeaderList verifies each header links to the previous one.
		if err := bk.appendHeaderList(headers); err != nil {
			return err
		}
	}

	// fastHeader walks the verified header list in lock-step with the blocks
	// received below; the Front() element is the header we already have.
	fastHeader := bk.headerList.Front()
	for bk.chain.BestBlockHeight() < checkPoint.Height {
		locator := bk.blockLocator()
		blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(blocks) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
		}

		for _, block := range blocks {
			if fastHeader = fastHeader.Next(); fastHeader == nil {
				return errors.New("get block than is higher than checkpoint")
			}

			// Each received block must match the pre-verified header chain.
			blockHash := block.Hash()
			if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
				return errPeerMisbehave
			}

			seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync calculate seed")
			}

			// Seed the PoW cache, process, then drop the fake entry
			// immediately — the cache entry must not outlive this block.
			tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
			_, err = bk.chain.ProcessBlock(block)
			tensority.AIHash.RemoveCache(&blockHash, seed)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync process block")
			}
		}
	}
	return nil
}
 
-func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
-       return bk.peers.requestBlockByHeight(peerID, height)
+func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+       headers, err := bk.locateHeaders(locator, stopHash)
+       if err != nil {
+               return nil, err
+       }
+
+       blocks := []*types.Block{}
+       for i, header := range headers {
+               if uint64(i) >= maxBlockPerMsg {
+                       break
+               }
+
+               headerHash := header.Hash()
+               block, err := bk.chain.GetBlockByHash(&headerHash)
+               if err != nil {
+                       return nil, err
+               }
+
+               blocks = append(blocks, block)
+       }
+       return blocks, nil
+}
+
+func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+       stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
+       if err != nil {
+               return nil, err
+       }
+
+       startHeader, err := bk.chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
+
+       for _, hash := range locator {
+               header, err := bk.chain.GetHeaderByHash(hash)
+               if err == nil && bk.chain.InMainChain(header.Hash()) {
+                       startHeader = header
+                       break
+               }
+       }
+
+       totalHeaders := stopHeader.Height - startHeader.Height
+       if totalHeaders > maxBlockHeadersPerMsg {
+               totalHeaders = maxBlockHeadersPerMsg
+       }
+
+       headers := []*types.BlockHeader{}
+       for i := uint64(1); i <= totalHeaders; i++ {
+               header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
+               if err != nil {
+                       return nil, err
+               }
+
+               headers = append(headers, header)
+       }
+       return headers, nil
+}
+
+func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
+       height := bk.chain.BestBlockHeader().Height
+       checkpoints := consensus.ActiveNetParams.Checkpoints
+       if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
+               return nil
+       }
+
+       nextCheckpoint := &checkpoints[len(checkpoints)-1]
+       for i := len(checkpoints) - 2; i >= 0; i-- {
+               if height >= checkpoints[i].Height {
+                       break
+               }
+               nextCheckpoint = &checkpoints[i]
+       }
+       return nextCheckpoint
+}
+
+func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
+       bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
 }
 
-func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
-       var block *types.Block
+func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
+       bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
+}
+
+func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
+       bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
+}
+
// regularBlockSync downloads blocks one-by-one from the sync peer until the
// local best height reaches wantHeight. When a received block is an orphan,
// the requested height is decremented so the missing ancestor is fetched
// first.
func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
	i := bk.chain.BestBlockHeight() + 1
	for i <= wantHeight {
		block, err := bk.requireBlock(i)
		if err != nil {
			return err
		}

		isOrphan, err := bk.chain.ProcessBlock(block)
		if err != nil {
			return err
		}

		if isOrphan {
			// Walk back one height to fetch the missing parent.
			// NOTE(review): a peer that keeps answering with orphans drives
			// this backtrack repeatedly; presumably bounded by requireBlock's
			// timeout and the peer punishment in startSync — verify.
			i--
			continue
		}
		// Block accepted on the main chain: resume from the new best height.
		i = bk.chain.BestBlockHeight() + 1
	}
	return nil
}
+
+func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
+       if ok := bk.syncPeer.getBlockByHeight(height); !ok {
+               return nil, errPeerDropped
+       }
+
+       timeout := time.NewTimer(syncTimeout)
+       defer timeout.Stop()
 
        for {
                select {
-               case pendingResponse := <-bk.pendingProcessCh:
-                       block = pendingResponse.block
-                       if strings.Compare(pendingResponse.peerID, peerID) != 0 {
-                               log.Warning("From different peer")
+               case msg := <-bk.blockProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
-                       if block.Height != height {
-                               log.Warning("Block height error")
+                       if msg.block.Height != height {
                                continue
                        }
-                       return block, nil
-               case <-retryTicker:
-                       if err := bk.blockRequest(peerID, height); err != nil {
-                               return nil, errReqBlock
+                       return msg.block, nil
+               case <-timeout.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireBlock")
+               }
+       }
+}
+
+func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+       if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
+               return nil, errPeerDropped
+       }
+
+       timeout := time.NewTimer(syncTimeout)
+       defer timeout.Stop()
+
+       for {
+               select {
+               case msg := <-bk.blocksProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
+                               continue
                        }
-               case <-syncWait.C:
-                       log.Warning("Request block timeout")
-                       return nil, errGetBlockTimeout
-               case peerid := <-bk.quitReqBlockCh:
-                       if strings.Compare(*peerid, peerID) == 0 {
-                               log.Info("Quite block request worker")
-                               return nil, errPeerDropped
+                       return msg.blocks, nil
+               case <-timeout.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
+               }
+       }
+}
+
+func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+       if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
+               return nil, errPeerDropped
+       }
+
+       timeout := time.NewTimer(syncTimeout)
+       defer timeout.Stop()
+
+       for {
+               select {
+               case msg := <-bk.headersProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
+                               continue
                        }
+                       return msg.headers, nil
+               case <-timeout.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
+               }
+       }
+}
+
+// resetHeaderState sets the headers-first mode state to values appropriate for
+// syncing from a new peer.
+func (bk *blockKeeper) resetHeaderState() {
+       header := bk.chain.BestBlockHeader()
+       bk.headerList.Init()
+       if bk.nextCheckpoint() != nil {
+               bk.headerList.PushBack(header)
+       }
+}
+
+func (bk *blockKeeper) startSync() bool {
+       checkPoint := bk.nextCheckpoint()
+       peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
+       if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
+               bk.syncPeer = peer
+               if err := bk.fastBlockSync(checkPoint); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
+                       bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
+                       return false
                }
+               return true
        }
+
+       blockHeight := bk.chain.BestBlockHeight()
+       peer = bk.peers.bestPeer(consensus.SFFullNode)
+       if peer != nil && peer.Height() > blockHeight {
+               bk.syncPeer = peer
+               targetHeight := blockHeight + maxBlockPerMsg
+               if targetHeight > peer.Height() {
+                       targetHeight = peer.Height()
+               }
+
+               if err := bk.regularBlockSync(targetHeight); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
+                       bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
+                       return false
+               }
+               return true
+       }
+       return false
 }
 
-func (bk *blockKeeper) txsProcessWorker() {
-       for txsResponse := range bk.txsProcessCh {
-               tx := txsResponse.tx
-               log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
-               bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
-               if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
-                       bk.sw.AddScamPeer(bk.peers.Peer(txsResponse.peerID).getPeer())
+func (bk *blockKeeper) syncWorker() {
+       genesisBlock, err := bk.chain.GetBlockByHeight(0)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
+               return
+       }
+       syncTicker := time.NewTicker(syncCycle)
+       defer syncTicker.Stop()
+
+       for {
+               <-syncTicker.C
+               if update := bk.startSync(); !update {
+                       continue
+               }
+
+               block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
+               }
+
+               if err := bk.peers.broadcastMinedBlock(block); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new block")
+               }
+
+               if err = bk.peers.broadcastNewStatus(block, genesisBlock); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
                }
        }
 }