8 log "github.com/sirupsen/logrus"
10 "github.com/bytom/p2p"
11 "github.com/bytom/protocol"
12 "github.com/bytom/protocol/bc"
13 "github.com/bytom/protocol/bc/types"
16 type blockKeeperPeer struct {
22 func newBlockKeeperPeer(height uint64, hash *bc.Hash) *blockKeeperPeer {
23 return &blockKeeperPeer{
29 func (p *blockKeeperPeer) GetStatus() (height uint64, hash *bc.Hash) {
32 return p.height, p.hash
35 func (p *blockKeeperPeer) SetStatus(height uint64, hash *bc.Hash) {
43 type pendingResponse struct {
48 //TODO: add retry mechanism
49 type blockKeeper struct {
53 chainUpdateCh <-chan struct{}
54 peerUpdateCh chan struct{}
59 peers map[string]*blockKeeperPeer
60 pendingProcessCh chan *pendingResponse
63 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch) *blockKeeper {
64 chainHeight := chain.Height()
66 chainHeight: chainHeight,
67 maxPeerHeight: uint64(0),
68 chainUpdateCh: chain.BlockWaiter(chainHeight + 1),
69 peerUpdateCh: make(chan struct{}, 1000),
70 done: make(chan bool, 1),
74 peers: make(map[string]*blockKeeperPeer),
75 pendingProcessCh: make(chan *pendingResponse),
77 go bk.blockProcessWorker()
78 go bk.blockRequestWorker()
82 func (bk *blockKeeper) Stop() {
86 func (bk *blockKeeper) AddBlock(block *types.Block, src *p2p.Peer) {
87 bk.pendingProcessCh <- &pendingResponse{block: block, src: src}
90 func (bk *blockKeeper) IsCaughtUp() bool {
92 defer bk.mtx.RUnlock()
93 return bk.chainHeight >= bk.maxPeerHeight
96 func (bk *blockKeeper) RemovePeer(peerID string) {
98 delete(bk.peers, peerID)
100 log.WithField("ID", peerID).Info("Delete peer from blockKeeper")
103 func (bk *blockKeeper) requestBlockByHash(peerID string, hash *bc.Hash) error {
104 peer := bk.sw.Peers().Get(peerID)
106 return errors.New("can't find peer in peer pool")
108 msg := &BlockRequestMessage{RawHash: hash.Byte32()}
109 peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
113 func (bk *blockKeeper) requestBlockByHeight(peerID string, height uint64) error {
114 peer := bk.sw.Peers().Get(peerID)
116 return errors.New("can't find peer in peer pool")
118 msg := &BlockRequestMessage{Height: height}
119 peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
123 func (bk *blockKeeper) SetPeerHeight(peerID string, height uint64, hash *bc.Hash) {
125 defer bk.mtx.Unlock()
127 if height > bk.maxPeerHeight {
128 bk.maxPeerHeight = height
129 bk.peerUpdateCh <- struct{}{}
132 if peer, ok := bk.peers[peerID]; ok {
133 peer.SetStatus(height, hash)
136 peer := newBlockKeeperPeer(height, hash)
137 bk.peers[peerID] = peer
138 log.WithFields(log.Fields{"ID": peerID, "Height": height}).Info("Add new peer to blockKeeper")
141 func (bk *blockKeeper) RequestBlockByHeight(height uint64) {
143 defer bk.mtx.RUnlock()
145 for peerID, peer := range bk.peers {
146 if peerHeight, _ := peer.GetStatus(); peerHeight > bk.chainHeight {
147 bk.requestBlockByHeight(peerID, height)
152 func (bk *blockKeeper) blockRequestWorker() {
155 case <-bk.chainUpdateCh:
156 chainHeight := bk.chain.Height()
158 if bk.chainHeight < chainHeight {
159 bk.chainHeight = chainHeight
161 bk.chainUpdateCh = bk.chain.BlockWaiter(bk.chainHeight + 1)
164 case <-bk.peerUpdateCh:
166 chainHeight := bk.chainHeight
167 maxPeerHeight := bk.maxPeerHeight
170 for i := chainHeight + 1; i <= maxPeerHeight; i++ {
171 bk.RequestBlockByHeight(i)
172 waiter := bk.chain.BlockWaiter(i)
173 retryTicker := time.Tick(15 * time.Second)
181 bk.RequestBlockByHeight(i)
192 func (bk *blockKeeper) blockProcessWorker() {
193 for pendingResponse := range bk.pendingProcessCh {
195 block := pendingResponse.block
196 blockHash := block.Hash()
197 isOrphan, err := bk.chain.ProcessBlock(block)
199 bk.sw.AddScamPeer(pendingResponse.src)
200 log.WithField("hash", blockHash.String()).Errorf("blockKeeper fail process block %v", err)
203 log.WithFields(log.Fields{
204 "height": block.Height,
205 "hash": blockHash.String(),
206 "isOrphan": isOrphan,
207 }).Info("blockKeeper processed block")
210 bk.requestBlockByHash(pendingResponse.src.Key, &block.PreviousBlockHash)