OSDN Git Service

Fix new mined orphan block broadcast bug (#1592)
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/bytom/consensus"
14         "github.com/bytom/errors"
15         "github.com/bytom/p2p/trust"
16         "github.com/bytom/protocol/bc"
17         "github.com/bytom/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
23         defaultBanThreshold = uint64(100)
24 )
25
26 //BasePeer is the interface for connection level peer
27 type BasePeer interface {
28         Addr() net.Addr
29         ID() string
30         ServiceFlag() consensus.ServiceFlag
31         TrafficStatus() (*flowrate.Status, *flowrate.Status)
32         TrySend(byte, interface{}) bool
33 }
34
35 //BasePeerSet is the intergace for connection level peer manager
36 type BasePeerSet interface {
37         AddBannedPeer(string) error
38         StopPeerGracefully(string)
39 }
40
41 // PeerInfo indicate peer status snap
42 type PeerInfo struct {
43         ID                  string `json:"peer_id"`
44         RemoteAddr          string `json:"remote_addr"`
45         Height              uint64 `json:"height"`
46         Ping                string `json:"ping"`
47         Duration            string `json:"duration"`
48         TotalSent           int64  `json:"total_sent"`
49         TotalReceived       int64  `json:"total_received"`
50         AverageSentRate     int64  `json:"average_sent_rate"`
51         AverageReceivedRate int64  `json:"average_received_rate"`
52         CurrentSentRate     int64  `json:"current_sent_rate"`
53         CurrentReceivedRate int64  `json:"current_received_rate"`
54 }
55
56 type peer struct {
57         BasePeer
58         mtx         sync.RWMutex
59         services    consensus.ServiceFlag
60         height      uint64
61         hash        *bc.Hash
62         banScore    trust.DynamicBanScore
63         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
64         knownBlocks *set.Set // Set of block hashes known to be known by this peer
65         filterAdds  *set.Set // Set of addresses that the spv node cares about.
66 }
67
68 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
69         return &peer{
70                 BasePeer:    basePeer,
71                 services:    basePeer.ServiceFlag(),
72                 height:      height,
73                 hash:        hash,
74                 knownTxs:    set.New(),
75                 knownBlocks: set.New(),
76                 filterAdds:  set.New(),
77         }
78 }
79
80 func (p *peer) Height() uint64 {
81         p.mtx.RLock()
82         defer p.mtx.RUnlock()
83         return p.height
84 }
85
86 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
87         score := p.banScore.Increase(persistent, transient)
88         if score > defaultBanThreshold {
89                 log.WithFields(log.Fields{
90                         "module":  logModule,
91                         "address": p.Addr(),
92                         "score":   score,
93                         "reason":  reason,
94                 }).Errorf("banning and disconnecting")
95                 return true
96         }
97
98         warnThreshold := defaultBanThreshold >> 1
99         if score > warnThreshold {
100                 log.WithFields(log.Fields{
101                         "module":  logModule,
102                         "address": p.Addr(),
103                         "score":   score,
104                         "reason":  reason,
105                 }).Warning("ban score increasing")
106         }
107         return false
108 }
109
110 func (p *peer) addFilterAddress(address []byte) {
111         p.mtx.Lock()
112         defer p.mtx.Unlock()
113
114         if p.filterAdds.Size() >= maxFilterAddressCount {
115                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
116                 return
117         }
118         if len(address) > maxFilterAddressSize {
119                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
120                 return
121         }
122
123         p.filterAdds.Add(hex.EncodeToString(address))
124 }
125
126 func (p *peer) addFilterAddresses(addresses [][]byte) {
127         if !p.filterAdds.IsEmpty() {
128                 p.filterAdds.Clear()
129         }
130         for _, address := range addresses {
131                 p.addFilterAddress(address)
132         }
133 }
134
135 func (p *peer) getBlockByHeight(height uint64) bool {
136         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
137         return p.TrySend(BlockchainChannel, msg)
138 }
139
140 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
141         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
142         return p.TrySend(BlockchainChannel, msg)
143 }
144
145 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
146         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
147         return p.TrySend(BlockchainChannel, msg)
148 }
149
150 func (p *peer) getPeerInfo() *PeerInfo {
151         p.mtx.RLock()
152         defer p.mtx.RUnlock()
153
154         sentStatus, receivedStatus := p.TrafficStatus()
155         ping := sentStatus.Idle - receivedStatus.Idle
156         if receivedStatus.Idle > sentStatus.Idle {
157                 ping = -ping
158         }
159
160         return &PeerInfo{
161                 ID:                  p.ID(),
162                 RemoteAddr:          p.Addr().String(),
163                 Height:              p.height,
164                 Ping:                ping.String(),
165                 Duration:            sentStatus.Duration.String(),
166                 TotalSent:           sentStatus.Bytes,
167                 TotalReceived:       receivedStatus.Bytes,
168                 AverageSentRate:     sentStatus.AvgRate,
169                 AverageReceivedRate: receivedStatus.AvgRate,
170                 CurrentSentRate:     sentStatus.CurRate,
171                 CurrentReceivedRate: receivedStatus.CurRate,
172         }
173 }
174
175 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
176         var relatedTxs []*types.Tx
177         var relatedStatuses []*bc.TxVerifyResult
178         for i, tx := range txs {
179                 if p.isRelatedTx(tx) {
180                         relatedTxs = append(relatedTxs, tx)
181                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
182                 }
183         }
184         return relatedTxs, relatedStatuses
185 }
186
187 func (p *peer) isRelatedTx(tx *types.Tx) bool {
188         for _, input := range tx.Inputs {
189                 switch inp := input.TypedInput.(type) {
190                 case *types.SpendInput:
191                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
192                                 return true
193                         }
194                 }
195         }
196         for _, output := range tx.Outputs {
197                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
198                         return true
199                 }
200         }
201         return false
202 }
203
204 func (p *peer) isSPVNode() bool {
205         return !p.services.IsEnable(consensus.SFFullNode)
206 }
207
208 func (p *peer) markBlock(hash *bc.Hash) {
209         p.mtx.Lock()
210         defer p.mtx.Unlock()
211
212         for p.knownBlocks.Size() >= maxKnownBlocks {
213                 p.knownBlocks.Pop()
214         }
215         p.knownBlocks.Add(hash.String())
216 }
217
218 func (p *peer) markTransaction(hash *bc.Hash) {
219         p.mtx.Lock()
220         defer p.mtx.Unlock()
221
222         for p.knownTxs.Size() >= maxKnownTxs {
223                 p.knownTxs.Pop()
224         }
225         p.knownTxs.Add(hash.String())
226 }
227
228 func (p *peer) sendBlock(block *types.Block) (bool, error) {
229         msg, err := NewBlockMessage(block)
230         if err != nil {
231                 return false, errors.Wrap(err, "fail on NewBlockMessage")
232         }
233
234         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
235         if ok {
236                 blcokHash := block.Hash()
237                 p.knownBlocks.Add(blcokHash.String())
238         }
239         return ok, nil
240 }
241
242 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
243         msg, err := NewBlocksMessage(blocks)
244         if err != nil {
245                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
246         }
247
248         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
249                 return ok, nil
250         }
251
252         for _, block := range blocks {
253                 blcokHash := block.Hash()
254                 p.knownBlocks.Add(blcokHash.String())
255         }
256         return true, nil
257 }
258
259 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
260         msg, err := NewHeadersMessage(headers)
261         if err != nil {
262                 return false, errors.New("fail on NewHeadersMessage")
263         }
264
265         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
266         return ok, nil
267 }
268
269 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
270         msg := NewMerkleBlockMessage()
271         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
272                 return false, err
273         }
274
275         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
276
277         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
278         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
279                 return false, nil
280         }
281
282         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
283         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
284                 return false, nil
285         }
286
287         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
288         return ok, nil
289 }
290
291 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
292         for _, tx := range txs {
293                 if p.isSPVNode() && !p.isRelatedTx(tx) {
294                         continue
295                 }
296                 msg, err := NewTransactionMessage(tx)
297                 if err != nil {
298                         return false, errors.Wrap(err, "failed to tx msg")
299                 }
300
301                 if p.knownTxs.Has(tx.ID.String()) {
302                         continue
303                 }
304                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
305                         return ok, nil
306                 }
307                 p.knownTxs.Add(tx.ID.String())
308         }
309         return true, nil
310 }
311
312 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
313         p.mtx.Lock()
314         defer p.mtx.Unlock()
315         p.height = height
316         p.hash = hash
317 }
318
319 type peerSet struct {
320         BasePeerSet
321         mtx   sync.RWMutex
322         peers map[string]*peer
323 }
324
325 // newPeerSet creates a new peer set to track the active participants.
326 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
327         return &peerSet{
328                 BasePeerSet: basePeerSet,
329                 peers:       make(map[string]*peer),
330         }
331 }
332
333 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
334         ps.mtx.Lock()
335         peer := ps.peers[peerID]
336         ps.mtx.Unlock()
337
338         if peer == nil {
339                 return
340         }
341         if ban := peer.addBanScore(persistent, transient, reason); !ban {
342                 return
343         }
344         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
345                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
346         }
347         ps.removePeer(peerID)
348 }
349
350 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
351         ps.mtx.Lock()
352         defer ps.mtx.Unlock()
353
354         if _, ok := ps.peers[peer.ID()]; !ok {
355                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
356                 return
357         }
358         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
359 }
360
361 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
362         ps.mtx.RLock()
363         defer ps.mtx.RUnlock()
364
365         var bestPeer *peer
366         for _, p := range ps.peers {
367                 if !p.services.IsEnable(flag) {
368                         continue
369                 }
370                 if bestPeer == nil || p.height > bestPeer.height {
371                         bestPeer = p
372                 }
373         }
374         return bestPeer
375 }
376
377 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
378         msg, err := NewMinedBlockMessage(block)
379         if err != nil {
380                 return errors.Wrap(err, "fail on broadcast mined block")
381         }
382
383         hash := block.Hash()
384         peers := ps.peersWithoutBlock(&hash)
385         for _, peer := range peers {
386                 if peer.isSPVNode() {
387                         continue
388                 }
389                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
390                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
391                         ps.removePeer(peer.ID())
392                         continue
393                 }
394                 peer.markBlock(&hash)
395         }
396         return nil
397 }
398
399 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
400         bestBlockHash := bestBlock.Hash()
401         peers := ps.peersWithoutBlock(&bestBlockHash)
402
403         genesisHash := genesisBlock.Hash()
404         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
405         for _, peer := range peers {
406                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
407                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
408                         ps.removePeer(peer.ID())
409                         continue
410                 }
411         }
412         return nil
413 }
414
415 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
416         msg, err := NewTransactionMessage(tx)
417         if err != nil {
418                 return errors.Wrap(err, "fail on broadcast tx")
419         }
420
421         peers := ps.peersWithoutTx(&tx.ID)
422         for _, peer := range peers {
423                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
424                         continue
425                 }
426                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
427                         log.WithFields(log.Fields{
428                                 "module":  logModule,
429                                 "peer":    peer.Addr(),
430                                 "type":    reflect.TypeOf(msg),
431                                 "message": msg.String(),
432                         }).Warning("send message to peer error")
433                         ps.removePeer(peer.ID())
434                         continue
435                 }
436                 peer.markTransaction(&tx.ID)
437         }
438         return nil
439 }
440
441 func (ps *peerSet) errorHandler(peerID string, err error) {
442         if errors.Root(err) == errPeerMisbehave {
443                 ps.addBanScore(peerID, 20, 0, err.Error())
444         } else {
445                 ps.removePeer(peerID)
446         }
447 }
448
449 // Peer retrieves the registered peer with the given id.
450 func (ps *peerSet) getPeer(id string) *peer {
451         ps.mtx.RLock()
452         defer ps.mtx.RUnlock()
453         return ps.peers[id]
454 }
455
456 func (ps *peerSet) getPeerInfos() []*PeerInfo {
457         ps.mtx.RLock()
458         defer ps.mtx.RUnlock()
459
460         result := []*PeerInfo{}
461         for _, peer := range ps.peers {
462                 result = append(result, peer.getPeerInfo())
463         }
464         return result
465 }
466
467 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
468         ps.mtx.RLock()
469         defer ps.mtx.RUnlock()
470
471         peers := []*peer{}
472         for _, peer := range ps.peers {
473                 if !peer.knownBlocks.Has(hash.String()) {
474                         peers = append(peers, peer)
475                 }
476         }
477         return peers
478 }
479
480 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
481         ps.mtx.RLock()
482         defer ps.mtx.RUnlock()
483
484         peers := []*peer{}
485         for _, peer := range ps.peers {
486                 if !peer.knownTxs.Has(hash.String()) {
487                         peers = append(peers, peer)
488                 }
489         }
490         return peers
491 }
492
493 func (ps *peerSet) removePeer(peerID string) {
494         ps.mtx.Lock()
495         delete(ps.peers, peerID)
496         ps.mtx.Unlock()
497         ps.StopPeerGracefully(peerID)
498 }