OSDN Git Service

spv merkle tree proof (#1262)
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "sync"
7
8         log "github.com/sirupsen/logrus"
9         "gopkg.in/fatih/set.v0"
10
11         "github.com/bytom/consensus"
12         "github.com/bytom/errors"
13         "github.com/bytom/p2p/trust"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/protocol/bc/types"
16 )
17
18 const (
19         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
20         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
21         defaultBanThreshold = uint64(100)
22 )
23
24 //BasePeer is the interface for connection level peer
25 type BasePeer interface {
26         Addr() net.Addr
27         ID() string
28         ServiceFlag() consensus.ServiceFlag
29         TrySend(byte, interface{}) bool
30 }
31
32 //BasePeerSet is the intergace for connection level peer manager
33 type BasePeerSet interface {
34         AddBannedPeer(string) error
35         StopPeerGracefully(string)
36 }
37
38 // PeerInfo indicate peer status snap
39 type PeerInfo struct {
40         ID         string `json:"peer_id"`
41         RemoteAddr string `json:"remote_addr"`
42         Height     uint64 `json:"height"`
43         Delay      uint32 `json:"delay"`
44 }
45
46 type peer struct {
47         BasePeer
48         mtx         sync.RWMutex
49         services    consensus.ServiceFlag
50         height      uint64
51         hash        *bc.Hash
52         banScore    trust.DynamicBanScore
53         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
54         knownBlocks *set.Set // Set of block hashes known to be known by this peer
55         filterAdds  *set.Set // Set of addresses that the spv node cares about.
56 }
57
58 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
59         return &peer{
60                 BasePeer:    basePeer,
61                 services:    basePeer.ServiceFlag(),
62                 height:      height,
63                 hash:        hash,
64                 knownTxs:    set.New(),
65                 knownBlocks: set.New(),
66                 filterAdds:  set.New(),
67         }
68 }
69
70 func (p *peer) Height() uint64 {
71         p.mtx.RLock()
72         defer p.mtx.RUnlock()
73         return p.height
74 }
75
76 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
77         score := p.banScore.Increase(persistent, transient)
78         if score > defaultBanThreshold {
79                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
80                 return true
81         }
82
83         warnThreshold := defaultBanThreshold >> 1
84         if score > warnThreshold {
85                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
86         }
87         return false
88 }
89
90 func (p *peer) addFilterAddresses(addresses [][]byte) {
91         p.mtx.Lock()
92         defer p.mtx.Unlock()
93
94         if !p.filterAdds.IsEmpty() {
95                 p.filterAdds.Clear()
96         }
97         for _, address := range addresses {
98                 p.filterAdds.Add(hex.EncodeToString(address))
99         }
100 }
101
102 func (p *peer) getBlockByHeight(height uint64) bool {
103         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
104         return p.TrySend(BlockchainChannel, msg)
105 }
106
107 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
108         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
109         return p.TrySend(BlockchainChannel, msg)
110 }
111
112 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
113         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
114         return p.TrySend(BlockchainChannel, msg)
115 }
116
117 func (p *peer) getPeerInfo() *PeerInfo {
118         p.mtx.RLock()
119         defer p.mtx.RUnlock()
120         return &PeerInfo{
121                 ID:         p.ID(),
122                 RemoteAddr: p.Addr().String(),
123                 Height:     p.height,
124         }
125 }
126
127 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
128         var relatedTxs []*types.Tx
129         var relatedStatuses []*bc.TxVerifyResult
130         for i, tx := range txs {
131                 if p.isRelatedTx(tx) {
132                         relatedTxs = append(relatedTxs, tx)
133                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
134                 }
135         }
136         return relatedTxs, relatedStatuses
137 }
138
139 func (p *peer) isRelatedTx(tx *types.Tx) bool {
140         for _, input := range tx.Inputs {
141                 switch inp := input.TypedInput.(type) {
142                 case *types.SpendInput:
143                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
144                                 return true
145                         }
146                 }
147         }
148         for _, output := range tx.Outputs {
149                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
150                         return true
151                 }
152         }
153         return false
154 }
155
156 func (p *peer) isSPVNode() bool {
157         return !p.services.IsEnable(consensus.SFFullNode)
158 }
159
160 func (p *peer) markBlock(hash *bc.Hash) {
161         p.mtx.Lock()
162         defer p.mtx.Unlock()
163
164         for p.knownBlocks.Size() >= maxKnownBlocks {
165                 p.knownBlocks.Pop()
166         }
167         p.knownBlocks.Add(hash.String())
168 }
169
170 func (p *peer) markTransaction(hash *bc.Hash) {
171         p.mtx.Lock()
172         defer p.mtx.Unlock()
173
174         for p.knownTxs.Size() >= maxKnownTxs {
175                 p.knownTxs.Pop()
176         }
177         p.knownTxs.Add(hash.String())
178 }
179
180 func (p *peer) sendBlock(block *types.Block) (bool, error) {
181         msg, err := NewBlockMessage(block)
182         if err != nil {
183                 return false, errors.Wrap(err, "fail on NewBlockMessage")
184         }
185
186         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
187         if ok {
188                 blcokHash := block.Hash()
189                 p.knownBlocks.Add(blcokHash.String())
190         }
191         return ok, nil
192 }
193
194 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
195         msg, err := NewBlocksMessage(blocks)
196         if err != nil {
197                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
198         }
199
200         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
201                 return ok, nil
202         }
203
204         for _, block := range blocks {
205                 blcokHash := block.Hash()
206                 p.knownBlocks.Add(blcokHash.String())
207         }
208         return true, nil
209 }
210
211 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
212         msg, err := NewHeadersMessage(headers)
213         if err != nil {
214                 return false, errors.New("fail on NewHeadersMessage")
215         }
216
217         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
218         return ok, nil
219 }
220
221 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
222         msg := NewMerkleBlockMessage()
223         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
224                 return false, err
225         }
226
227         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
228
229         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
230         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
231                 return false, nil
232         }
233         
234         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
235         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
236                 return false, nil
237         }
238
239         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
240         return ok, nil
241 }
242
243 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
244         for _, tx := range txs {
245                 if p.isSPVNode() && !p.isRelatedTx(tx) {
246                         continue
247                 }
248                 msg, err := NewTransactionMessage(tx)
249                 if err != nil {
250                         return false, errors.Wrap(err, "failed to tx msg")
251                 }
252
253                 if p.knownTxs.Has(tx.ID.String()) {
254                         continue
255                 }
256                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
257                         return ok, nil
258                 }
259                 p.knownTxs.Add(tx.ID.String())
260         }
261         return true, nil
262 }
263
264 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
265         p.mtx.Lock()
266         defer p.mtx.Unlock()
267         p.height = height
268         p.hash = hash
269 }
270
271 type peerSet struct {
272         BasePeerSet
273         mtx   sync.RWMutex
274         peers map[string]*peer
275 }
276
277 // newPeerSet creates a new peer set to track the active participants.
278 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
279         return &peerSet{
280                 BasePeerSet: basePeerSet,
281                 peers:       make(map[string]*peer),
282         }
283 }
284
285 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
286         ps.mtx.Lock()
287         peer := ps.peers[peerID]
288         ps.mtx.Unlock()
289
290         if peer == nil {
291                 return
292         }
293         if ban := peer.addBanScore(persistent, transient, reason); !ban {
294                 return
295         }
296         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
297                 log.WithField("err", err).Error("fail on add ban peer")
298         }
299         ps.removePeer(peerID)
300 }
301
302 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
303         ps.mtx.Lock()
304         defer ps.mtx.Unlock()
305
306         if _, ok := ps.peers[peer.ID()]; !ok {
307                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
308                 return
309         }
310         log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
311 }
312
313 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
314         ps.mtx.RLock()
315         defer ps.mtx.RUnlock()
316
317         var bestPeer *peer
318         for _, p := range ps.peers {
319                 if !p.services.IsEnable(flag) {
320                         continue
321                 }
322                 if bestPeer == nil || p.height > bestPeer.height {
323                         bestPeer = p
324                 }
325         }
326         return bestPeer
327 }
328
329 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
330         msg, err := NewMinedBlockMessage(block)
331         if err != nil {
332                 return errors.Wrap(err, "fail on broadcast mined block")
333         }
334
335         hash := block.Hash()
336         peers := ps.peersWithoutBlock(&hash)
337         for _, peer := range peers {
338                 if peer.isSPVNode() {
339                         continue
340                 }
341                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
342                         ps.removePeer(peer.ID())
343                         continue
344                 }
345                 peer.markBlock(&hash)
346         }
347         return nil
348 }
349
350 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
351         genesisHash := genesisBlock.Hash()
352         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
353         for _, peer := range ps.peers {
354                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
355                         ps.removePeer(peer.ID())
356                         continue
357                 }
358         }
359         return nil
360 }
361
362 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
363         msg, err := NewTransactionMessage(tx)
364         if err != nil {
365                 return errors.Wrap(err, "fail on broadcast tx")
366         }
367
368         peers := ps.peersWithoutTx(&tx.ID)
369         for _, peer := range peers {
370                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
371                         continue
372                 }
373                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
374                         ps.removePeer(peer.ID())
375                         continue
376                 }
377                 peer.markTransaction(&tx.ID)
378         }
379         return nil
380 }
381
382 func (ps *peerSet) errorHandler(peerID string, err error) {
383         if errors.Root(err) == errPeerMisbehave {
384                 ps.addBanScore(peerID, 20, 0, err.Error())
385         } else {
386                 ps.removePeer(peerID)
387         }
388 }
389
390 // Peer retrieves the registered peer with the given id.
391 func (ps *peerSet) getPeer(id string) *peer {
392         ps.mtx.RLock()
393         defer ps.mtx.RUnlock()
394         return ps.peers[id]
395 }
396
397 func (ps *peerSet) getPeerInfos() []*PeerInfo {
398         ps.mtx.RLock()
399         defer ps.mtx.RUnlock()
400
401         result := []*PeerInfo{}
402         for _, peer := range ps.peers {
403                 result = append(result, peer.getPeerInfo())
404         }
405         return result
406 }
407
408 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
409         ps.mtx.RLock()
410         defer ps.mtx.RUnlock()
411
412         peers := []*peer{}
413         for _, peer := range ps.peers {
414                 if !peer.knownBlocks.Has(hash.String()) {
415                         peers = append(peers, peer)
416                 }
417         }
418         return peers
419 }
420
421 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
422         ps.mtx.RLock()
423         defer ps.mtx.RUnlock()
424
425         peers := []*peer{}
426         for _, peer := range ps.peers {
427                 if !peer.knownTxs.Has(hash.String()) {
428                         peers = append(peers, peer)
429                 }
430         }
431         return peers
432 }
433
434 func (ps *peerSet) removePeer(peerID string) {
435         ps.mtx.Lock()
436         delete(ps.peers, peerID)
437         ps.mtx.Unlock()
438         ps.StopPeerGracefully(peerID)
439 }