7 log "github.com/sirupsen/logrus"
10 "github.com/bytom/protocol"
11 "github.com/bytom/protocol/bc"
12 "github.com/bytom/protocol/bc/legacy"
15 type blockKeeperPeer struct {
21 func newBlockKeeperPeer(height uint64, hash *bc.Hash) *blockKeeperPeer {
22 return &blockKeeperPeer{
28 func (p *blockKeeperPeer) GetStatus() (height uint64, hash *bc.Hash) {
31 return p.height, p.hash
34 func (p *blockKeeperPeer) SetStatus(height uint64, hash *bc.Hash) {
42 type pendingResponse struct {
47 //TODO: add retry mechanism
48 type blockKeeper struct {
52 chainUpdateCh <-chan struct{}
53 peerUpdateCh chan struct{}
57 peers map[string]*blockKeeperPeer
58 pendingProcessCh chan *pendingResponse
61 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch) *blockKeeper {
62 chainHeight := chain.Height()
64 chainHeight: chainHeight,
65 maxPeerHeight: uint64(0),
66 chainUpdateCh: chain.BlockWaiter(chainHeight + 1),
67 peerUpdateCh: make(chan struct{}, 1000),
71 peers: make(map[string]*blockKeeperPeer),
72 pendingProcessCh: make(chan *pendingResponse),
74 go bk.blockProcessWorker()
75 go bk.blockRequestWorker()
79 func (bk *blockKeeper) AddBlock(block *legacy.Block, peerID string) {
80 bk.pendingProcessCh <- &pendingResponse{block: block, peerID: peerID}
83 func (bk *blockKeeper) IsCaughtUp() bool {
85 defer bk.mtx.RUnlock()
86 return bk.chainHeight >= bk.maxPeerHeight
89 func (bk *blockKeeper) RemovePeer(peerID string) {
91 delete(bk.peers, peerID)
93 log.WithField("ID", peerID).Info("Delete peer from blockKeeper")
96 func (bk *blockKeeper) requestBlockByHash(peerID string, hash *bc.Hash) error {
97 peer := bk.sw.Peers().Get(peerID)
99 return errors.New("can't find peer in peer pool")
101 msg := &BlockRequestMessage{RawHash: hash.Byte32()}
102 peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
106 func (bk *blockKeeper) requestBlockByHeight(peerID string, height uint64) error {
107 peer := bk.sw.Peers().Get(peerID)
109 return errors.New("can't find peer in peer pool")
111 msg := &BlockRequestMessage{Height: height}
112 peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
116 func (bk *blockKeeper) SetPeerHeight(peerID string, height uint64, hash *bc.Hash) {
118 defer bk.mtx.Unlock()
120 if height > bk.maxPeerHeight {
121 bk.maxPeerHeight = height
122 bk.peerUpdateCh <- struct{}{}
125 if peer, ok := bk.peers[peerID]; ok {
126 peer.SetStatus(height, hash)
129 peer := newBlockKeeperPeer(height, hash)
130 bk.peers[peerID] = peer
131 log.WithFields(log.Fields{"ID": peerID, "Height": height}).Info("Add new peer to blockKeeper")
134 func (bk *blockKeeper) RequestBlockByHeight(height uint64) {
136 defer bk.mtx.RUnlock()
138 for peerID, peer := range bk.peers {
139 if peerHeight, _ := peer.GetStatus(); peerHeight > bk.chainHeight {
140 bk.requestBlockByHeight(peerID, height)
145 func (bk *blockKeeper) blockRequestWorker() {
148 case <-bk.chainUpdateCh:
149 chainHeight := bk.chain.Height()
151 if bk.chainHeight < chainHeight {
152 bk.chainHeight = chainHeight
154 bk.chainUpdateCh = bk.chain.BlockWaiter(bk.chainHeight + 1)
157 case <-bk.peerUpdateCh:
159 chainHeight := bk.chainHeight
160 maxPeerHeight := bk.maxPeerHeight
163 for i := chainHeight + 1; i <= maxPeerHeight; i++ {
164 bk.RequestBlockByHeight(i)
165 waiter := bk.chain.BlockWaiter(i)
172 func (bk *blockKeeper) blockProcessWorker() {
173 for pendingResponse := range bk.pendingProcessCh {
174 block := pendingResponse.block
175 blockHash := block.Hash()
176 isOrphan, err := bk.chain.ProcessBlock(block)
178 log.WithField("hash", blockHash.String()).Errorf("blockKeeper fail process block %v", err)
181 log.WithFields(log.Fields{
182 "height": block.Height,
183 "hash": blockHash.String(),
184 "isOrphan": isOrphan,
185 }).Info("blockKeeper processed block")
188 bk.requestBlockByHash(pendingResponse.peerID, &block.PreviousBlockHash)