6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/errors"
10 "github.com/bytom/protocol"
11 "github.com/bytom/protocol/bc/types"
15 maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
16 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
18 syncTimeout = 30 * time.Second
19 requestRetryTicker = 15 * time.Second
21 maxBlocksPending = 1024
27 errGetBlockTimeout = errors.New("Get block Timeout")
28 errPeerDropped = errors.New("Peer dropped")
29 errGetBlockByHash = errors.New("Get block by hash error")
30 errBroadcastStatus = errors.New("Broadcast new status block error")
31 errReqBlock = errors.New("Request block error")
32 errPeerNotRegister = errors.New("peer is not registered")
35 //TODO: add retry mechanism
36 type blockKeeper struct {
41 pendingProcessCh chan *blockPending
42 txsProcessCh chan *txsNotify
43 quitReqBlockCh chan *string
46 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
51 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
52 txsProcessCh: make(chan *txsNotify, maxtxsPending),
53 quitReqBlockCh: quitReqBlockCh,
55 go bk.txsProcessWorker()
59 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
60 bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
63 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
64 bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
67 func (bk *blockKeeper) IsCaughtUp() bool {
68 _, height := bk.peers.BestPeer()
69 return bk.chain.BestBlockHeight() < height
72 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
73 num := bk.chain.BestBlockHeight() + 1
74 currentHash := bk.chain.BestBlockHash()
75 orphanNum := uint64(0)
78 bkPeer, ok := bk.peers.Peer(peerID)
80 log.Info("peer is not registered")
81 return errPeerNotRegister
83 swPeer := bkPeer.getPeer()
84 for num <= maxPeerHeight && num > 0 {
90 block, err := bk.BlockRequest(peerID, reqNum)
91 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
92 log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
94 log.Info("peer is not registered")
97 if ban := bkPeer.addBanScore(0, 50, "block request error"); ban {
98 bk.sw.AddBannedPeer(swPeer)
100 bk.sw.StopPeerGracefully(swPeer)
103 isOrphan, err = bk.chain.ProcessBlock(block)
106 log.Info("peer is deleted")
109 if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
110 bk.sw.AddBannedPeer(swPeer)
111 bk.sw.StopPeerGracefully(swPeer)
113 log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
117 orphanNum = block.Height - 1
122 bestHash := bk.chain.BestBlockHash()
123 log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
124 if currentHash.String() != bestHash.String() {
125 log.Info("Broadcast new chain status.")
127 block, err := bk.chain.GetBlockByHash(bestHash)
129 log.Errorf("Failed on sync complete broadcast status get block %v", err)
130 return errGetBlockByHash
133 peers, err := bk.peers.BroadcastNewStatus(block)
135 log.Errorf("Failed on broadcast new status block %v", err)
136 return errBroadcastStatus
138 for _, peer := range peers {
140 return errPeerNotRegister
142 swPeer := peer.getPeer()
143 if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
144 bk.sw.AddBannedPeer(swPeer)
145 bk.sw.StopPeerGracefully(swPeer)
152 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
153 return bk.peers.requestBlockByHeight(peerID, height)
156 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
157 var block *types.Block
159 if err := bk.blockRequest(peerID, height); err != nil {
160 return nil, errReqBlock
162 retryTicker := time.Tick(requestRetryTicker)
163 syncWait := time.NewTimer(syncTimeout)
167 case pendingResponse := <-bk.pendingProcessCh:
168 block = pendingResponse.block
169 if pendingResponse.peerID != peerID {
170 log.Warning("From different peer")
173 if block.Height != height {
174 log.Warning("Block height error")
179 if err := bk.blockRequest(peerID, height); err != nil {
180 return nil, errReqBlock
183 log.Warning("Request block timeout")
184 return nil, errGetBlockTimeout
185 case peerid := <-bk.quitReqBlockCh:
186 if *peerid == peerID {
187 log.Info("Quite block request worker")
188 return nil, errPeerDropped
194 func (bk *blockKeeper) txsProcessWorker() {
195 for txsResponse := range bk.txsProcessCh {
197 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
198 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
199 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
200 if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
201 swPeer := bkPeer.getPeer()
202 if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
203 bk.sw.AddBannedPeer(swPeer)
204 bk.sw.StopPeerGracefully(swPeer)