OSDN Git Service

Merge pull request #89 from Bytom/dev
[bytom/bytom.git] / blockchain / block_keeper.go
1 package blockchain
2
3 import (
4         "errors"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/bytom/p2p"
10         "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/legacy"
13 )
14
15 type blockKeeperPeer struct {
16         mtx    sync.RWMutex
17         height uint64
18         hash   *bc.Hash
19 }
20
21 func newBlockKeeperPeer(height uint64, hash *bc.Hash) *blockKeeperPeer {
22         return &blockKeeperPeer{
23                 height: height,
24                 hash:   hash,
25         }
26 }
27
28 func (p *blockKeeperPeer) GetStatus() (height uint64, hash *bc.Hash) {
29         p.mtx.RLock()
30         defer p.mtx.RUnlock()
31         return p.height, p.hash
32 }
33
34 func (p *blockKeeperPeer) SetStatus(height uint64, hash *bc.Hash) {
35         p.mtx.Lock()
36         defer p.mtx.Unlock()
37
38         p.height = height
39         p.hash = hash
40 }
41
42 type pendingResponse struct {
43         block  *legacy.Block
44         peerID string
45 }
46
47 //TODO: add retry mechanism
48 type blockKeeper struct {
49         mtx           sync.RWMutex
50         chainHeight   uint64
51         maxPeerHeight uint64
52         chainUpdateCh <-chan struct{}
53         peerUpdateCh  chan struct{}
54
55         chain            *protocol.Chain
56         sw               *p2p.Switch
57         peers            map[string]*blockKeeperPeer
58         pendingProcessCh chan *pendingResponse
59 }
60
61 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch) *blockKeeper {
62         chainHeight := chain.Height()
63         bk := &blockKeeper{
64                 chainHeight:   chainHeight,
65                 maxPeerHeight: uint64(0),
66                 chainUpdateCh: chain.BlockWaiter(chainHeight + 1),
67                 peerUpdateCh:  make(chan struct{}, 1000),
68
69                 chain:            chain,
70                 sw:               sw,
71                 peers:            make(map[string]*blockKeeperPeer),
72                 pendingProcessCh: make(chan *pendingResponse),
73         }
74         go bk.blockProcessWorker()
75         go bk.blockRequestWorker()
76         return bk
77 }
78
79 func (bk *blockKeeper) AddBlock(block *legacy.Block, peerID string) {
80         bk.pendingProcessCh <- &pendingResponse{block: block, peerID: peerID}
81 }
82
83 func (bk *blockKeeper) IsCaughtUp() bool {
84         bk.mtx.RLock()
85         defer bk.mtx.RUnlock()
86         return bk.chainHeight >= bk.maxPeerHeight
87 }
88
89 func (bk *blockKeeper) RemovePeer(peerID string) {
90         bk.mtx.Lock()
91         delete(bk.peers, peerID)
92         bk.mtx.Unlock()
93         log.WithField("ID", peerID).Info("Delete peer from blockKeeper")
94 }
95
96 func (bk *blockKeeper) requestBlockByHash(peerID string, hash *bc.Hash) error {
97         peer := bk.sw.Peers().Get(peerID)
98         if peer == nil {
99                 return errors.New("can't find peer in peer pool")
100         }
101         msg := &BlockRequestMessage{RawHash: hash.Byte32()}
102         peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
103         return nil
104 }
105
106 func (bk *blockKeeper) requestBlockByHeight(peerID string, height uint64) error {
107         peer := bk.sw.Peers().Get(peerID)
108         if peer == nil {
109                 return errors.New("can't find peer in peer pool")
110         }
111         msg := &BlockRequestMessage{Height: height}
112         peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
113         return nil
114 }
115
116 func (bk *blockKeeper) SetPeerHeight(peerID string, height uint64, hash *bc.Hash) {
117         bk.mtx.Lock()
118         defer bk.mtx.Unlock()
119
120         if height > bk.maxPeerHeight {
121                 bk.maxPeerHeight = height
122                 bk.peerUpdateCh <- struct{}{}
123         }
124
125         if peer, ok := bk.peers[peerID]; ok {
126                 peer.SetStatus(height, hash)
127                 return
128         }
129         peer := newBlockKeeperPeer(height, hash)
130         bk.peers[peerID] = peer
131         log.WithFields(log.Fields{"ID": peerID, "Height": height}).Info("Add new peer to blockKeeper")
132 }
133
134 func (bk *blockKeeper) RequestBlockByHeight(height uint64) {
135         bk.mtx.RLock()
136         defer bk.mtx.RUnlock()
137
138         for peerID, peer := range bk.peers {
139                 if peerHeight, _ := peer.GetStatus(); peerHeight > bk.chainHeight {
140                         bk.requestBlockByHeight(peerID, height)
141                 }
142         }
143 }
144
145 func (bk *blockKeeper) blockRequestWorker() {
146         for {
147                 select {
148                 case <-bk.chainUpdateCh:
149                         chainHeight := bk.chain.Height()
150                         bk.mtx.Lock()
151                         if bk.chainHeight < chainHeight {
152                                 bk.chainHeight = chainHeight
153                         }
154                         bk.chainUpdateCh = bk.chain.BlockWaiter(bk.chainHeight + 1)
155                         bk.mtx.Unlock()
156
157                 case <-bk.peerUpdateCh:
158                         bk.mtx.RLock()
159                         chainHeight := bk.chainHeight
160                         maxPeerHeight := bk.maxPeerHeight
161                         bk.mtx.RUnlock()
162
163                         for i := chainHeight + 1; i <= maxPeerHeight; i++ {
164                                 bk.RequestBlockByHeight(i)
165                                 waiter := bk.chain.BlockWaiter(i)
166                                 <-waiter
167                         }
168                 }
169         }
170 }
171
172 func (bk *blockKeeper) blockProcessWorker() {
173         for pendingResponse := range bk.pendingProcessCh {
174                 block := pendingResponse.block
175                 blockHash := block.Hash()
176                 isOrphan, err := bk.chain.ProcessBlock(block)
177                 if err != nil {
178                         log.WithField("hash", blockHash.String()).Errorf("blockKeeper fail process block %v", err)
179                         continue
180                 }
181                 log.WithFields(log.Fields{
182                         "height":   block.Height,
183                         "hash":     blockHash.String(),
184                         "isOrphan": isOrphan,
185                 }).Info("blockKeeper processed block")
186
187                 if isOrphan {
188                         bk.requestBlockByHash(pendingResponse.peerID, &block.PreviousBlockHash)
189                 }
190         }
191 }