6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/consensus"
9 "github.com/bytom/errors"
10 "github.com/bytom/p2p"
11 "github.com/bytom/protocol"
12 "github.com/bytom/protocol/bc/types"
16 maxKnownTxs = 32768 // Maximum number of transaction hashes to keep in the known list (prevents DoS)
17 maxKnownBlocks = 1024 // Maximum number of block hashes to keep in the known list (prevents DoS)
// syncTimeout is the duration of the timer BlockRequest creates while waiting
// for a requested block; presumably it bounds the whole wait — confirm.
19 syncTimeout = 30 * time.Second
// requestRetryTicker is the period of the ticker BlockRequest creates;
// presumably each tick re-sends the block request — confirm.
20 requestRetryTicker = 15 * time.Second
// maxBlocksPending is the buffer capacity of blockKeeper.pendingProcessCh.
22 maxBlocksPending = 1024
26 maxTxChanSize = 10000 // maxTxChanSize is the size of the channel listening to Txpool newTxCh
// Sentinel errors returned by the block keeper's request and sync paths.
// errGetBlockTimeout is returned by BlockRequest after it logs a request timeout.
30 errGetBlockTimeout = errors.New("Get block Timeout")
// errPeerDropped is returned by BlockRequest when quitReqBlockCh delivers the
// ID of the peer being waited on.
31 errPeerDropped = errors.New("Peer dropped")
// errGetBlockByHash is returned by BlockRequestWorker when the best block
// cannot be loaded for the post-sync status broadcast.
32 errGetBlockByHash = errors.New("Get block by hash error")
// errBroadcastStatus is returned by BlockRequestWorker when broadcasting the
// new chain status fails.
33 errBroadcastStatus = errors.New("Broadcast new status block error")
// errReqBlock is returned by BlockRequest when sending the block request fails.
34 errReqBlock = errors.New("Request block error")
// errPeerNotRegister is returned by BlockRequestWorker on its
// unregistered-peer paths.
35 errPeerNotRegister = errors.New("peer is not registered")
// blockKeeper requests blocks from peers, hands them to the chain, and
// validates transactions received from peers. Only the channel fields are
// visible in this view; the methods also use chain, sw and peers fields.
38 //TODO: add retry mechanism
39 type blockKeeper struct {
// pendingProcessCh carries blocks queued by AddBlock; BlockRequest receives
// from it while waiting for a response.
44 pendingProcessCh chan *blockPending
// txsProcessCh carries transactions queued by AddTx; txsProcessWorker ranges
// over it.
45 txsProcessCh chan *txsNotify
// quitReqBlockCh delivers peer IDs; BlockRequest returns errPeerDropped when
// the received ID equals the peer it is waiting on.
46 quitReqBlockCh chan *string
// newBlockKeeper constructs a blockKeeper. It allocates pendingProcessCh with
// capacity maxBlocksPending and txsProcessCh with capacity maxtxsPending,
// stores the caller-supplied quitReqBlockCh, and starts txsProcessWorker in
// its own goroutine.
49 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
54 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
55 txsProcessCh: make(chan *txsNotify, maxtxsPending),
56 quitReqBlockCh: quitReqBlockCh,
// NOTE(review): the worker ranges over txsProcessCh, and nothing visible here
// closes that channel or otherwise stops the goroutine — confirm its lifetime.
58 go bk.txsProcessWorker()
// AddBlock queues a block received from peerID on pendingProcessCh, where
// BlockRequest picks it up. This is a plain send on a buffered channel, so it
// blocks the caller once the buffer is full.
62 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
63 bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
// AddTx queues a transaction received from peerID on txsProcessCh for
// txsProcessWorker. This is a plain send on a buffered channel, so it blocks
// the caller once the buffer is full.
66 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
67 bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
// IsCaughtUp reports whether the local best block height is strictly below the
// height returned by peers.BestPeer.
//
// NOTE(review): as written this is true while the node is still BEHIND its
// best peer, which is the opposite of what the name suggests. Callers may
// depend on the current meaning — check them before changing the comparison.
70 func (bk *blockKeeper) IsCaughtUp() bool {
71 _, height := bk.peers.BestPeer()
72 return bk.chain.BestBlockHeight() < height
// BlockRequestWorker syncs blocks from peer peerID. It loops while the height
// counter num is in (0, maxPeerHeight], fetching a block with BlockRequest and
// passing it to chain.ProcessBlock. After the loop it logs completion and, if
// the best block hash differs from the one recorded on entry, broadcasts the
// new chain status to peers.
//
// Errors visible on its return paths: errPeerNotRegister, errGetBlockByHash
// and errBroadcastStatus.
//
// NOTE(review): several guard conditions and the computation of reqNum are
// not visible in this view; comments below that depend on them are hedged.
75 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
// Start one above the local tip, and remember the tip's hash so the code
// after the loop can tell whether the chain advanced.
76 num := bk.chain.BestBlockHeight() + 1
77 currentHash := bk.chain.BestBlockHash()
78 orphanNum := uint64(0)
// Look the peer up once; the failure path logs and returns errPeerNotRegister.
81 bkPeer, ok := bk.peers.Peer(peerID)
83 log.Info("peer is not registered")
84 return errPeerNotRegister
86 swPeer := bkPeer.GetPeer()
87 for 0 < num && num <= maxPeerHeight {
// reqNum is assigned in lines not shown here; presumably num, or orphanNum
// when back-filling the parent of an orphan block — confirm.
93 block, err := bk.BlockRequest(peerID, reqNum)
// Failures rooted in a dropped peer, a timeout or a failed request are logged
// together with the peer ID.
94 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
95 log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
97 log.Info("peer is not registered")
100 log.Info("Block keeper request block error. Stop peer.")
101 bk.sw.StopPeerGracefully(swPeer)
104 isOrphan, err = bk.chain.ProcessBlock(block)
107 log.Info("peer is deleted")
// A processing failure adds 20 to the peer's ban score; when addBanScore
// returns true the peer is added to the banned list and stopped.
110 if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
111 bk.sw.AddBannedPeer(swPeer)
112 bk.sw.StopPeerGracefully(swPeer)
114 log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
// Presumably reached when the block was an orphan: target its parent height
// next — confirm against the missing guard.
118 orphanNum = block.Height - 1
123 bestHash := bk.chain.BestBlockHash()
124 log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
// Only announce a new status when the tip changed during this sync.
125 if currentHash.String() != bestHash.String() {
126 log.Info("Broadcast new chain status.")
128 block, err := bk.chain.GetBlockByHash(bestHash)
130 log.Errorf("Failed on sync complete broadcast status get block %v", err)
131 return errGetBlockByHash
134 peers, err := bk.peers.BroadcastNewStatus(block)
136 log.Errorf("Failed on broadcast new status block %v", err)
137 return errBroadcastStatus
// NOTE(review): the returned peers are each stopped below, so they appear to
// be the peers for which the broadcast failed — confirm against
// BroadcastNewStatus.
139 for _, peer := range peers {
141 return errPeerNotRegister
143 swPeer := peer.GetPeer()
144 log.Info("Block keeper broadcast block error. Stop peer.")
145 bk.sw.StopPeerGracefully(swPeer)
// blockRequest sends a request for the block at height to peerID by delegating
// to peers.requestBlockByHeight, and returns that call's error unchanged.
151 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
152 return bk.peers.requestBlockByHeight(peerID, height)
// nextCheckpoint selects a checkpoint from consensus.ActiveNetParams.Checkpoints
// relative to the chain's best block header height. It starts from the last
// checkpoint and walks backwards through the earlier ones.
//
// NOTE(review): the bodies of both if statements are not visible in this view.
// If the first returns nil and the second breaks out of the loop, the result
// is the lowest checkpoint whose Height is above the current height, assuming
// Checkpoints is sorted by ascending Height — confirm.
155 func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
156 height := bk.chain.BestBlockHeader().Height
157 checkpoints := consensus.ActiveNetParams.Checkpoints
// No checkpoints configured, or the chain is already at or past the last one.
158 if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
162 nextCheckpoint := &checkpoints[len(checkpoints)-1]
163 for i := len(checkpoints) - 2; i >= 0; i-- {
// This checkpoint is at or below the current height.
164 if height >= checkpoints[i].Height {
167 nextCheckpoint = &checkpoints[i]
169 return nextCheckpoint
// BlockRequest asks peerID for the block at height and then waits for a
// matching response on pendingProcessCh.
//
// It returns errReqBlock when sending the request fails, errGetBlockTimeout
// after logging a request timeout, and errPeerDropped when quitReqBlockCh
// delivers the ID of the peer being waited on.
//
// NOTE(review): time.Tick returns only a channel, so its ticker can never be
// stopped, and no Stop call on syncWait is visible. Consider time.NewTicker
// with a deferred Stop, plus a deferred syncWait.Stop().
//
// NOTE(review): responses that fail the peer or height check have already
// been received from the shared pendingProcessCh. Whether they are re-queued
// or dropped is not visible here — confirm.
172 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
173 var block *types.Block
175 if err := bk.blockRequest(peerID, height); err != nil {
176 return nil, errReqBlock
178 retryTicker := time.Tick(requestRetryTicker)
179 syncWait := time.NewTimer(syncTimeout)
183 case pendingResponse := <-bk.pendingProcessCh:
184 block = pendingResponse.block
// Warn about responses that came from a different peer than the one asked.
185 if pendingResponse.peerID != peerID {
186 log.Warning("From different peer")
// Warn about responses for a different height than the one requested.
189 if block.Height != height {
190 log.Warning("Block height error")
// Re-send the request; presumably this runs on each retryTicker tick — the
// case header is not visible.
195 if err := bk.blockRequest(peerID, height); err != nil {
196 return nil, errReqBlock
199 log.Warning("Request block timeout")
200 return nil, errGetBlockTimeout
// Abort only when the quit notification names the peer being waited on.
201 case peerid := <-bk.quitReqBlockCh:
202 if *peerid == peerID {
203 log.Info("Quite block request worker")
204 return nil, errPeerDropped
210 func (bk *blockKeeper) txsProcessWorker() {
211 for txsResponse := range bk.txsProcessCh {
213 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
214 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
215 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
216 if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
217 swPeer := bkPeer.GetPeer()
218 if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
219 bk.sw.AddBannedPeer(swPeer)
220 bk.sw.StopPeerGracefully(swPeer)