package netsync
import (
- "strings"
+ "container/list"
"time"
log "github.com/sirupsen/logrus"
- "github.com/bytom/errors"
- "github.com/bytom/p2p"
- "github.com/bytom/protocol"
- "github.com/bytom/protocol/bc/types"
+ "github.com/bytom/bytom/consensus"
+ "github.com/bytom/bytom/errors"
+ "github.com/bytom/bytom/mining/tensority"
+ "github.com/bytom/bytom/p2p/security"
+ "github.com/bytom/bytom/protocol/bc"
+ "github.com/bytom/bytom/protocol/bc/types"
)
const (
- maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
- maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
-
- syncTimeout = 30 * time.Second
- requestRetryTicker = 15 * time.Second
-
- maxBlocksPending = 1024
- maxtxsPending = 32768
- maxQuitReq = 256
+ syncCycle = 5 * time.Second
+ blockProcessChSize = 1024
+ blocksProcessChSize = 128
+ headersProcessChSize = 1024
)
var (
- errGetBlockTimeout = errors.New("Get block Timeout")
- errPeerDropped = errors.New("Peer dropped")
- errCommAbnorm = errors.New("Peer communication abnormality")
- errScamPeer = errors.New("Scam peer")
- errReqBlock = errors.New("Request block error")
+ maxBlockPerMsg = uint64(128)
+ maxBlockHeadersPerMsg = uint64(2048)
+ syncTimeout = 30 * time.Second
+
+ errAppendHeaders = errors.New("fail to append list due to order dismatch")
+ errRequestTimeout = errors.New("request timeout")
+ errPeerDropped = errors.New("Peer dropped")
+ errPeerMisbehave = errors.New("peer is misbehave")
+ ErrPeerMisbehave = errors.New("peer is misbehave")
)
-//TODO: add retry mechanism
+type blockMsg struct {
+ block *types.Block
+ peerID string
+}
+
+type blocksMsg struct {
+ blocks []*types.Block
+ peerID string
+}
+
+type headersMsg struct {
+ headers []*types.BlockHeader
+ peerID string
+}
+
type blockKeeper struct {
- chain *protocol.Chain
- sw *p2p.Switch
+ chain Chain
peers *peerSet
- pendingProcessCh chan *blockPending
- txsProcessCh chan *txsNotify
- quitReqBlockCh chan *string
+ syncPeer *peer
+ blockProcessCh chan *blockMsg
+ blocksProcessCh chan *blocksMsg
+ headersProcessCh chan *headersMsg
+
+ headerList *list.List
}
-func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
+func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
bk := &blockKeeper{
chain: chain,
- sw: sw,
peers: peers,
- pendingProcessCh: make(chan *blockPending, maxBlocksPending),
- txsProcessCh: make(chan *txsNotify, maxtxsPending),
- quitReqBlockCh: quitReqBlockCh,
+ blockProcessCh: make(chan *blockMsg, blockProcessChSize),
+ blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
+ headersProcessCh: make(chan *headersMsg, headersProcessChSize),
+ headerList: list.New(),
}
- go bk.txsProcessWorker()
+ bk.resetHeaderState()
+ go bk.syncWorker()
return bk
}
-func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
- bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
+func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
+ for _, header := range headers {
+ prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+ if prevHeader.Hash() != header.PreviousBlockHash {
+ return errAppendHeaders
+ }
+ bk.headerList.PushBack(header)
+ }
+ return nil
}
-func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
- bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
-}
+func (bk *blockKeeper) blockLocator() []*bc.Hash {
+ header := bk.chain.BestBlockHeader()
+ locator := []*bc.Hash{}
-func (bk *blockKeeper) IsCaughtUp() bool {
- _, height := bk.peers.BestPeer()
- return bk.chain.Height() < height
-}
+ step := uint64(1)
+ for header != nil {
+ headerHash := header.Hash()
+ locator = append(locator, &headerHash)
+ if header.Height == 0 {
+ break
+ }
-func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
- num := bk.chain.Height() + 1
- orphanNum := uint64(0)
- reqNum := uint64(0)
- isOrphan := false
- for num <= maxPeerHeight && num > 0 {
- if isOrphan {
- reqNum = orphanNum
+ var err error
+ if header.Height < step {
+ header, err = bk.chain.GetHeaderByHeight(0)
} else {
- reqNum = num
- }
- block, err := bk.BlockRequest(peerID, reqNum)
- if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
- log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
- peer, ok := bk.peers.peers[peerID]
- if !ok {
- return errNotRegistered
- }
- bk.sw.StopPeerGracefully(peer.Peer)
- return errCommAbnorm
+ header, err = bk.chain.GetHeaderByHeight(header.Height - step)
}
- isOrphan, err = bk.chain.ProcessBlock(block)
if err != nil {
- bk.sw.AddScamPeer(bk.peers.Peer(peerID).getPeer())
- log.WithField("hash: ", block.Hash()).Errorf("blockKeeper fail process block %v", err)
- return errScamPeer
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
+ break
}
- if isOrphan {
- orphanNum = block.Height - 1
- continue
+
+ if len(locator) >= 9 {
+ step *= 2
+ }
+ }
+ return locator
+}
+
+func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
+ bk.resetHeaderState()
+ lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+ for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
+ if lastHeader.Height >= checkPoint.Height {
+ return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
+ }
+
+ lastHash := lastHeader.Hash()
+ headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
+ if err != nil {
+ return err
+ }
+
+ if len(headers) == 0 {
+ return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
+ }
+
+ if err := bk.appendHeaderList(headers); err != nil {
+ return err
+ }
+ }
+
+ fastHeader := bk.headerList.Front()
+ for bk.chain.BestBlockHeight() < checkPoint.Height {
+ locator := bk.blockLocator()
+ blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
+ if err != nil {
+ return err
+ }
+
+ if len(blocks) == 0 {
+ return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
+ }
+
+ for _, block := range blocks {
+ if fastHeader = fastHeader.Next(); fastHeader == nil {
+ return errors.New("get block than is higher than checkpoint")
+ }
+
+ blockHash := block.Hash()
+ if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
+ return errPeerMisbehave
+ }
+
+ seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
+ if err != nil {
+ return errors.Wrap(err, "fail on fastBlockSync calculate seed")
+ }
+
+ tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
+ _, err = bk.chain.ProcessBlock(block)
+ tensority.AIHash.RemoveCache(&blockHash, seed)
+ if err != nil {
+ return errors.Wrap(err, "fail on fastBlockSync process block")
+ }
}
- num++
}
return nil
}
-func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
- return bk.peers.requestBlockByHeight(peerID, height)
+func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+ headers, err := bk.locateHeaders(locator, stopHash)
+ if err != nil {
+ return nil, err
+ }
+
+ blocks := []*types.Block{}
+ for i, header := range headers {
+ if uint64(i) >= maxBlockPerMsg {
+ break
+ }
+
+ headerHash := header.Hash()
+ block, err := bk.chain.GetBlockByHash(&headerHash)
+ if err != nil {
+ return nil, err
+ }
+
+ blocks = append(blocks, block)
+ }
+ return blocks, nil
+}
+
+func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+ stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
+ if err != nil {
+ return nil, err
+ }
+
+ startHeader, err := bk.chain.GetHeaderByHeight(0)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, hash := range locator {
+ header, err := bk.chain.GetHeaderByHash(hash)
+ if err == nil && bk.chain.InMainChain(header.Hash()) {
+ startHeader = header
+ break
+ }
+ }
+
+ totalHeaders := stopHeader.Height - startHeader.Height
+ if totalHeaders > maxBlockHeadersPerMsg {
+ totalHeaders = maxBlockHeadersPerMsg
+ }
+
+ headers := []*types.BlockHeader{}
+ for i := uint64(1); i <= totalHeaders; i++ {
+ header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
+ if err != nil {
+ return nil, err
+ }
+
+ headers = append(headers, header)
+ }
+ return headers, nil
+}
+
+func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
+ height := bk.chain.BestBlockHeader().Height
+ checkpoints := consensus.ActiveNetParams.Checkpoints
+ if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
+ return nil
+ }
+
+ nextCheckpoint := &checkpoints[len(checkpoints)-1]
+ for i := len(checkpoints) - 2; i >= 0; i-- {
+ if height >= checkpoints[i].Height {
+ break
+ }
+ nextCheckpoint = &checkpoints[i]
+ }
+ return nextCheckpoint
+}
+
+func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
+ bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}
-func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
- var block *types.Block
+func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
+ bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
+}
+
+func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
+ bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
+}
+
+func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
+ i := bk.chain.BestBlockHeight() + 1
+ for i <= wantHeight {
+ block, err := bk.requireBlock(i)
+ if err != nil {
+ return err
+ }
+
+ isOrphan, err := bk.chain.ProcessBlock(block)
+ if err != nil {
+ return err
+ }
- if err := bk.blockRequest(peerID, height); err != nil {
- return nil, errReqBlock
+ if isOrphan {
+ i--
+ continue
+ }
+ i = bk.chain.BestBlockHeight() + 1
}
- retryTicker := time.Tick(requestRetryTicker)
- syncWait := time.NewTimer(syncTimeout)
+ return nil
+}
+
+func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
+ if ok := bk.syncPeer.getBlockByHeight(height); !ok {
+ return nil, errPeerDropped
+ }
+
+ timeout := time.NewTimer(syncTimeout)
+ defer timeout.Stop()
for {
select {
- case pendingResponse := <-bk.pendingProcessCh:
- block = pendingResponse.block
- if strings.Compare(pendingResponse.peerID, peerID) != 0 {
- log.Warning("From different peer")
+ case msg := <-bk.blockProcessCh:
+ if msg.peerID != bk.syncPeer.ID() {
continue
}
- if block.Height != height {
- log.Warning("Block height error")
+ if msg.block.Height != height {
continue
}
- return block, nil
- case <-retryTicker:
- if err := bk.blockRequest(peerID, height); err != nil {
- return nil, errReqBlock
+ return msg.block, nil
+ case <-timeout.C:
+ return nil, errors.Wrap(errRequestTimeout, "requireBlock")
+ }
+ }
+}
+
+func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+ if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
+ return nil, errPeerDropped
+ }
+
+ timeout := time.NewTimer(syncTimeout)
+ defer timeout.Stop()
+
+ for {
+ select {
+ case msg := <-bk.blocksProcessCh:
+ if msg.peerID != bk.syncPeer.ID() {
+ continue
}
- case <-syncWait.C:
- log.Warning("Request block timeout")
- return nil, errGetBlockTimeout
- case peerid := <-bk.quitReqBlockCh:
- if strings.Compare(*peerid, peerID) == 0 {
- log.Info("Quite block request worker")
- return nil, errPeerDropped
+ return msg.blocks, nil
+ case <-timeout.C:
+ return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
+ }
+ }
+}
+
+func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+ if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
+ return nil, errPeerDropped
+ }
+
+ timeout := time.NewTimer(syncTimeout)
+ defer timeout.Stop()
+
+ for {
+ select {
+ case msg := <-bk.headersProcessCh:
+ if msg.peerID != bk.syncPeer.ID() {
+ continue
}
+ return msg.headers, nil
+ case <-timeout.C:
+ return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
+ }
+ }
+}
+
+// resetHeaderState sets the headers-first mode state to values appropriate for
+// syncing from a new peer.
+func (bk *blockKeeper) resetHeaderState() {
+ header := bk.chain.BestBlockHeader()
+ bk.headerList.Init()
+ if bk.nextCheckpoint() != nil {
+ bk.headerList.PushBack(header)
+ }
+}
+
+func (bk *blockKeeper) startSync() bool {
+ checkPoint := bk.nextCheckpoint()
+ peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
+ if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
+ bk.syncPeer = peer
+ if err := bk.fastBlockSync(checkPoint); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
+ bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
+ return false
}
+ return true
}
+
+ blockHeight := bk.chain.BestBlockHeight()
+ peer = bk.peers.bestPeer(consensus.SFFullNode)
+ if peer != nil && peer.Height() > blockHeight {
+ bk.syncPeer = peer
+ targetHeight := blockHeight + maxBlockPerMsg
+ if targetHeight > peer.Height() {
+ targetHeight = peer.Height()
+ }
+
+ if err := bk.regularBlockSync(targetHeight); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
+ bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
+ return false
+ }
+ return true
+ }
+ return false
}
-func (bk *blockKeeper) txsProcessWorker() {
- for txsResponse := range bk.txsProcessCh {
- tx := txsResponse.tx
- log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
- bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
- if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
- bk.sw.AddScamPeer(bk.peers.Peer(txsResponse.peerID).getPeer())
+func (bk *blockKeeper) syncWorker() {
+ genesisBlock, err := bk.chain.GetBlockByHeight(0)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
+ return
+ }
+ syncTicker := time.NewTicker(syncCycle)
+ defer syncTicker.Stop()
+
+ for {
+ <-syncTicker.C
+ if update := bk.startSync(); !update {
+ continue
+ }
+
+ block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
+ }
+
+ if err := bk.peers.broadcastMinedBlock(block); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new block")
+ }
+
+ if err = bk.peers.broadcastNewStatus(block, genesisBlock); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
}
}
}