OSDN Git Service

docs(release note): update bytom version 1.1.0 release note
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/bytom/bytom/consensus"
14         "github.com/bytom/bytom/errors"
15         "github.com/bytom/bytom/protocol/bc"
16         "github.com/bytom/bytom/protocol/bc/types"
17 )
18
19 const (
20         maxKnownTxs    = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
21         maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
22 )
23
24 //BasePeer is the interface for connection level peer
25 type BasePeer interface {
26         Addr() net.Addr
27         ID() string
28         RemoteAddrHost() string
29         ServiceFlag() consensus.ServiceFlag
30         TrafficStatus() (*flowrate.Status, *flowrate.Status)
31         TrySend(byte, interface{}) bool
32         IsLAN() bool
33 }
34
35 //BasePeerSet is the intergace for connection level peer manager
36 type BasePeerSet interface {
37         StopPeerGracefully(string)
38         IsBanned(ip string, level byte, reason string) bool
39 }
40
41 // PeerInfo indicate peer status snap
42 type PeerInfo struct {
43         ID                  string `json:"peer_id"`
44         RemoteAddr          string `json:"remote_addr"`
45         Height              uint64 `json:"height"`
46         Ping                string `json:"ping"`
47         Duration            string `json:"duration"`
48         TotalSent           int64  `json:"total_sent"`
49         TotalReceived       int64  `json:"total_received"`
50         AverageSentRate     int64  `json:"average_sent_rate"`
51         AverageReceivedRate int64  `json:"average_received_rate"`
52         CurrentSentRate     int64  `json:"current_sent_rate"`
53         CurrentReceivedRate int64  `json:"current_received_rate"`
54 }
55
56 type peer struct {
57         BasePeer
58         mtx         sync.RWMutex
59         services    consensus.ServiceFlag
60         height      uint64
61         hash        *bc.Hash
62         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
63         knownBlocks *set.Set // Set of block hashes known to be known by this peer
64         filterAdds  *set.Set // Set of addresses that the spv node cares about.
65 }
66
67 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
68         return &peer{
69                 BasePeer:    basePeer,
70                 services:    basePeer.ServiceFlag(),
71                 height:      height,
72                 hash:        hash,
73                 knownTxs:    set.New(),
74                 knownBlocks: set.New(),
75                 filterAdds:  set.New(),
76         }
77 }
78
79 func (p *peer) Height() uint64 {
80         p.mtx.RLock()
81         defer p.mtx.RUnlock()
82         return p.height
83 }
84
85 func (p *peer) addFilterAddress(address []byte) {
86         p.mtx.Lock()
87         defer p.mtx.Unlock()
88
89         if p.filterAdds.Size() >= maxFilterAddressCount {
90                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
91                 return
92         }
93         if len(address) > maxFilterAddressSize {
94                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
95                 return
96         }
97
98         p.filterAdds.Add(hex.EncodeToString(address))
99 }
100
101 func (p *peer) addFilterAddresses(addresses [][]byte) {
102         if !p.filterAdds.IsEmpty() {
103                 p.filterAdds.Clear()
104         }
105         for _, address := range addresses {
106                 p.addFilterAddress(address)
107         }
108 }
109
110 func (p *peer) getBlockByHeight(height uint64) bool {
111         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
112         return p.TrySend(BlockchainChannel, msg)
113 }
114
115 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
116         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
117         return p.TrySend(BlockchainChannel, msg)
118 }
119
120 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
121         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
122         return p.TrySend(BlockchainChannel, msg)
123 }
124
125 func (p *peer) getPeerInfo() *PeerInfo {
126         p.mtx.RLock()
127         defer p.mtx.RUnlock()
128
129         sentStatus, receivedStatus := p.TrafficStatus()
130         ping := sentStatus.Idle - receivedStatus.Idle
131         if receivedStatus.Idle > sentStatus.Idle {
132                 ping = -ping
133         }
134
135         return &PeerInfo{
136                 ID:                  p.ID(),
137                 RemoteAddr:          p.Addr().String(),
138                 Height:              p.height,
139                 Ping:                ping.String(),
140                 Duration:            sentStatus.Duration.String(),
141                 TotalSent:           sentStatus.Bytes,
142                 TotalReceived:       receivedStatus.Bytes,
143                 AverageSentRate:     sentStatus.AvgRate,
144                 AverageReceivedRate: receivedStatus.AvgRate,
145                 CurrentSentRate:     sentStatus.CurRate,
146                 CurrentReceivedRate: receivedStatus.CurRate,
147         }
148 }
149
150 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
151         var relatedTxs []*types.Tx
152         var relatedStatuses []*bc.TxVerifyResult
153         for i, tx := range txs {
154                 if p.isRelatedTx(tx) {
155                         relatedTxs = append(relatedTxs, tx)
156                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
157                 }
158         }
159         return relatedTxs, relatedStatuses
160 }
161
162 func (p *peer) isRelatedTx(tx *types.Tx) bool {
163         for _, input := range tx.Inputs {
164                 switch inp := input.TypedInput.(type) {
165                 case *types.SpendInput:
166                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
167                                 return true
168                         }
169                 }
170         }
171         for _, output := range tx.Outputs {
172                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
173                         return true
174                 }
175         }
176         return false
177 }
178
179 func (p *peer) isSPVNode() bool {
180         return !p.services.IsEnable(consensus.SFFullNode)
181 }
182
183 func (p *peer) markBlock(hash *bc.Hash) {
184         p.mtx.Lock()
185         defer p.mtx.Unlock()
186
187         for p.knownBlocks.Size() >= maxKnownBlocks {
188                 p.knownBlocks.Pop()
189         }
190         p.knownBlocks.Add(hash.String())
191 }
192
193 func (p *peer) markTransaction(hash *bc.Hash) {
194         p.mtx.Lock()
195         defer p.mtx.Unlock()
196
197         for p.knownTxs.Size() >= maxKnownTxs {
198                 p.knownTxs.Pop()
199         }
200         p.knownTxs.Add(hash.String())
201 }
202
203 func (p *peer) sendBlock(block *types.Block) (bool, error) {
204         msg, err := NewBlockMessage(block)
205         if err != nil {
206                 return false, errors.Wrap(err, "fail on NewBlockMessage")
207         }
208
209         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
210         if ok {
211                 blcokHash := block.Hash()
212                 p.knownBlocks.Add(blcokHash.String())
213         }
214         return ok, nil
215 }
216
217 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
218         msg, err := NewBlocksMessage(blocks)
219         if err != nil {
220                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
221         }
222
223         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
224                 return ok, nil
225         }
226
227         for _, block := range blocks {
228                 blcokHash := block.Hash()
229                 p.knownBlocks.Add(blcokHash.String())
230         }
231         return true, nil
232 }
233
234 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
235         msg, err := NewHeadersMessage(headers)
236         if err != nil {
237                 return false, errors.New("fail on NewHeadersMessage")
238         }
239
240         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
241         return ok, nil
242 }
243
244 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
245         msg := NewMerkleBlockMessage()
246         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
247                 return false, err
248         }
249
250         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
251
252         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
253         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
254                 return false, nil
255         }
256
257         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
258         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
259                 return false, nil
260         }
261
262         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
263         return ok, nil
264 }
265
266 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
267         for _, tx := range txs {
268                 if p.isSPVNode() && !p.isRelatedTx(tx) {
269                         continue
270                 }
271                 msg, err := NewTransactionMessage(tx)
272                 if err != nil {
273                         return false, errors.Wrap(err, "failed to tx msg")
274                 }
275
276                 if p.knownTxs.Has(tx.ID.String()) {
277                         continue
278                 }
279                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
280                         return ok, nil
281                 }
282                 p.knownTxs.Add(tx.ID.String())
283         }
284         return true, nil
285 }
286
287 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
288         p.mtx.Lock()
289         defer p.mtx.Unlock()
290         p.height = height
291         p.hash = hash
292 }
293
294 type peerSet struct {
295         BasePeerSet
296         mtx   sync.RWMutex
297         peers map[string]*peer
298 }
299
300 // newPeerSet creates a new peer set to track the active participants.
301 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
302         return &peerSet{
303                 BasePeerSet: basePeerSet,
304                 peers:       make(map[string]*peer),
305         }
306 }
307
308 func (ps *peerSet) ProcessIllegal(peerID string, level byte, reason string) {
309         ps.mtx.Lock()
310         peer := ps.peers[peerID]
311         ps.mtx.Unlock()
312
313         if peer == nil {
314                 return
315         }
316         if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
317                 ps.removePeer(peerID)
318         }
319         return
320 }
321
322 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
323         ps.mtx.Lock()
324         defer ps.mtx.Unlock()
325
326         if _, ok := ps.peers[peer.ID()]; !ok {
327                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
328                 return
329         }
330         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
331 }
332
333 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
334         ps.mtx.RLock()
335         defer ps.mtx.RUnlock()
336
337         var bestPeer *peer
338         for _, p := range ps.peers {
339                 if !p.services.IsEnable(flag) {
340                         continue
341                 }
342                 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
343                         bestPeer = p
344                 }
345         }
346         return bestPeer
347 }
348
349 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
350         msg, err := NewMinedBlockMessage(block)
351         if err != nil {
352                 return errors.Wrap(err, "fail on broadcast mined block")
353         }
354
355         hash := block.Hash()
356         peers := ps.peersWithoutBlock(&hash)
357         for _, peer := range peers {
358                 if peer.isSPVNode() {
359                         continue
360                 }
361                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
362                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
363                         ps.removePeer(peer.ID())
364                         continue
365                 }
366                 peer.markBlock(&hash)
367         }
368         return nil
369 }
370
371 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
372         bestBlockHash := bestBlock.Hash()
373         peers := ps.peersWithoutBlock(&bestBlockHash)
374
375         genesisHash := genesisBlock.Hash()
376         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
377         for _, peer := range peers {
378                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
379                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
380                         ps.removePeer(peer.ID())
381                         continue
382                 }
383         }
384         return nil
385 }
386
387 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
388         msg, err := NewTransactionMessage(tx)
389         if err != nil {
390                 return errors.Wrap(err, "fail on broadcast tx")
391         }
392
393         peers := ps.peersWithoutTx(&tx.ID)
394         for _, peer := range peers {
395                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
396                         continue
397                 }
398                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
399                         log.WithFields(log.Fields{
400                                 "module":  logModule,
401                                 "peer":    peer.Addr(),
402                                 "type":    reflect.TypeOf(msg),
403                                 "message": msg.String(),
404                         }).Warning("send message to peer error")
405                         ps.removePeer(peer.ID())
406                         continue
407                 }
408                 peer.markTransaction(&tx.ID)
409         }
410         return nil
411 }
412
413 // Peer retrieves the registered peer with the given id.
414 func (ps *peerSet) getPeer(id string) *peer {
415         ps.mtx.RLock()
416         defer ps.mtx.RUnlock()
417         return ps.peers[id]
418 }
419
420 func (ps *peerSet) getPeerInfos() []*PeerInfo {
421         ps.mtx.RLock()
422         defer ps.mtx.RUnlock()
423
424         result := []*PeerInfo{}
425         for _, peer := range ps.peers {
426                 result = append(result, peer.getPeerInfo())
427         }
428         return result
429 }
430
431 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
432         ps.mtx.RLock()
433         defer ps.mtx.RUnlock()
434
435         peers := []*peer{}
436         for _, peer := range ps.peers {
437                 if !peer.knownBlocks.Has(hash.String()) {
438                         peers = append(peers, peer)
439                 }
440         }
441         return peers
442 }
443
444 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
445         ps.mtx.RLock()
446         defer ps.mtx.RUnlock()
447
448         peers := []*peer{}
449         for _, peer := range ps.peers {
450                 if !peer.knownTxs.Has(hash.String()) {
451                         peers = append(peers, peer)
452                 }
453         }
454         return peers
455 }
456
457 func (ps *peerSet) removePeer(peerID string) {
458         ps.mtx.Lock()
459         delete(ps.peers, peerID)
460         ps.mtx.Unlock()
461         ps.StopPeerGracefully(peerID)
462 }