6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/errors"
10 "github.com/bytom/protocol"
11 "github.com/bytom/protocol/bc/types"
15 maxKnownTxs = 32768 // Maximum number of transaction hashes to keep in the known list (prevents DoS)
16 maxKnownBlocks = 1024 // Maximum number of block hashes to keep in the known list (prevents DoS)
// syncTimeout is the duration of the timer BlockRequest creates before
// waiting for a block; presumably its expiry is what yields
// errGetBlockTimeout (the select case is not visible here — confirm).
18 syncTimeout = 30 * time.Second
// requestRetryTicker is the interval of the ticker BlockRequest creates;
// presumably each tick re-sends the block request — confirm.
19 requestRetryTicker = 15 * time.Second
// maxBlocksPending is the buffer size of blockKeeper.pendingProcessCh.
21 maxBlocksPending = 1024
// errGetBlockTimeout is returned by BlockRequest after it logs
// "Request block timeout".
27 errGetBlockTimeout = errors.New("Get block Timeout")
// errPeerDropped is returned by BlockRequest when quitReqBlockCh delivers
// the ID of the peer being queried.
28 errPeerDropped = errors.New("Peer dropped")
// errGetBlockByHash is returned by BlockRequestWorker when the lookup of the
// new best block (chain.GetBlockByHash) is logged as failed.
29 errGetBlockByHash = errors.New("Get block by hash error")
// errBroadcastStatus is returned by BlockRequestWorker when
// peers.BroadcastNewStatus is logged as failed.
30 errBroadcastStatus = errors.New("Broadcast new status block error")
// errReqBlock is returned by BlockRequest when sending the block request to
// the peer (blockRequest) returns an error.
31 errReqBlock = errors.New("Request block error")
// errPeerNotRegister is returned by BlockRequestWorker when a peer cannot be
// resolved from the peer set.
32 errPeerNotRegister = errors.New("peer is not registered")
// blockKeeper requests blocks from peers and hands them to the chain
// (BlockRequestWorker / BlockRequest), and validates transactions received
// from peers (txsProcessWorker).
35 //TODO: add retry mechanism
36 type blockKeeper struct {
// pendingProcessCh carries blocks queued by AddBlock; BlockRequest receives
// from it while waiting for the block it asked for.
41 pendingProcessCh chan *blockPending
// txsProcessCh carries transactions queued by AddTx; txsProcessWorker ranges
// over it.
42 txsProcessCh chan *txsNotify
// quitReqBlockCh is supplied by the caller of newBlockKeeper; BlockRequest
// receives peer IDs from it and aborts when the ID matches the peer it is
// waiting on.
43 quitReqBlockCh chan *string
// newBlockKeeper builds a blockKeeper and starts txsProcessWorker in its own
// goroutine before returning. pendingProcessCh is buffered to
// maxBlocksPending entries and txsProcessCh to maxtxsPending entries
// (maxtxsPending is declared elsewhere); quitReqBlockCh is stored as passed.
46 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
51 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
52 txsProcessCh: make(chan *txsNotify, maxtxsPending),
53 quitReqBlockCh: quitReqBlockCh,
// The worker goroutine runs for as long as txsProcessCh stays open.
55 go bk.txsProcessWorker()
59 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
60 bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
63 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
64 bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
67 func (bk *blockKeeper) IsCaughtUp() bool {
68 _, height := bk.peers.BestPeer()
69 return bk.chain.BestBlockHeight() < height
// BlockRequestWorker pulls blocks from the peer identified by peerID and
// feeds them to the chain. It loops while num <= maxPeerHeight (and num > 0),
// starting at the local best height + 1; each iteration fetches one block
// with BlockRequest and passes it to chain.ProcessBlock. After the loop, if
// the best block hash differs from the one recorded on entry, the new best
// block is announced through peers.BroadcastNewStatus.
//
// It returns errPeerNotRegister when a peer cannot be resolved,
// errGetBlockByHash when the new best block cannot be loaded, and
// errBroadcastStatus when the broadcast fails.
// NOTE(review): how num advances and the remaining return paths are on lines
// not shown here — confirm before relying on this summary.
72 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
// Begin with the block right after the local best one, and remember the
// current best hash so a change of chain tip can be detected afterwards.
73 num := bk.chain.BestBlockHeight() + 1
74 currentHash := bk.chain.BestBlockHash()
75 orphanNum := uint64(0)
78 bkPeer, ok := bk.peers.Peer(peerID)
80 log.Info("peer is not registered")
81 return errPeerNotRegister
83 swPeer := bkPeer.getPeer()
84 for num <= maxPeerHeight && num > 0 {
// NOTE(review): reqNum is assigned on lines not shown here; presumably it is
// derived from num and orphanNum — confirm.
90 block, err := bk.BlockRequest(peerID, reqNum)
// Compare errors.Root(err) against the three sentinel errors BlockRequest
// returns.
91 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
92 log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
94 log.Info("peer is not registered")
97 log.Info("Block keeper request block error. Stop peer.")
98 bk.sw.StopPeerGracefully(swPeer)
101 isOrphan, err = bk.chain.ProcessBlock(block)
104 log.Info("peer is deleted")
// Raise the peer's ban score; once addBanScore reports true the peer is
// added to the switch's banned list and disconnected.
107 if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
108 bk.sw.AddBannedPeer(swPeer)
109 bk.sw.StopPeerGracefully(swPeer)
111 log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
// NOTE(review): presumably taken when ProcessBlock reports an orphan, so the
// parent height is requested next — the guarding condition is not shown.
115 orphanNum = block.Height - 1
120 bestHash := bk.chain.BestBlockHash()
121 log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
// Only announce a new status when the best hash changed during this run.
122 if currentHash.String() != bestHash.String() {
123 log.Info("Broadcast new chain status.")
125 block, err := bk.chain.GetBlockByHash(bestHash)
127 log.Errorf("Failed on sync complete broadcast status get block %v", err)
128 return errGetBlockByHash
131 peers, err := bk.peers.BroadcastNewStatus(block)
133 log.Errorf("Failed on broadcast new status block %v", err)
134 return errBroadcastStatus
// BroadcastNewStatus returns a list of peers, each of which is stopped below.
// NOTE(review): presumably these are the peers the broadcast failed for —
// confirm against BroadcastNewStatus.
136 for _, peer := range peers {
138 return errPeerNotRegister
140 swPeer := peer.getPeer()
141 log.Info("Block keeper broadcast block error. Stop peer.")
142 bk.sw.StopPeerGracefully(swPeer)
148 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
149 return bk.peers.requestBlockByHeight(peerID, height)
// BlockRequest asks the peer identified by peerID for the block at height and
// then waits for it to arrive on pendingProcessCh.
//
// It returns errReqBlock when sending the request fails, errGetBlockTimeout
// after logging "Request block timeout", and errPeerDropped when
// quitReqBlockCh delivers the ID of this peer.
// NOTE(review): the success return and the loop/select scaffolding are on
// lines not shown here.
152 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
153 var block *types.Block
155 if err := bk.blockRequest(peerID, height); err != nil {
156 return nil, errReqBlock
// NOTE(review): time.Tick offers no way to stop its ticker, and no Stop call
// on syncWait is visible on the return paths below; consider time.NewTicker
// with a deferred Stop and a deferred syncWait.Stop().
158 retryTicker := time.Tick(requestRetryTicker)
159 syncWait := time.NewTimer(syncTimeout)
163 case pendingResponse := <-bk.pendingProcessCh:
164 block = pendingResponse.block
// A block delivered by a different peer, or at a different height, only
// produces a warning. NOTE(review): it has already been taken off the channel
// at this point and is presumably discarded — confirm.
165 if pendingResponse.peerID != peerID {
166 log.Warning("From different peer")
169 if block.Height != height {
170 log.Warning("Block height error")
// Re-send the request. NOTE(review): presumably the retryTicker case — the
// case line itself is not shown.
175 if err := bk.blockRequest(peerID, height); err != nil {
176 return nil, errReqBlock
179 log.Warning("Request block timeout")
180 return nil, errGetBlockTimeout
181 case peerid := <-bk.quitReqBlockCh:
// Only a quit signal naming the peer being queried takes this return path.
182 if *peerid == peerID {
183 log.Info("Quite block request worker")
184 return nil, errPeerDropped
// txsProcessWorker consumes txsProcessCh until the channel is closed. For each
// notification it records the transaction ID against the sending peer with
// peers.MarkTransaction and validates the transaction with chain.ValidateTx.
// When validation returns an error and the transaction is not an orphan, the
// sending peer (if still registered) has its ban score raised, and is banned
// and disconnected once addBanScore reports true.
190 func (bk *blockKeeper) txsProcessWorker() {
191 for txsResponse := range bk.txsProcessCh {
// NOTE(review): tx is assigned on a line not shown here; presumably it is
// txsResponse.tx — confirm.
193 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
194 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
// Orphan transactions are not held against the peer; only a validation error
// on a non-orphan transaction is penalised.
195 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
196 if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
197 swPeer := bkPeer.getPeer()
198 if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
199 bk.sw.AddBannedPeer(swPeer)
200 bk.sw.StopPeerGracefully(swPeer)