OSDN Git Service

move several packages out of blockchain (#473)
[bytom/bytom.git] / blockchain / block_keeper.go
1 package blockchain
2
3 import (
4         "errors"
5         "sync"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/bytom/p2p"
11         "github.com/bytom/protocol"
12         "github.com/bytom/protocol/bc"
13         "github.com/bytom/protocol/bc/types"
14 )
15
16 type blockKeeperPeer struct {
17         mtx    sync.RWMutex
18         height uint64
19         hash   *bc.Hash
20 }
21
22 func newBlockKeeperPeer(height uint64, hash *bc.Hash) *blockKeeperPeer {
23         return &blockKeeperPeer{
24                 height: height,
25                 hash:   hash,
26         }
27 }
28
29 func (p *blockKeeperPeer) GetStatus() (height uint64, hash *bc.Hash) {
30         p.mtx.RLock()
31         defer p.mtx.RUnlock()
32         return p.height, p.hash
33 }
34
35 func (p *blockKeeperPeer) SetStatus(height uint64, hash *bc.Hash) {
36         p.mtx.Lock()
37         defer p.mtx.Unlock()
38
39         p.height = height
40         p.hash = hash
41 }
42
43 type pendingResponse struct {
44         block *types.Block
45         src   *p2p.Peer
46 }
47
48 //TODO: add retry mechanism
49 type blockKeeper struct {
50         mtx           sync.RWMutex
51         chainHeight   uint64
52         maxPeerHeight uint64
53         chainUpdateCh <-chan struct{}
54         peerUpdateCh  chan struct{}
55         done          chan bool
56
57         chain            *protocol.Chain
58         sw               *p2p.Switch
59         peers            map[string]*blockKeeperPeer
60         pendingProcessCh chan *pendingResponse
61 }
62
63 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch) *blockKeeper {
64         chainHeight := chain.Height()
65         bk := &blockKeeper{
66                 chainHeight:   chainHeight,
67                 maxPeerHeight: uint64(0),
68                 chainUpdateCh: chain.BlockWaiter(chainHeight + 1),
69                 peerUpdateCh:  make(chan struct{}, 1000),
70                 done:          make(chan bool, 1),
71
72                 chain:            chain,
73                 sw:               sw,
74                 peers:            make(map[string]*blockKeeperPeer),
75                 pendingProcessCh: make(chan *pendingResponse),
76         }
77         go bk.blockProcessWorker()
78         go bk.blockRequestWorker()
79         return bk
80 }
81
82 func (bk *blockKeeper) Stop() {
83         bk.done <- true
84 }
85
86 func (bk *blockKeeper) AddBlock(block *types.Block, src *p2p.Peer) {
87         bk.pendingProcessCh <- &pendingResponse{block: block, src: src}
88 }
89
90 func (bk *blockKeeper) IsCaughtUp() bool {
91         bk.mtx.RLock()
92         defer bk.mtx.RUnlock()
93         return bk.chainHeight >= bk.maxPeerHeight
94 }
95
96 func (bk *blockKeeper) RemovePeer(peerID string) {
97         bk.mtx.Lock()
98         delete(bk.peers, peerID)
99         bk.mtx.Unlock()
100         log.WithField("ID", peerID).Info("Delete peer from blockKeeper")
101 }
102
103 func (bk *blockKeeper) requestBlockByHash(peerID string, hash *bc.Hash) error {
104         peer := bk.sw.Peers().Get(peerID)
105         if peer == nil {
106                 return errors.New("can't find peer in peer pool")
107         }
108         msg := &BlockRequestMessage{RawHash: hash.Byte32()}
109         peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
110         return nil
111 }
112
113 func (bk *blockKeeper) requestBlockByHeight(peerID string, height uint64) error {
114         peer := bk.sw.Peers().Get(peerID)
115         if peer == nil {
116                 return errors.New("can't find peer in peer pool")
117         }
118         msg := &BlockRequestMessage{Height: height}
119         peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
120         return nil
121 }
122
123 func (bk *blockKeeper) SetPeerHeight(peerID string, height uint64, hash *bc.Hash) {
124         bk.mtx.Lock()
125         defer bk.mtx.Unlock()
126
127         if height > bk.maxPeerHeight {
128                 bk.maxPeerHeight = height
129                 bk.peerUpdateCh <- struct{}{}
130         }
131
132         if peer, ok := bk.peers[peerID]; ok {
133                 peer.SetStatus(height, hash)
134                 return
135         }
136         peer := newBlockKeeperPeer(height, hash)
137         bk.peers[peerID] = peer
138         log.WithFields(log.Fields{"ID": peerID, "Height": height}).Info("Add new peer to blockKeeper")
139 }
140
141 func (bk *blockKeeper) RequestBlockByHeight(height uint64) {
142         bk.mtx.RLock()
143         defer bk.mtx.RUnlock()
144
145         for peerID, peer := range bk.peers {
146                 if peerHeight, _ := peer.GetStatus(); peerHeight > bk.chainHeight {
147                         bk.requestBlockByHeight(peerID, height)
148                 }
149         }
150 }
151
152 func (bk *blockKeeper) blockRequestWorker() {
153         for {
154                 select {
155                 case <-bk.chainUpdateCh:
156                         chainHeight := bk.chain.Height()
157                         bk.mtx.Lock()
158                         if bk.chainHeight < chainHeight {
159                                 bk.chainHeight = chainHeight
160                         }
161                         bk.chainUpdateCh = bk.chain.BlockWaiter(bk.chainHeight + 1)
162                         bk.mtx.Unlock()
163
164                 case <-bk.peerUpdateCh:
165                         bk.mtx.RLock()
166                         chainHeight := bk.chainHeight
167                         maxPeerHeight := bk.maxPeerHeight
168                         bk.mtx.RUnlock()
169
170                         for i := chainHeight + 1; i <= maxPeerHeight; i++ {
171                                 bk.RequestBlockByHeight(i)
172                                 waiter := bk.chain.BlockWaiter(i)
173                                 retryTicker := time.Tick(15 * time.Second)
174
175                         retryLoop:
176                                 for {
177                                         select {
178                                         case <-waiter:
179                                                 break retryLoop
180                                         case <-retryTicker:
181                                                 bk.RequestBlockByHeight(i)
182                                         }
183                                 }
184                         }
185
186                 case <-bk.done:
187                         return
188                 }
189         }
190 }
191
192 func (bk *blockKeeper) blockProcessWorker() {
193         for pendingResponse := range bk.pendingProcessCh {
194
195                 block := pendingResponse.block
196                 blockHash := block.Hash()
197                 isOrphan, err := bk.chain.ProcessBlock(block)
198                 if err != nil {
199                         bk.sw.AddScamPeer(pendingResponse.src)
200                         log.WithField("hash", blockHash.String()).Errorf("blockKeeper fail process block %v", err)
201                         continue
202                 }
203                 log.WithFields(log.Fields{
204                         "height":   block.Height,
205                         "hash":     blockHash.String(),
206                         "isOrphan": isOrphan,
207                 }).Info("blockKeeper processed block")
208
209                 if isOrphan {
210                         bk.requestBlockByHash(pendingResponse.src.Key, &block.PreviousBlockHash)
211                 }
212         }
213 }