6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/errors"
10 "github.com/bytom/protocol"
11 "github.com/bytom/protocol/bc/types"
15 maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
16 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
// syncTimeout is the duration of the timer BlockRequest arms; when it fires
// BlockRequest returns errGetBlockTimeout.
18 syncTimeout = 30 * time.Second
// requestRetryTicker is the interval of the retry ticker created in
// BlockRequest.
19 requestRetryTicker = 15 * time.Second
// maxBlocksPending is the buffer capacity of blockKeeper.pendingProcessCh.
21 maxBlocksPending = 1024
25 // txChanSize is the size of channel listening to Txpool newTxCh.
// errGetBlockTimeout is returned by BlockRequest right after it logs
// "Request block timeout".
30 errGetBlockTimeout = errors.New("Get block Timeout")
// errPeerDropped is returned by BlockRequest when the peer ID received on
// quitReqBlockCh equals the peer it is waiting on.
31 errPeerDropped = errors.New("Peer dropped")
// errGetBlockByHash is returned by BlockRequestWorker after it logs a failed
// lookup of the best block by hash.
32 errGetBlockByHash = errors.New("Get block by hash error")
// errBroadcastStatus is returned by BlockRequestWorker after it logs a failed
// BroadcastNewStatus call.
33 errBroadcastStatus = errors.New("Broadcast new status block error")
// errReqBlock is returned by BlockRequest when sending a block request fails.
34 errReqBlock = errors.New("Request block error")
// errPeerNotRegister is returned by BlockRequestWorker on its
// "peer is not registered" path.
35 errPeerNotRegister = errors.New("peer is not registered")
// blockKeeper owns the channels through which received blocks and
// transactions reach the sync workers. Only part of the field list is visible
// in this excerpt; methods also use bk.chain, bk.sw and bk.peers.
38 //TODO: add retry mechanism
39 type blockKeeper struct {
// pendingProcessCh carries blocks queued by AddBlock; BlockRequest receives
// from it.
44 pendingProcessCh chan *blockPending
// txsProcessCh carries transactions queued by AddTx; txsProcessWorker ranges
// over it.
45 txsProcessCh chan *txsNotify
// quitReqBlockCh delivers peer IDs; BlockRequest returns errPeerDropped when
// a received ID matches the peer it is waiting on.
46 quitReqBlockCh chan *string
// newBlockKeeper builds a blockKeeper and starts its transaction worker.
//
// The visible initializer lines create pendingProcessCh with capacity
// maxBlocksPending and txsProcessCh with capacity maxtxsPending, and store the
// caller-supplied quitReqBlockCh. How chain, sw and peers are stored is not
// visible in this excerpt.
49 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
54 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
55 txsProcessCh: make(chan *txsNotify, maxtxsPending),
56 quitReqBlockCh: quitReqBlockCh,
// txsProcessWorker ranges over txsProcessCh, so this goroutine keeps running
// until that channel is closed.
58 go bk.txsProcessWorker()
62 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
63 bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
66 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
67 bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
70 func (bk *blockKeeper) IsCaughtUp() bool {
71 _, height := bk.peers.BestPeer()
72 return bk.chain.BestBlockHeight() < height
// BlockRequestWorker pulls blocks from peer peerID via BlockRequest and hands
// them to chain.ProcessBlock, looping while the next height is non-zero and
// does not exceed maxPeerHeight. After the loop it logs the new best block
// and, if the best hash changed, broadcasts the new status to peers.
//
// Several lines of the body are not visible in this excerpt; the notes below
// describe only what the visible lines show. Visible error returns are
// errPeerNotRegister, errGetBlockByHash and errBroadcastStatus.
75 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
// Start at the height right after the local best block.
76 num := bk.chain.BestBlockHeight() + 1
// Keep the best hash from before the sync so a change can be detected later.
77 currentHash := bk.chain.BestBlockHash()
78 orphanNum := uint64(0)
81 bkPeer, ok := bk.peers.Peer(peerID)
83 log.Info("peer is not registered")
84 return errPeerNotRegister
86 swPeer := bkPeer.getPeer()
87 for 0 < num && num <= maxPeerHeight {
// reqNum is assigned in lines not shown in this excerpt.
93 block, err := bk.BlockRequest(peerID, reqNum)
// Dropped-peer, timeout and request failures are recognised by comparing the
// root of err against the sentinel errors.
94 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
95 log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
97 log.Info("peer is not registered")
100 log.Info("Block keeper request block error. Stop peer.")
101 bk.sw.StopPeerGracefully(swPeer)
104 isOrphan, err = bk.chain.ProcessBlock(block)
107 log.Info("peer is deleted")
// When addBanScore reports true the peer is added to the banned list and
// stopped.
110 if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
111 bk.sw.AddBannedPeer(swPeer)
112 bk.sw.StopPeerGracefully(swPeer)
114 log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
// NOTE(review): presumably the orphan branch, stepping back to the parent
// height; the guarding condition is not visible here, confirm.
118 orphanNum = block.Height - 1
123 bestHash := bk.chain.BestBlockHash()
124 log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
// Broadcast only when the best block changed during this run.
125 if currentHash.String() != bestHash.String() {
126 log.Info("Broadcast new chain status.")
128 block, err := bk.chain.GetBlockByHash(bestHash)
130 log.Errorf("Failed on sync complete broadcast status get block %v", err)
131 return errGetBlockByHash
134 peers, err := bk.peers.BroadcastNewStatus(block)
136 log.Errorf("Failed on broadcast new status block %v", err)
137 return errBroadcastStatus
// The peers returned by BroadcastNewStatus are stopped one by one; judging by
// the log message they are presumably the peers the broadcast failed for,
// confirm against BroadcastNewStatus.
139 for _, peer := range peers {
141 return errPeerNotRegister
143 swPeer := peer.getPeer()
144 log.Info("Block keeper broadcast block error. Stop peer.")
145 bk.sw.StopPeerGracefully(swPeer)
151 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
152 return bk.peers.requestBlockByHeight(peerID, height)
155 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
156 var block *types.Block
158 if err := bk.blockRequest(peerID, height); err != nil {
159 return nil, errReqBlock
161 retryTicker := time.Tick(requestRetryTicker)
162 syncWait := time.NewTimer(syncTimeout)
166 case pendingResponse := <-bk.pendingProcessCh:
167 block = pendingResponse.block
168 if pendingResponse.peerID != peerID {
169 log.Warning("From different peer")
172 if block.Height != height {
173 log.Warning("Block height error")
178 if err := bk.blockRequest(peerID, height); err != nil {
179 return nil, errReqBlock
182 log.Warning("Request block timeout")
183 return nil, errGetBlockTimeout
184 case peerid := <-bk.quitReqBlockCh:
185 if *peerid == peerID {
186 log.Info("Quite block request worker")
187 return nil, errPeerDropped
// txsProcessWorker consumes txsProcessCh until the channel is closed. For each
// notification it logs the transaction ID, records the transaction against
// the sending peer via peers.MarkTransaction and validates it with
// chain.ValidateTx. When validation fails and the transaction is not reported
// as an orphan, the sending peer (if still registered) receives
// addBanScore(10, 0, "tx error"); if that reports true the peer is added to
// the banned list and stopped.
//
// The end of the function lies outside this excerpt.
193 func (bk *blockKeeper) txsProcessWorker() {
194 for txsResponse := range bk.txsProcessCh {
// tx is bound in a line not shown here (presumably txsResponse.tx, confirm).
196 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
197 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
// NOTE(review): `isOrphan == false` can be written as `!isOrphan`.
198 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
199 if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
200 swPeer := bkPeer.getPeer()
201 if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
202 bk.sw.AddBannedPeer(swPeer)
203 bk.sw.StopPeerGracefully(swPeer)