OSDN Git Service

Merge pull request #16 from Bytom/dev
[bytom/vapor.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "sync"
7
8         log "github.com/sirupsen/logrus"
9         "github.com/tendermint/tmlibs/flowrate"
10         "gopkg.in/fatih/set.v0"
11
12         "github.com/vapor/consensus"
13         "github.com/vapor/errors"
14         "github.com/vapor/p2p/trust"
15         "github.com/vapor/protocol/bc"
16         "github.com/vapor/protocol/bc/types"
17 )
18
19 const (
20         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
21         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
22         defaultBanThreshold = uint64(100)
23 )
24
25 //BasePeer is the interface for connection level peer
26 type BasePeer interface {
27         Addr() net.Addr
28         ID() string
29         ServiceFlag() consensus.ServiceFlag
30         TrafficStatus() (*flowrate.Status, *flowrate.Status)
31         TrySend(byte, interface{}) bool
32 }
33
34 //BasePeerSet is the intergace for connection level peer manager
35 type BasePeerSet interface {
36         AddBannedPeer(string) error
37         StopPeerGracefully(string)
38 }
39
40 // PeerInfo indicate peer status snap
41 type PeerInfo struct {
42         ID                  string `json:"peer_id"`
43         RemoteAddr          string `json:"remote_addr"`
44         Height              uint64 `json:"height"`
45         Ping                string `json:"ping"`
46         Duration            string `json:"duration"`
47         TotalSent           int64  `json:"total_sent"`
48         TotalReceived       int64  `json:"total_received"`
49         AverageSentRate     int64  `json:"average_sent_rate"`
50         AverageReceivedRate int64  `json:"average_received_rate"`
51         CurrentSentRate     int64  `json:"current_sent_rate"`
52         CurrentReceivedRate int64  `json:"current_received_rate"`
53 }
54
55 type peer struct {
56         BasePeer
57         mtx         sync.RWMutex
58         services    consensus.ServiceFlag
59         height      uint64
60         hash        *bc.Hash
61         banScore    trust.DynamicBanScore
62         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
63         knownBlocks *set.Set // Set of block hashes known to be known by this peer
64         filterAdds  *set.Set // Set of addresses that the spv node cares about.
65 }
66
67 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
68         return &peer{
69                 BasePeer:    basePeer,
70                 services:    basePeer.ServiceFlag(),
71                 height:      height,
72                 hash:        hash,
73                 knownTxs:    set.New(),
74                 knownBlocks: set.New(),
75                 filterAdds:  set.New(),
76         }
77 }
78
79 func (p *peer) Height() uint64 {
80         p.mtx.RLock()
81         defer p.mtx.RUnlock()
82         return p.height
83 }
84
85 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
86         score := p.banScore.Increase(persistent, transient)
87         if score > defaultBanThreshold {
88                 log.WithFields(log.Fields{
89                         "module":  logModule,
90                         "address": p.Addr(),
91                         "score":   score,
92                         "reason":  reason,
93                 }).Errorf("banning and disconnecting")
94                 return true
95         }
96
97         warnThreshold := defaultBanThreshold >> 1
98         if score > warnThreshold {
99                 log.WithFields(log.Fields{
100                         "module":  logModule,
101                         "address": p.Addr(),
102                         "score":   score,
103                         "reason":  reason,
104                 }).Warning("ban score increasing")
105         }
106         return false
107 }
108
109 func (p *peer) addFilterAddress(address []byte) {
110         p.mtx.Lock()
111         defer p.mtx.Unlock()
112
113         if p.filterAdds.Size() >= maxFilterAddressCount {
114                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
115                 return
116         }
117         if len(address) > maxFilterAddressSize {
118                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
119                 return
120         }
121         p.filterAdds.Add(hex.EncodeToString(address))
122 }
123
124 func (p *peer) addFilterAddresses(addresses [][]byte) {
125         if !p.filterAdds.IsEmpty() {
126                 p.filterAdds.Clear()
127         }
128         for _, address := range addresses {
129                 p.addFilterAddress(address)
130         }
131 }
132
133 func (p *peer) getBlockByHeight(height uint64) bool {
134         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
135         return p.TrySend(BlockchainChannel, msg)
136 }
137
138 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
139         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
140         return p.TrySend(BlockchainChannel, msg)
141 }
142
143 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
144         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
145         return p.TrySend(BlockchainChannel, msg)
146 }
147
148 func (p *peer) getPeerInfo() *PeerInfo {
149         p.mtx.RLock()
150         defer p.mtx.RUnlock()
151
152         sentStatus, receivedStatus := p.TrafficStatus()
153         ping := sentStatus.Idle - receivedStatus.Idle
154         if receivedStatus.Idle > sentStatus.Idle {
155                 ping = -ping
156         }
157
158         return &PeerInfo{
159                 ID:                  p.ID(),
160                 RemoteAddr:          p.Addr().String(),
161                 Height:              p.height,
162                 Ping:                ping.String(),
163                 Duration:            sentStatus.Duration.String(),
164                 TotalSent:           sentStatus.Bytes,
165                 TotalReceived:       receivedStatus.Bytes,
166                 AverageSentRate:     sentStatus.AvgRate,
167                 AverageReceivedRate: receivedStatus.AvgRate,
168                 CurrentSentRate:     sentStatus.CurRate,
169                 CurrentReceivedRate: receivedStatus.CurRate,
170         }
171 }
172
173 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
174         var relatedTxs []*types.Tx
175         var relatedStatuses []*bc.TxVerifyResult
176         for i, tx := range txs {
177                 if p.isRelatedTx(tx) {
178                         relatedTxs = append(relatedTxs, tx)
179                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
180                 }
181         }
182         return relatedTxs, relatedStatuses
183 }
184
185 func (p *peer) isRelatedTx(tx *types.Tx) bool {
186         for _, input := range tx.Inputs {
187                 switch inp := input.TypedInput.(type) {
188                 case *types.SpendInput:
189                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
190                                 return true
191                         }
192                 }
193         }
194         for _, output := range tx.Outputs {
195                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
196                         return true
197                 }
198         }
199         return false
200 }
201
202 func (p *peer) isSPVNode() bool {
203         return !p.services.IsEnable(consensus.SFFullNode)
204 }
205
206 func (p *peer) markBlock(hash *bc.Hash) {
207         p.mtx.Lock()
208         defer p.mtx.Unlock()
209
210         for p.knownBlocks.Size() >= maxKnownBlocks {
211                 p.knownBlocks.Pop()
212         }
213         p.knownBlocks.Add(hash.String())
214 }
215
216 func (p *peer) markTransaction(hash *bc.Hash) {
217         p.mtx.Lock()
218         defer p.mtx.Unlock()
219
220         for p.knownTxs.Size() >= maxKnownTxs {
221                 p.knownTxs.Pop()
222         }
223         p.knownTxs.Add(hash.String())
224 }
225
226 func (p *peer) sendBlock(block *types.Block) (bool, error) {
227         msg, err := NewBlockMessage(block)
228         if err != nil {
229                 return false, errors.Wrap(err, "fail on NewBlockMessage")
230         }
231
232         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
233         if ok {
234                 blcokHash := block.Hash()
235                 p.knownBlocks.Add(blcokHash.String())
236         }
237         return ok, nil
238 }
239
240 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
241         msg, err := NewBlocksMessage(blocks)
242         if err != nil {
243                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
244         }
245
246         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
247                 return ok, nil
248         }
249
250         for _, block := range blocks {
251                 blcokHash := block.Hash()
252                 p.knownBlocks.Add(blcokHash.String())
253         }
254         return true, nil
255 }
256
257 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
258         msg, err := NewHeadersMessage(headers)
259         if err != nil {
260                 return false, errors.New("fail on NewHeadersMessage")
261         }
262
263         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
264         return ok, nil
265 }
266
267 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
268         msg := NewMerkleBlockMessage()
269         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
270                 return false, err
271         }
272
273         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
274
275         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
276         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
277                 return false, nil
278         }
279
280         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
281         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
282                 return false, nil
283         }
284
285         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
286         return ok, nil
287 }
288
289 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
290         for _, tx := range txs {
291                 if p.isSPVNode() && !p.isRelatedTx(tx) {
292                         continue
293                 }
294                 msg, err := NewTransactionMessage(tx)
295                 if err != nil {
296                         return false, errors.Wrap(err, "failed to tx msg")
297                 }
298
299                 if p.knownTxs.Has(tx.ID.String()) {
300                         continue
301                 }
302                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
303                         return ok, nil
304                 }
305                 p.knownTxs.Add(tx.ID.String())
306         }
307         return true, nil
308 }
309
310 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
311         p.mtx.Lock()
312         defer p.mtx.Unlock()
313         p.height = height
314         p.hash = hash
315 }
316
317 type peerSet struct {
318         BasePeerSet
319         mtx   sync.RWMutex
320         peers map[string]*peer
321 }
322
323 // newPeerSet creates a new peer set to track the active participants.
324 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
325         return &peerSet{
326                 BasePeerSet: basePeerSet,
327                 peers:       make(map[string]*peer),
328         }
329 }
330
331 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
332         ps.mtx.Lock()
333         peer := ps.peers[peerID]
334         ps.mtx.Unlock()
335
336         if peer == nil {
337                 return
338         }
339         if ban := peer.addBanScore(persistent, transient, reason); !ban {
340                 return
341         }
342         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
343                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
344         }
345         ps.removePeer(peerID)
346 }
347
348 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
349         ps.mtx.Lock()
350         defer ps.mtx.Unlock()
351
352         if _, ok := ps.peers[peer.ID()]; !ok {
353                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
354                 return
355         }
356         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
357 }
358
359 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
360         ps.mtx.RLock()
361         defer ps.mtx.RUnlock()
362
363         var bestPeer *peer
364         for _, p := range ps.peers {
365                 if !p.services.IsEnable(flag) {
366                         continue
367                 }
368                 if bestPeer == nil || p.height > bestPeer.height {
369                         bestPeer = p
370                 }
371         }
372         return bestPeer
373 }
374
375 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
376         msg, err := NewMinedBlockMessage(block)
377         if err != nil {
378                 return errors.Wrap(err, "fail on broadcast mined block")
379         }
380
381         hash := block.Hash()
382         peers := ps.peersWithoutBlock(&hash)
383         for _, peer := range peers {
384                 if peer.isSPVNode() {
385                         continue
386                 }
387                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
388                         ps.removePeer(peer.ID())
389                         continue
390                 }
391                 peer.markBlock(&hash)
392         }
393         return nil
394 }
395
396 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
397         bestBlockHash := bestBlock.Hash()
398         peers := ps.peersWithoutBlock(&bestBlockHash)
399
400         genesisHash := genesisBlock.Hash()
401         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
402         for _, peer := range peers {
403                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
404                         ps.removePeer(peer.ID())
405                         continue
406                 }
407         }
408         return nil
409 }
410
411 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
412         msg, err := NewTransactionMessage(tx)
413         if err != nil {
414                 return errors.Wrap(err, "fail on broadcast tx")
415         }
416
417         peers := ps.peersWithoutTx(&tx.ID)
418         for _, peer := range peers {
419                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
420                         continue
421                 }
422                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
423                         ps.removePeer(peer.ID())
424                         continue
425                 }
426                 peer.markTransaction(&tx.ID)
427         }
428         return nil
429 }
430
431 func (ps *peerSet) errorHandler(peerID string, err error) {
432         if errors.Root(err) == errPeerMisbehave {
433                 ps.addBanScore(peerID, 20, 0, err.Error())
434         } else {
435                 ps.removePeer(peerID)
436         }
437 }
438
439 // Peer retrieves the registered peer with the given id.
440 func (ps *peerSet) getPeer(id string) *peer {
441         ps.mtx.RLock()
442         defer ps.mtx.RUnlock()
443         return ps.peers[id]
444 }
445
446 func (ps *peerSet) getPeerInfos() []*PeerInfo {
447         ps.mtx.RLock()
448         defer ps.mtx.RUnlock()
449
450         result := []*PeerInfo{}
451         for _, peer := range ps.peers {
452                 result = append(result, peer.getPeerInfo())
453         }
454         return result
455 }
456
457 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
458         ps.mtx.RLock()
459         defer ps.mtx.RUnlock()
460
461         peers := []*peer{}
462         for _, peer := range ps.peers {
463                 if !peer.knownBlocks.Has(hash.String()) {
464                         peers = append(peers, peer)
465                 }
466         }
467         return peers
468 }
469
470 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
471         ps.mtx.RLock()
472         defer ps.mtx.RUnlock()
473
474         peers := []*peer{}
475         for _, peer := range ps.peers {
476                 if !peer.knownTxs.Has(hash.String()) {
477                         peers = append(peers, peer)
478                 }
479         }
480         return peers
481 }
482
483 func (ps *peerSet) removePeer(peerID string) {
484         ps.mtx.Lock()
485         delete(ps.peers, peerID)
486         ps.mtx.Unlock()
487         ps.StopPeerGracefully(peerID)
488 }