OSDN Git Service

Abstract p2p security module
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/bytom/consensus"
14         "github.com/bytom/errors"
15         "github.com/bytom/protocol/bc"
16         "github.com/bytom/protocol/bc/types"
17 )
18
19 const (
20         maxKnownTxs    = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
21         maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
22 )
23
24 //BasePeer is the interface for connection level peer
25 type BasePeer interface {
26         Addr() net.Addr
27         ID() string
28         ServiceFlag() consensus.ServiceFlag
29         TrafficStatus() (*flowrate.Status, *flowrate.Status)
30         TrySend(byte, interface{}) bool
31         IsLAN() bool
32 }
33
34 //BasePeerSet is the intergace for connection level peer manager
35 type BasePeerSet interface {
36         StopPeerGracefully(string)
37         IsBanned(peerID string, level byte, reason string) bool
38 }
39
40 // PeerInfo indicate peer status snap
41 type PeerInfo struct {
42         ID                  string `json:"peer_id"`
43         RemoteAddr          string `json:"remote_addr"`
44         Height              uint64 `json:"height"`
45         Ping                string `json:"ping"`
46         Duration            string `json:"duration"`
47         TotalSent           int64  `json:"total_sent"`
48         TotalReceived       int64  `json:"total_received"`
49         AverageSentRate     int64  `json:"average_sent_rate"`
50         AverageReceivedRate int64  `json:"average_received_rate"`
51         CurrentSentRate     int64  `json:"current_sent_rate"`
52         CurrentReceivedRate int64  `json:"current_received_rate"`
53 }
54
55 type peer struct {
56         BasePeer
57         mtx         sync.RWMutex
58         services    consensus.ServiceFlag
59         height      uint64
60         hash        *bc.Hash
61         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
62         knownBlocks *set.Set // Set of block hashes known to be known by this peer
63         filterAdds  *set.Set // Set of addresses that the spv node cares about.
64 }
65
66 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
67         return &peer{
68                 BasePeer:    basePeer,
69                 services:    basePeer.ServiceFlag(),
70                 height:      height,
71                 hash:        hash,
72                 knownTxs:    set.New(),
73                 knownBlocks: set.New(),
74                 filterAdds:  set.New(),
75         }
76 }
77
78 func (p *peer) Height() uint64 {
79         p.mtx.RLock()
80         defer p.mtx.RUnlock()
81         return p.height
82 }
83
84 func (p *peer) addFilterAddress(address []byte) {
85         p.mtx.Lock()
86         defer p.mtx.Unlock()
87
88         if p.filterAdds.Size() >= maxFilterAddressCount {
89                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
90                 return
91         }
92         if len(address) > maxFilterAddressSize {
93                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
94                 return
95         }
96
97         p.filterAdds.Add(hex.EncodeToString(address))
98 }
99
100 func (p *peer) addFilterAddresses(addresses [][]byte) {
101         if !p.filterAdds.IsEmpty() {
102                 p.filterAdds.Clear()
103         }
104         for _, address := range addresses {
105                 p.addFilterAddress(address)
106         }
107 }
108
109 func (p *peer) getBlockByHeight(height uint64) bool {
110         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
111         return p.TrySend(BlockchainChannel, msg)
112 }
113
114 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
115         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
116         return p.TrySend(BlockchainChannel, msg)
117 }
118
119 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
120         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
121         return p.TrySend(BlockchainChannel, msg)
122 }
123
124 func (p *peer) getPeerInfo() *PeerInfo {
125         p.mtx.RLock()
126         defer p.mtx.RUnlock()
127
128         sentStatus, receivedStatus := p.TrafficStatus()
129         ping := sentStatus.Idle - receivedStatus.Idle
130         if receivedStatus.Idle > sentStatus.Idle {
131                 ping = -ping
132         }
133
134         return &PeerInfo{
135                 ID:                  p.ID(),
136                 RemoteAddr:          p.Addr().String(),
137                 Height:              p.height,
138                 Ping:                ping.String(),
139                 Duration:            sentStatus.Duration.String(),
140                 TotalSent:           sentStatus.Bytes,
141                 TotalReceived:       receivedStatus.Bytes,
142                 AverageSentRate:     sentStatus.AvgRate,
143                 AverageReceivedRate: receivedStatus.AvgRate,
144                 CurrentSentRate:     sentStatus.CurRate,
145                 CurrentReceivedRate: receivedStatus.CurRate,
146         }
147 }
148
149 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
150         var relatedTxs []*types.Tx
151         var relatedStatuses []*bc.TxVerifyResult
152         for i, tx := range txs {
153                 if p.isRelatedTx(tx) {
154                         relatedTxs = append(relatedTxs, tx)
155                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
156                 }
157         }
158         return relatedTxs, relatedStatuses
159 }
160
161 func (p *peer) isRelatedTx(tx *types.Tx) bool {
162         for _, input := range tx.Inputs {
163                 switch inp := input.TypedInput.(type) {
164                 case *types.SpendInput:
165                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
166                                 return true
167                         }
168                 }
169         }
170         for _, output := range tx.Outputs {
171                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
172                         return true
173                 }
174         }
175         return false
176 }
177
178 func (p *peer) isSPVNode() bool {
179         return !p.services.IsEnable(consensus.SFFullNode)
180 }
181
182 func (p *peer) markBlock(hash *bc.Hash) {
183         p.mtx.Lock()
184         defer p.mtx.Unlock()
185
186         for p.knownBlocks.Size() >= maxKnownBlocks {
187                 p.knownBlocks.Pop()
188         }
189         p.knownBlocks.Add(hash.String())
190 }
191
192 func (p *peer) markTransaction(hash *bc.Hash) {
193         p.mtx.Lock()
194         defer p.mtx.Unlock()
195
196         for p.knownTxs.Size() >= maxKnownTxs {
197                 p.knownTxs.Pop()
198         }
199         p.knownTxs.Add(hash.String())
200 }
201
202 func (p *peer) sendBlock(block *types.Block) (bool, error) {
203         msg, err := NewBlockMessage(block)
204         if err != nil {
205                 return false, errors.Wrap(err, "fail on NewBlockMessage")
206         }
207
208         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
209         if ok {
210                 blcokHash := block.Hash()
211                 p.knownBlocks.Add(blcokHash.String())
212         }
213         return ok, nil
214 }
215
216 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
217         msg, err := NewBlocksMessage(blocks)
218         if err != nil {
219                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
220         }
221
222         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
223                 return ok, nil
224         }
225
226         for _, block := range blocks {
227                 blcokHash := block.Hash()
228                 p.knownBlocks.Add(blcokHash.String())
229         }
230         return true, nil
231 }
232
233 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
234         msg, err := NewHeadersMessage(headers)
235         if err != nil {
236                 return false, errors.New("fail on NewHeadersMessage")
237         }
238
239         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
240         return ok, nil
241 }
242
243 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
244         msg := NewMerkleBlockMessage()
245         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
246                 return false, err
247         }
248
249         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
250
251         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
252         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
253                 return false, nil
254         }
255
256         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
257         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
258                 return false, nil
259         }
260
261         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
262         return ok, nil
263 }
264
265 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
266         for _, tx := range txs {
267                 if p.isSPVNode() && !p.isRelatedTx(tx) {
268                         continue
269                 }
270                 msg, err := NewTransactionMessage(tx)
271                 if err != nil {
272                         return false, errors.Wrap(err, "failed to tx msg")
273                 }
274
275                 if p.knownTxs.Has(tx.ID.String()) {
276                         continue
277                 }
278                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
279                         return ok, nil
280                 }
281                 p.knownTxs.Add(tx.ID.String())
282         }
283         return true, nil
284 }
285
286 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
287         p.mtx.Lock()
288         defer p.mtx.Unlock()
289         p.height = height
290         p.hash = hash
291 }
292
293 type peerSet struct {
294         BasePeerSet
295         mtx   sync.RWMutex
296         peers map[string]*peer
297 }
298
299 // newPeerSet creates a new peer set to track the active participants.
300 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
301         return &peerSet{
302                 BasePeerSet: basePeerSet,
303                 peers:       make(map[string]*peer),
304         }
305 }
306
307 func (ps *peerSet) ProcessIllegal(peerID string, level byte, reason string) {
308         ps.mtx.Lock()
309         peer := ps.peers[peerID]
310         ps.mtx.Unlock()
311
312         if peer == nil {
313                 return
314         }
315         if banned := ps.IsBanned(peer.Addr().String(), level, reason); banned {
316                 ps.removePeer(peerID)
317         }
318         return
319 }
320
321 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
322         ps.mtx.Lock()
323         defer ps.mtx.Unlock()
324
325         if _, ok := ps.peers[peer.ID()]; !ok {
326                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
327                 return
328         }
329         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
330 }
331
332 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
333         ps.mtx.RLock()
334         defer ps.mtx.RUnlock()
335
336         var bestPeer *peer
337         for _, p := range ps.peers {
338                 if !p.services.IsEnable(flag) {
339                         continue
340                 }
341                 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
342                         bestPeer = p
343                 }
344         }
345         return bestPeer
346 }
347
348 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
349         msg, err := NewMinedBlockMessage(block)
350         if err != nil {
351                 return errors.Wrap(err, "fail on broadcast mined block")
352         }
353
354         hash := block.Hash()
355         peers := ps.peersWithoutBlock(&hash)
356         for _, peer := range peers {
357                 if peer.isSPVNode() {
358                         continue
359                 }
360                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
361                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
362                         ps.removePeer(peer.ID())
363                         continue
364                 }
365                 peer.markBlock(&hash)
366         }
367         return nil
368 }
369
370 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
371         bestBlockHash := bestBlock.Hash()
372         peers := ps.peersWithoutBlock(&bestBlockHash)
373
374         genesisHash := genesisBlock.Hash()
375         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
376         for _, peer := range peers {
377                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
378                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
379                         ps.removePeer(peer.ID())
380                         continue
381                 }
382         }
383         return nil
384 }
385
386 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
387         msg, err := NewTransactionMessage(tx)
388         if err != nil {
389                 return errors.Wrap(err, "fail on broadcast tx")
390         }
391
392         peers := ps.peersWithoutTx(&tx.ID)
393         for _, peer := range peers {
394                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
395                         continue
396                 }
397                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
398                         log.WithFields(log.Fields{
399                                 "module":  logModule,
400                                 "peer":    peer.Addr(),
401                                 "type":    reflect.TypeOf(msg),
402                                 "message": msg.String(),
403                         }).Warning("send message to peer error")
404                         ps.removePeer(peer.ID())
405                         continue
406                 }
407                 peer.markTransaction(&tx.ID)
408         }
409         return nil
410 }
411
412 func (ps *peerSet) ErrorHandler(peerID string, level byte, err error) {
413         if errors.Root(err) == ErrPeerMisbehave {
414                 ps.ProcessIllegal(peerID, level, err.Error())
415         } else {
416                 ps.removePeer(peerID)
417         }
418 }
419
420 // Peer retrieves the registered peer with the given id.
421 func (ps *peerSet) getPeer(id string) *peer {
422         ps.mtx.RLock()
423         defer ps.mtx.RUnlock()
424         return ps.peers[id]
425 }
426
427 func (ps *peerSet) getPeerInfos() []*PeerInfo {
428         ps.mtx.RLock()
429         defer ps.mtx.RUnlock()
430
431         result := []*PeerInfo{}
432         for _, peer := range ps.peers {
433                 result = append(result, peer.getPeerInfo())
434         }
435         return result
436 }
437
438 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
439         ps.mtx.RLock()
440         defer ps.mtx.RUnlock()
441
442         peers := []*peer{}
443         for _, peer := range ps.peers {
444                 if !peer.knownBlocks.Has(hash.String()) {
445                         peers = append(peers, peer)
446                 }
447         }
448         return peers
449 }
450
451 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
452         ps.mtx.RLock()
453         defer ps.mtx.RUnlock()
454
455         peers := []*peer{}
456         for _, peer := range ps.peers {
457                 if !peer.knownTxs.Has(hash.String()) {
458                         peers = append(peers, peer)
459                 }
460         }
461         return peers
462 }
463
464 func (ps *peerSet) removePeer(peerID string) {
465         ps.mtx.Lock()
466         delete(ps.peers, peerID)
467         ps.mtx.Unlock()
468         ps.StopPeerGracefully(peerID)
469 }