6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/errors"
10 "github.com/bytom/protocol"
11 "github.com/bytom/protocol/bc/types"
// Tunables for the block keeper. The header of the enclosing declaration is not
// visible in this view.
15 maxKnownTxs = 32768 // Maximum transaction hashes to keep in the known list (prevent DOS)
16 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
// syncTimeout is the duration of the timer that BlockRequest creates before waiting
// for the requested block.
18 syncTimeout = 30 * time.Second
// requestRetryTicker is the period handed to time.Tick in BlockRequest.
19 requestRetryTicker = 15 * time.Second
// maxBlocksPending is the buffer capacity of blockKeeper.pendingProcessCh.
21 maxBlocksPending = 1024
// Sentinel errors produced by the block keeper. BlockRequestWorker compares the
// root of BlockRequest's error against errPeerDropped, errGetBlockTimeout and
// errReqBlock to recognise peer communication failures.

// errGetBlockTimeout is returned by BlockRequest after it logs "Request block timeout".
27 errGetBlockTimeout = errors.New("Get block Timeout")
// errPeerDropped is returned by BlockRequest when quitReqBlockCh delivers the ID
// of the peer it is waiting on.
28 errPeerDropped = errors.New("Peer dropped")
// errGetBlockByHash is returned by BlockRequestWorker when loading the best block
// for the final status broadcast fails.
29 errGetBlockByHash = errors.New("Get block by hash error")
// errBroadcastStatus is returned by BlockRequestWorker when BroadcastNewStatus fails.
30 errBroadcastStatus = errors.New("Broadcast new status block error")
// errReqBlock is returned by BlockRequest when sending the block request fails.
31 errReqBlock = errors.New("Request block error")
// blockKeeper receives blocks and transactions from peers and hands them to the
// code that processes them.
// NOTE(review): only the channel fields are visible in this view. The methods also
// use bk.chain, bk.sw and bk.peers, whose declarations are not shown here.
34 //TODO: add retry mechanism
35 type blockKeeper struct {
// pendingProcessCh carries blocks queued by AddBlock; BlockRequest receives from it.
40 pendingProcessCh chan *blockPending
// txsProcessCh carries transactions queued by AddTx; txsProcessWorker ranges over it.
41 txsProcessCh chan *txsNotify
// quitReqBlockCh delivers peer IDs; BlockRequest returns errPeerDropped when the
// delivered ID equals the peer it is waiting on.
42 quitReqBlockCh chan *string
// newBlockKeeper builds a blockKeeper and starts its transaction worker goroutine.
// pendingProcessCh and txsProcessCh are created buffered (capacities maxBlocksPending
// and maxtxsPending); quitReqBlockCh is supplied by the caller and stored as given.
// NOTE(review): the lines that store chain, sw and peers, and the return statement,
// are not visible in this view — confirm against the full file.
45 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
50 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
51 txsProcessCh: make(chan *txsNotify, maxtxsPending),
52 quitReqBlockCh: quitReqBlockCh,
// The worker ranges over txsProcessCh, so it runs until that channel is closed.
54 go bk.txsProcessWorker()
58 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
59 bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
62 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
63 bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
66 func (bk *blockKeeper) IsCaughtUp() bool {
67 _, height := bk.peers.BestPeer()
68 return bk.chain.BestBlockHeight() < height
// BlockRequestWorker downloads blocks from peerID one height at a time, starting
// just above the local best block, and passes each one to the chain while the
// wanted height does not exceed maxPeerHeight. When the best block hash differs
// from the one recorded at entry, it broadcasts the new chain status to the peers.
//
// It returns errGetBlockByHash or errBroadcastStatus when that final broadcast
// step fails.
// NOTE(review): several lines of this function are not visible in this view,
// including the declarations of reqNum and isOrphan, the conditions guarding the
// branches below, and the statements that advance or leave the loop. The comments
// here describe only what is shown.
71 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
72 num := bk.chain.BestBlockHeight() + 1
// Remember the starting best hash so a change can be detected after the loop.
73 currentHash := bk.chain.BestBlockHash()
74 orphanNum := uint64(0)
// presumably `num > 0` guards against uint64 wrap-around — TODO confirm.
77 for num <= maxPeerHeight && num > 0 {
83 block, err := bk.BlockRequest(peerID, reqNum)
// Communication failures reported by BlockRequest: the peer is looked up and,
// judging by the log messages, stopped gracefully when it is still registered.
84 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
85 log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
86 peer := bk.peers.Peer(peerID)
88 log.Info("peer is not registered")
91 log.Info("Peer communication abnormality")
92 bk.sw.StopPeerGracefully(peer.Peer)
// ProcessBlock's first result is stored in isOrphan and its second in err.
95 isOrphan, err = bk.chain.ProcessBlock(block)
// NOTE(review): bk.peers.Peer(peerID) is dereferenced without a visible nil check,
// although the branch above logs that the peer may not be registered.
97 bk.sw.AddScamPeer(bk.peers.Peer(peerID).getPeer())
98 log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
// orphanNum becomes the height directly below this block; presumably the next
// request asks for that height — TODO confirm.
102 orphanNum = block.Height - 1
107 bestHash := bk.chain.BestBlockHash()
108 log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
// The hashes are compared by their string form.
109 if currentHash.String() != bestHash.String() {
110 log.Info("Broadcast new chain status.")
112 block, err := bk.chain.GetBlockByHash(bestHash)
114 log.Errorf("Failed on sync complete broadcast status get block %v", err)
115 return errGetBlockByHash
118 if err := bk.peers.BroadcastNewStatus(block); err != nil {
119 log.Errorf("Failed on broadcast new status block %v", err)
120 return errBroadcastStatus
126 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
127 return bk.peers.requestBlockByHeight(peerID, height)
// BlockRequest asks peerID for the block at height and then waits for a block to
// arrive on pendingProcessCh.
//
// It returns errReqBlock when a request cannot be sent, errGetBlockTimeout after
// logging a request timeout, and errPeerDropped when quitReqBlockCh delivers
// peerID.
// NOTE(review): the loop/select scaffolding, some case lines and the success
// return are not visible in this view.
130 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
131 var block *types.Block
133 if err := bk.blockRequest(peerID, height); err != nil {
134 return nil, errReqBlock
// NOTE(review): time.Tick offers no way to stop the underlying ticker; consider
// time.NewTicker with a deferred Stop.
136 retryTicker := time.Tick(requestRetryTicker)
// NOTE(review): no Stop call for syncWait is visible in this view.
137 syncWait := time.NewTimer(syncTimeout)
141 case pendingResponse := <-bk.pendingProcessCh:
142 block = pendingResponse.block
// A block from another peer, or at another height, is logged as a warning; what
// happens after the warning is not visible here.
143 if pendingResponse.peerID != peerID {
144 log.Warning("From different peer")
147 if block.Height != height {
148 log.Warning("Block height error")
// Re-send the request (presumably on each retryTicker tick — the case line is
// not visible).
153 if err := bk.blockRequest(peerID, height); err != nil {
154 return nil, errReqBlock
157 log.Warning("Request block timeout")
158 return nil, errGetBlockTimeout
// Only a quit notification naming this request's peer aborts the wait.
159 case peerid := <-bk.quitReqBlockCh:
160 if *peerid == peerID {
161 log.Info("Quite block request worker")
162 return nil, errPeerDropped
// txsProcessWorker consumes txsProcessCh until the channel is closed. For each
// notification it logs the transaction ID, marks the transaction for the sending
// peer, and validates it against the chain. When validation returns an error and
// the transaction is not an orphan, the sending peer is passed to AddScamPeer.
// NOTE(review): the line that binds tx is not visible in this view; presumably
// tx := txsResponse.tx — confirm.
168 func (bk *blockKeeper) txsProcessWorker() {
169 for txsResponse := range bk.txsProcessCh {
171 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
172 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
173 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
// NOTE(review): bk.peers.Peer(...) is dereferenced without a visible nil check;
// BlockRequestWorker logs that a looked-up peer may not be registered.
174 bk.sw.AddScamPeer(bk.peers.Peer(txsResponse.peerID).getPeer())