OSDN Git Service

dispatch signature when proccess block (#85)
[bytom/vapor.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         "github.com/vapor/p2p/trust"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
23         defaultBanThreshold = uint32(100)
24 )
25
26 var errSendStatusMsg = errors.New("send status msg fail")
27
28 //BasePeer is the interface for connection level peer
29 type BasePeer interface {
30         Addr() net.Addr
31         ID() string
32         ServiceFlag() consensus.ServiceFlag
33         TrafficStatus() (*flowrate.Status, *flowrate.Status)
34         TrySend(byte, interface{}) bool
35         IsLAN() bool
36 }
37
38 //BasePeerSet is the intergace for connection level peer manager
39 type BasePeerSet interface {
40         AddBannedPeer(string) error
41         StopPeerGracefully(string)
42 }
43
44 // PeerInfo indicate peer status snap
45 type PeerInfo struct {
46         ID                  string `json:"peer_id"`
47         RemoteAddr          string `json:"remote_addr"`
48         Height              uint64 `json:"height"`
49         Ping                string `json:"ping"`
50         Duration            string `json:"duration"`
51         TotalSent           int64  `json:"total_sent"`
52         TotalReceived       int64  `json:"total_received"`
53         AverageSentRate     int64  `json:"average_sent_rate"`
54         AverageReceivedRate int64  `json:"average_received_rate"`
55         CurrentSentRate     int64  `json:"current_sent_rate"`
56         CurrentReceivedRate int64  `json:"current_received_rate"`
57 }
58
59 type peer struct {
60         BasePeer
61         mtx         sync.RWMutex
62         services    consensus.ServiceFlag
63         height      uint64
64         hash        *bc.Hash
65         banScore    trust.DynamicBanScore
66         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
67         knownBlocks *set.Set // Set of block hashes known to be known by this peer
68         knownStatus uint64   // Set of chain status known to be known by this peer
69         filterAdds  *set.Set // Set of addresses that the spv node cares about.
70 }
71
72 func newPeer(basePeer BasePeer) *peer {
73         return &peer{
74                 BasePeer:    basePeer,
75                 services:    basePeer.ServiceFlag(),
76                 knownTxs:    set.New(),
77                 knownBlocks: set.New(),
78                 filterAdds:  set.New(),
79         }
80 }
81
82 func (p *peer) Height() uint64 {
83         p.mtx.RLock()
84         defer p.mtx.RUnlock()
85         return p.height
86 }
87
88 func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
89         score := p.banScore.Increase(persistent, transient)
90         if score > defaultBanThreshold {
91                 log.WithFields(log.Fields{
92                         "module":  logModule,
93                         "address": p.Addr(),
94                         "score":   score,
95                         "reason":  reason,
96                 }).Errorf("banning and disconnecting")
97                 return true
98         }
99
100         warnThreshold := defaultBanThreshold >> 1
101         if score > warnThreshold {
102                 log.WithFields(log.Fields{
103                         "module":  logModule,
104                         "address": p.Addr(),
105                         "score":   score,
106                         "reason":  reason,
107                 }).Warning("ban score increasing")
108         }
109         return false
110 }
111
112 func (p *peer) addFilterAddress(address []byte) {
113         p.mtx.Lock()
114         defer p.mtx.Unlock()
115
116         if p.filterAdds.Size() >= maxFilterAddressCount {
117                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
118                 return
119         }
120         if len(address) > maxFilterAddressSize {
121                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
122                 return
123         }
124
125         p.filterAdds.Add(hex.EncodeToString(address))
126 }
127
128 func (p *peer) addFilterAddresses(addresses [][]byte) {
129         if !p.filterAdds.IsEmpty() {
130                 p.filterAdds.Clear()
131         }
132         for _, address := range addresses {
133                 p.addFilterAddress(address)
134         }
135 }
136
137 func (p *peer) getBlockByHeight(height uint64) bool {
138         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
139         return p.TrySend(BlockchainChannel, msg)
140 }
141
142 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
143         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
144         return p.TrySend(BlockchainChannel, msg)
145 }
146
147 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
148         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
149         return p.TrySend(BlockchainChannel, msg)
150 }
151
152 func (p *peer) getPeerInfo() *PeerInfo {
153         p.mtx.RLock()
154         defer p.mtx.RUnlock()
155
156         sentStatus, receivedStatus := p.TrafficStatus()
157         ping := sentStatus.Idle - receivedStatus.Idle
158         if receivedStatus.Idle > sentStatus.Idle {
159                 ping = -ping
160         }
161
162         return &PeerInfo{
163                 ID:                  p.ID(),
164                 RemoteAddr:          p.Addr().String(),
165                 Height:              p.height,
166                 Ping:                ping.String(),
167                 Duration:            sentStatus.Duration.String(),
168                 TotalSent:           sentStatus.Bytes,
169                 TotalReceived:       receivedStatus.Bytes,
170                 AverageSentRate:     sentStatus.AvgRate,
171                 AverageReceivedRate: receivedStatus.AvgRate,
172                 CurrentSentRate:     sentStatus.CurRate,
173                 CurrentReceivedRate: receivedStatus.CurRate,
174         }
175 }
176
177 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
178         var relatedTxs []*types.Tx
179         var relatedStatuses []*bc.TxVerifyResult
180         for i, tx := range txs {
181                 if p.isRelatedTx(tx) {
182                         relatedTxs = append(relatedTxs, tx)
183                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
184                 }
185         }
186         return relatedTxs, relatedStatuses
187 }
188
189 func (p *peer) isRelatedTx(tx *types.Tx) bool {
190         for _, input := range tx.Inputs {
191                 switch inp := input.TypedInput.(type) {
192                 case *types.SpendInput:
193                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
194                                 return true
195                         }
196                 }
197         }
198         for _, output := range tx.Outputs {
199                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
200                         return true
201                 }
202         }
203         return false
204 }
205
206 func (p *peer) isSPVNode() bool {
207         return !p.services.IsEnable(consensus.SFFullNode)
208 }
209
210 func (p *peer) markBlock(hash *bc.Hash) {
211         p.mtx.Lock()
212         defer p.mtx.Unlock()
213
214         for p.knownBlocks.Size() >= maxKnownBlocks {
215                 p.knownBlocks.Pop()
216         }
217         p.knownBlocks.Add(hash.String())
218 }
219
220 func (p *peer) markNewStatus(height uint64) {
221         p.mtx.Lock()
222         defer p.mtx.Unlock()
223
224         p.knownStatus = height
225 }
226
227 func (p *peer) markTransaction(hash *bc.Hash) {
228         p.mtx.Lock()
229         defer p.mtx.Unlock()
230
231         for p.knownTxs.Size() >= maxKnownTxs {
232                 p.knownTxs.Pop()
233         }
234         p.knownTxs.Add(hash.String())
235 }
236
237 func (p *peer) sendBlock(block *types.Block) (bool, error) {
238         msg, err := NewBlockMessage(block)
239         if err != nil {
240                 return false, errors.Wrap(err, "fail on NewBlockMessage")
241         }
242
243         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
244         if ok {
245                 blcokHash := block.Hash()
246                 p.knownBlocks.Add(blcokHash.String())
247         }
248         return ok, nil
249 }
250
251 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
252         msg, err := NewBlocksMessage(blocks)
253         if err != nil {
254                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
255         }
256
257         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
258                 return ok, nil
259         }
260
261         for _, block := range blocks {
262                 blcokHash := block.Hash()
263                 p.knownBlocks.Add(blcokHash.String())
264         }
265         return true, nil
266 }
267
268 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
269         msg, err := NewHeadersMessage(headers)
270         if err != nil {
271                 return false, errors.New("fail on NewHeadersMessage")
272         }
273
274         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
275         return ok, nil
276 }
277
278 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
279         msg := NewMerkleBlockMessage()
280         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
281                 return false, err
282         }
283
284         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
285
286         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
287         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
288                 return false, nil
289         }
290
291         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
292         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
293                 return false, nil
294         }
295
296         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
297         return ok, nil
298 }
299
300 func (p *peer) sendTransactions(txs []*types.Tx) error {
301         validTxs := make([]*types.Tx, 0, len(txs))
302         for i, tx := range txs {
303                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
304                         continue
305                 }
306
307                 validTxs = append(validTxs, tx)
308                 if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
309                         continue
310                 }
311
312                 msg, err := NewTransactionsMessage(validTxs)
313                 if err != nil {
314                         return err
315                 }
316
317                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
318                         return errors.New("failed to send txs msg")
319                 }
320
321                 for _, validTx := range validTxs {
322                         p.knownTxs.Add(validTx.ID.String())
323                 }
324
325                 validTxs = make([]*types.Tx, 0, len(txs))
326         }
327
328         return nil
329 }
330
331 func (p *peer) sendStatus(header *types.BlockHeader) error {
332         msg := NewStatusMessage(header)
333         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
334                 return errSendStatusMsg
335         }
336         p.markNewStatus(header.Height)
337         return nil
338 }
339
340 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
341         p.mtx.Lock()
342         defer p.mtx.Unlock()
343         p.height = height
344         p.hash = hash
345 }
346
347 type peerSet struct {
348         BasePeerSet
349         mtx   sync.RWMutex
350         peers map[string]*peer
351 }
352
353 // newPeerSet creates a new peer set to track the active participants.
354 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
355         return &peerSet{
356                 BasePeerSet: basePeerSet,
357                 peers:       make(map[string]*peer),
358         }
359 }
360
361 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
362         ps.mtx.Lock()
363         peer := ps.peers[peerID]
364         ps.mtx.Unlock()
365
366         if peer == nil {
367                 return
368         }
369         if ban := peer.addBanScore(persistent, transient, reason); !ban {
370                 return
371         }
372         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
373                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
374         }
375         ps.removePeer(peerID)
376 }
377
378 func (ps *peerSet) addPeer(peer BasePeer) {
379         ps.mtx.Lock()
380         defer ps.mtx.Unlock()
381
382         if _, ok := ps.peers[peer.ID()]; !ok {
383                 ps.peers[peer.ID()] = newPeer(peer)
384                 return
385         }
386         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
387 }
388
389 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
390         ps.mtx.RLock()
391         defer ps.mtx.RUnlock()
392
393         var bestPeer *peer
394         for _, p := range ps.peers {
395                 if !p.services.IsEnable(flag) {
396                         continue
397                 }
398                 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
399                         bestPeer = p
400                 }
401         }
402         return bestPeer
403 }
404
405 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
406         msg, err := NewMinedBlockMessage(block)
407         if err != nil {
408                 return errors.Wrap(err, "fail on broadcast mined block")
409         }
410
411         hash := block.Hash()
412         peers := ps.peersWithoutBlock(&hash)
413         for _, peer := range peers {
414                 if peer.isSPVNode() {
415                         continue
416                 }
417                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
418                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
419                         ps.removePeer(peer.ID())
420                         continue
421                 }
422                 peer.markBlock(&hash)
423                 peer.markNewStatus(block.Height)
424         }
425         return nil
426 }
427
428 func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
429         msg := NewStatusMessage(&bestBlock.BlockHeader)
430         peers := ps.peersWithoutNewStatus(bestBlock.Height)
431         for _, peer := range peers {
432                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
433                         ps.removePeer(peer.ID())
434                         continue
435                 }
436
437                 peer.markNewStatus(bestBlock.Height)
438         }
439         return nil
440 }
441
442 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
443         msg, err := NewTransactionMessage(tx)
444         if err != nil {
445                 return errors.Wrap(err, "fail on broadcast tx")
446         }
447
448         peers := ps.peersWithoutTx(&tx.ID)
449         for _, peer := range peers {
450                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
451                         continue
452                 }
453                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
454                         log.WithFields(log.Fields{
455                                 "module":  logModule,
456                                 "peer":    peer.Addr(),
457                                 "type":    reflect.TypeOf(msg),
458                                 "message": msg.String(),
459                         }).Warning("send message to peer error")
460                         ps.removePeer(peer.ID())
461                         continue
462                 }
463                 peer.markTransaction(&tx.ID)
464         }
465         return nil
466 }
467
468 func (ps *peerSet) errorHandler(peerID string, err error) {
469         if errors.Root(err) == errPeerMisbehave {
470                 ps.addBanScore(peerID, 20, 0, err.Error())
471         } else {
472                 ps.removePeer(peerID)
473         }
474 }
475
476 // Peer retrieves the registered peer with the given id.
477 func (ps *peerSet) getPeer(id string) *peer {
478         ps.mtx.RLock()
479         defer ps.mtx.RUnlock()
480         return ps.peers[id]
481 }
482
483 func (ps *peerSet) getPeerInfos() []*PeerInfo {
484         ps.mtx.RLock()
485         defer ps.mtx.RUnlock()
486
487         result := []*PeerInfo{}
488         for _, peer := range ps.peers {
489                 result = append(result, peer.getPeerInfo())
490         }
491         return result
492 }
493
494 func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
495         ps.mtx.Lock()
496         peer := ps.peers[peerID]
497         ps.mtx.Unlock()
498
499         if peer == nil {
500                 return
501         }
502         peer.markTransaction(&txHash)
503 }
504
505 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
506         ps.mtx.RLock()
507         defer ps.mtx.RUnlock()
508
509         peers := []*peer{}
510         for _, peer := range ps.peers {
511                 if !peer.knownBlocks.Has(hash.String()) {
512                         peers = append(peers, peer)
513                 }
514         }
515         return peers
516 }
517
518 func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
519         ps.mtx.RLock()
520         defer ps.mtx.RUnlock()
521
522         var peers []*peer
523         for _, peer := range ps.peers {
524                 if peer.knownStatus < height {
525                         peers = append(peers, peer)
526                 }
527         }
528         return peers
529 }
530
531 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
532         ps.mtx.RLock()
533         defer ps.mtx.RUnlock()
534
535         peers := []*peer{}
536         for _, peer := range ps.peers {
537                 if !peer.knownTxs.Has(hash.String()) {
538                         peers = append(peers, peer)
539                 }
540         }
541         return peers
542 }
543
544 func (ps *peerSet) removePeer(peerID string) {
545         ps.mtx.Lock()
546         delete(ps.peers, peerID)
547         ps.mtx.Unlock()
548         ps.StopPeerGracefully(peerID)
549 }