OSDN Git Service

Add netsync node error handler (#1151)
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "net"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8         "gopkg.in/fatih/set.v0"
9
10         "github.com/bytom/consensus"
11         "github.com/bytom/errors"
12         "github.com/bytom/p2p/trust"
13         "github.com/bytom/protocol/bc"
14         "github.com/bytom/protocol/bc/types"
15 )
16
17 const (
18         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
19         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
20         defaultBanThreshold = uint64(100)
21 )
22
23 //BasePeer is the interface for connection level peer
24 type BasePeer interface {
25         Addr() net.Addr
26         ID() string
27         ServiceFlag() consensus.ServiceFlag
28         TrySend(byte, interface{}) bool
29 }
30
31 //BasePeerSet is the intergace for connection level peer manager
32 type BasePeerSet interface {
33         AddBannedPeer(string) error
34         StopPeerGracefully(string)
35 }
36
37 // PeerInfo indicate peer status snap
38 type PeerInfo struct {
39         ID         string `json:"id"`
40         RemoteAddr string `json:"remote_addr"`
41         Height     uint64 `json:"height"`
42         Delay      uint32 `json:"delay"`
43 }
44
45 type peer struct {
46         BasePeer
47         mtx         sync.RWMutex
48         services    consensus.ServiceFlag
49         height      uint64
50         hash        *bc.Hash
51         banScore    trust.DynamicBanScore
52         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
53         knownBlocks *set.Set // Set of block hashes known to be known by this peer
54 }
55
56 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
57         return &peer{
58                 BasePeer:    basePeer,
59                 services:    basePeer.ServiceFlag(),
60                 height:      height,
61                 hash:        hash,
62                 knownTxs:    set.New(),
63                 knownBlocks: set.New(),
64         }
65 }
66
67 func (p *peer) Height() uint64 {
68         p.mtx.RLock()
69         defer p.mtx.RUnlock()
70         return p.height
71 }
72
73 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
74         score := p.banScore.Increase(persistent, transient)
75         if score > defaultBanThreshold {
76                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
77                 return true
78         }
79
80         warnThreshold := defaultBanThreshold >> 1
81         if score > warnThreshold {
82                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
83         }
84         return false
85 }
86
87 func (p *peer) getBlockByHeight(height uint64) bool {
88         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
89         return p.TrySend(BlockchainChannel, msg)
90 }
91
92 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
93         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
94         return p.TrySend(BlockchainChannel, msg)
95 }
96
97 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
98         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
99         return p.TrySend(BlockchainChannel, msg)
100 }
101
102 func (p *peer) getPeerInfo() *PeerInfo {
103         p.mtx.RLock()
104         defer p.mtx.RUnlock()
105         return &PeerInfo{
106                 ID:         p.ID(),
107                 RemoteAddr: p.Addr().String(),
108                 Height:     p.height,
109         }
110 }
111
112 func (p *peer) markBlock(hash *bc.Hash) {
113         p.mtx.Lock()
114         defer p.mtx.Unlock()
115
116         for p.knownBlocks.Size() >= maxKnownBlocks {
117                 p.knownBlocks.Pop()
118         }
119         p.knownBlocks.Add(hash.String())
120 }
121
122 func (p *peer) markTransaction(hash *bc.Hash) {
123         p.mtx.Lock()
124         defer p.mtx.Unlock()
125
126         for p.knownTxs.Size() >= maxKnownTxs {
127                 p.knownTxs.Pop()
128         }
129         p.knownTxs.Add(hash.String())
130 }
131
132 func (p *peer) sendBlock(block *types.Block) (bool, error) {
133         msg, err := NewBlockMessage(block)
134         if err != nil {
135                 return false, errors.Wrap(err, "fail on NewBlockMessage")
136         }
137
138         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
139         if ok {
140                 blcokHash := block.Hash()
141                 p.knownBlocks.Add(blcokHash.String())
142         }
143         return ok, nil
144 }
145
146 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
147         msg, err := NewBlocksMessage(blocks)
148         if err != nil {
149                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
150         }
151
152         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
153                 return ok, nil
154         }
155
156         for _, block := range blocks {
157                 blcokHash := block.Hash()
158                 p.knownBlocks.Add(blcokHash.String())
159         }
160         return true, nil
161 }
162
163 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
164         msg, err := NewHeadersMessage(headers)
165         if err != nil {
166                 return false, errors.New("fail on NewHeadersMessage")
167         }
168
169         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
170         return ok, nil
171 }
172
173 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
174         for _, tx := range txs {
175                 msg, err := NewTransactionMessage(tx)
176                 if err != nil {
177                         return false, errors.Wrap(err, "failed to tx msg")
178                 }
179
180                 if p.knownTxs.Has(tx.ID.String()) {
181                         continue
182                 }
183                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
184                         return ok, nil
185                 }
186                 p.knownTxs.Add(tx.ID.String())
187         }
188         return true, nil
189 }
190
191 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
192         p.mtx.Lock()
193         defer p.mtx.Unlock()
194         p.height = height
195         p.hash = hash
196 }
197
198 type peerSet struct {
199         BasePeerSet
200         mtx   sync.RWMutex
201         peers map[string]*peer
202 }
203
204 // newPeerSet creates a new peer set to track the active participants.
205 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
206         return &peerSet{
207                 BasePeerSet: basePeerSet,
208                 peers:       make(map[string]*peer),
209         }
210 }
211
212 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
213         ps.mtx.Lock()
214         peer := ps.peers[peerID]
215         ps.mtx.Unlock()
216
217         if peer == nil {
218                 return
219         }
220         if ban := peer.addBanScore(persistent, transient, reason); !ban {
221                 return
222         }
223         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
224                 log.WithField("err", err).Error("fail on add ban peer")
225         }
226         ps.removePeer(peerID)
227 }
228
229 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
230         ps.mtx.Lock()
231         defer ps.mtx.Unlock()
232
233         if _, ok := ps.peers[peer.ID()]; !ok {
234                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
235                 return
236         }
237         log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
238 }
239
240 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
241         ps.mtx.RLock()
242         defer ps.mtx.RUnlock()
243
244         var bestPeer *peer
245         for _, p := range ps.peers {
246                 if !p.services.IsEnable(flag) {
247                         continue
248                 }
249                 if bestPeer == nil || p.height > bestPeer.height {
250                         bestPeer = p
251                 }
252         }
253         return bestPeer
254 }
255
256 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
257         msg, err := NewMinedBlockMessage(block)
258         if err != nil {
259                 return errors.Wrap(err, "fail on broadcast mined block")
260         }
261
262         hash := block.Hash()
263         peers := ps.peersWithoutBlock(&hash)
264         for _, peer := range peers {
265                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
266                         ps.removePeer(peer.ID())
267                         continue
268                 }
269                 peer.markBlock(&hash)
270         }
271         return nil
272 }
273
274 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
275         msg, err := NewTransactionMessage(tx)
276         if err != nil {
277                 return errors.Wrap(err, "fail on broadcast tx")
278         }
279
280         peers := ps.peersWithoutTx(&tx.ID)
281         for _, peer := range peers {
282                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
283                         ps.removePeer(peer.ID())
284                         continue
285                 }
286                 peer.markTransaction(&tx.ID)
287         }
288         return nil
289 }
290
291 func (ps *peerSet) errorHandler(peerID string, err error) {
292         if errors.Root(err) == errPeerMisbehave {
293                 ps.addBanScore(peerID, 20, 0, err.Error())
294         } else {
295                 ps.removePeer(peerID)
296         }
297 }
298
299 // Peer retrieves the registered peer with the given id.
300 func (ps *peerSet) getPeer(id string) *peer {
301         ps.mtx.RLock()
302         defer ps.mtx.RUnlock()
303         return ps.peers[id]
304 }
305
306 func (ps *peerSet) getPeerInfos() []*PeerInfo {
307         ps.mtx.RLock()
308         defer ps.mtx.RUnlock()
309
310         result := []*PeerInfo{}
311         for _, peer := range ps.peers {
312                 result = append(result, peer.getPeerInfo())
313         }
314         return result
315 }
316
317 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
318         ps.mtx.RLock()
319         defer ps.mtx.RUnlock()
320
321         peers := []*peer{}
322         for _, peer := range ps.peers {
323                 if !peer.knownBlocks.Has(hash.String()) {
324                         peers = append(peers, peer)
325                 }
326         }
327         return peers
328 }
329
330 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
331         ps.mtx.RLock()
332         defer ps.mtx.RUnlock()
333
334         peers := []*peer{}
335         for _, peer := range ps.peers {
336                 if !peer.knownTxs.Has(hash.String()) {
337                         peers = append(peers, peer)
338                 }
339         }
340         return peers
341 }
342
343 func (ps *peerSet) removePeer(peerID string) {
344         ps.mtx.Lock()
345         delete(ps.peers, peerID)
346         ps.mtx.Unlock()
347         ps.StopPeerGracefully(peerID)
348 }