OSDN Git Service

fix conn close
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "math/rand"
7         "net"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         crypto "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14         dbm "github.com/tendermint/tmlibs/db"
15
16         "strings"
17
18         cfg "github.com/bytom/config"
19         "github.com/bytom/errors"
20         "github.com/bytom/p2p/trust"
21 )
22
23 const (
24         reconnectAttempts = 10
25         reconnectInterval = 10 * time.Second
26
27         bannedPeerKey      = "BannedPeer"
28         defaultBanDuration = time.Hour * 24
29         peerBannedTM       = 20
30 )
31
32 var ErrConnectBannedPeer = errors.New("Connect banned peer")
33
34 type Reactor interface {
35         cmn.Service // Start, Stop
36
37         SetSwitch(*Switch)
38         GetChannels() []*ChannelDescriptor
39         AddPeer(peer *Peer) error
40         RemovePeer(peer *Peer, reason interface{})
41         Receive(chID byte, peer *Peer, msgBytes []byte)
42 }
43
44 //--------------------------------------
45
46 type BaseReactor struct {
47         cmn.BaseService // Provides Start, Stop, .Quit
48         Switch          *Switch
49 }
50
51 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
52         return &BaseReactor{
53                 BaseService: *cmn.NewBaseService(nil, name, impl),
54                 Switch:      nil,
55         }
56 }
57
58 func (br *BaseReactor) SetSwitch(sw *Switch) {
59         br.Switch = sw
60 }
61 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
62 func (_ *BaseReactor) AddPeer(peer *Peer)                             {}
63 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
64 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
65
66 //-----------------------------------------------------------------------------
67
68 /*
69 The `Switch` handles peer connections and exposes an API to receive incoming messages
70 on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
71 or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
72 incoming messages are received on the reactor.
73 */
74 type Switch struct {
75         cmn.BaseService
76
77         config           *cfg.P2PConfig
78         peerConfig       *PeerConfig
79         listeners        []Listener
80         reactors         map[string]Reactor
81         chDescs          []*ChannelDescriptor
82         reactorsByCh     map[byte]Reactor
83         peers            *PeerSet
84         dialing          *cmn.CMap
85         nodeInfo         *NodeInfo             // our node info
86         nodePrivKey      crypto.PrivKeyEd25519 // our node privkey
87         bannedPeer       map[string]time.Time
88         db               dbm.DB
89         TrustMetricStore *trust.TrustMetricStore
90         ScamPeerCh       chan *Peer
91         mtx              sync.Mutex
92
93         filterConnByAddr   func(net.Addr) error
94         filterConnByPubKey func(crypto.PubKeyEd25519) error
95 }
96
97 var (
98         ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
99         ErrConnectSelf         = errors.New("Connect self")
100         ErrPeerConnected       = errors.New("Peer is connected")
101 )
102
103 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
104         sw := &Switch{
105                 config:       config,
106                 peerConfig:   DefaultPeerConfig(config),
107                 reactors:     make(map[string]Reactor),
108                 chDescs:      make([]*ChannelDescriptor, 0),
109                 reactorsByCh: make(map[byte]Reactor),
110                 peers:        NewPeerSet(),
111                 dialing:      cmn.NewCMap(),
112                 nodeInfo:     nil,
113                 db:           trustHistoryDB,
114                 ScamPeerCh:   make(chan *Peer),
115         }
116         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
117         sw.TrustMetricStore = trust.NewTrustMetricStore(trustHistoryDB, trust.DefaultConfig())
118         sw.TrustMetricStore.Start()
119
120         sw.bannedPeer = make(map[string]time.Time)
121         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
122                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
123                         return nil
124                 }
125         }
126         go sw.scamPeerHandler()
127         return sw
128 }
129
130 // Not goroutine safe.
131 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
132         // Validate the reactor.
133         // No two reactors can share the same channel.
134         reactorChannels := reactor.GetChannels()
135         for _, chDesc := range reactorChannels {
136                 chID := chDesc.ID
137                 if sw.reactorsByCh[chID] != nil {
138                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
139                 }
140                 sw.chDescs = append(sw.chDescs, chDesc)
141                 sw.reactorsByCh[chID] = reactor
142         }
143         sw.reactors[name] = reactor
144         reactor.SetSwitch(sw)
145         return reactor
146 }
147
148 // Not goroutine safe.
149 func (sw *Switch) Reactors() map[string]Reactor {
150         return sw.reactors
151 }
152
153 // Not goroutine safe.
154 func (sw *Switch) Reactor(name string) Reactor {
155         return sw.reactors[name]
156 }
157
158 // Not goroutine safe.
159 func (sw *Switch) AddListener(l Listener) {
160         sw.listeners = append(sw.listeners, l)
161 }
162
163 // Not goroutine safe.
164 func (sw *Switch) Listeners() []Listener {
165         return sw.listeners
166 }
167
168 // Not goroutine safe.
169 func (sw *Switch) IsListening() bool {
170         return len(sw.listeners) > 0
171 }
172
173 // Not goroutine safe.
174 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
175         sw.nodeInfo = nodeInfo
176 }
177
178 // Not goroutine safe.
179 func (sw *Switch) NodeInfo() *NodeInfo {
180         return sw.nodeInfo
181 }
182
183 // Not goroutine safe.
184 // NOTE: Overwrites sw.nodeInfo.PubKey
185 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
186         sw.nodePrivKey = nodePrivKey
187         if sw.nodeInfo != nil {
188                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
189         }
190 }
191
192 // Switch.Start() starts all the reactors, peers, and listeners.
193 func (sw *Switch) OnStart() error {
194         sw.BaseService.OnStart()
195         // Start reactors
196         for _, reactor := range sw.reactors {
197                 _, err := reactor.Start()
198                 if err != nil {
199                         return err
200                 }
201         }
202         // Start peers
203         for _, peer := range sw.peers.List() {
204                 sw.startInitPeer(peer)
205         }
206         // Start listeners
207         for _, listener := range sw.listeners {
208                 go sw.listenerRoutine(listener)
209         }
210         return nil
211 }
212
213 func (sw *Switch) OnStop() {
214         sw.BaseService.OnStop()
215         // Stop listeners
216         for _, listener := range sw.listeners {
217                 listener.Stop()
218         }
219         sw.listeners = nil
220         // Stop peers
221         for _, peer := range sw.peers.List() {
222                 peer.Stop()
223                 sw.peers.Remove(peer)
224         }
225         // Stop reactors
226         for _, reactor := range sw.reactors {
227                 reactor.Stop()
228         }
229 }
230
231 // NOTE: This performs a blocking handshake before the peer is added.
232 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
233 func (sw *Switch) AddPeer(peer *Peer) error {
234         if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
235                 return err
236         }
237
238         if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
239                 return err
240         }
241
242         if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
243                 return err
244         }
245
246         if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
247                 return err
248         }
249
250         // Avoid self
251         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
252                 return errors.New("Ignoring connection from self")
253         }
254
255         // Check version, chain id
256         if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
257                 return err
258         }
259
260         // Check for duplicate peer
261         if sw.peers.Has(peer.Key) {
262                 return ErrSwitchDuplicatePeer
263
264         }
265
266         // Start peer
267         if sw.IsRunning() {
268                 if err := sw.startInitPeer(peer); err != nil {
269                         return err
270                 }
271         }
272
273         // Add the peer to .peers.
274         // We start it first so that a peer in the list is safe to Stop.
275         // It should not err since we already checked peers.Has()
276         if err := sw.peers.Add(peer); err != nil {
277                 return err
278         }
279
280         tm := trust.NewMetric()
281
282         tm.Start()
283         sw.TrustMetricStore.AddPeerTrustMetric(peer.mconn.RemoteAddress.IP.String(), tm)
284
285         log.WithField("peer", peer).Info("Added peer")
286         return nil
287 }
288
289 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
290         if sw.filterConnByAddr != nil {
291                 return sw.filterConnByAddr(addr)
292         }
293         return nil
294 }
295
296 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
297         if sw.filterConnByPubKey != nil {
298                 return sw.filterConnByPubKey(pubkey)
299         }
300         return nil
301
302 }
303
304 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
305         sw.filterConnByAddr = f
306 }
307
308 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
309         sw.filterConnByPubKey = f
310 }
311
312 func (sw *Switch) startInitPeer(peer *Peer) error {
313         peer.Start() // spawn send/recv routines
314         for _, reactor := range sw.reactors {
315                 if err := reactor.AddPeer(peer); err != nil {
316                         return err
317                 }
318         }
319         return nil
320 }
321
322 // Dial a list of seeds asynchronously in random order
323 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
324
325         netAddrs, err := NewNetAddressStrings(seeds)
326         if err != nil {
327                 return err
328         }
329
330         if addrBook != nil {
331                 // add seeds to `addrBook`
332                 ourAddrS := sw.nodeInfo.ListenAddr
333                 ourAddr, _ := NewNetAddressString(ourAddrS)
334                 for _, netAddr := range netAddrs {
335                         // do not add ourselves
336                         if netAddr.Equals(ourAddr) {
337                                 continue
338                         }
339                         addrBook.AddAddress(netAddr, ourAddr)
340                 }
341                 addrBook.Save()
342         }
343
344         // permute the list, dial them in random order.
345         perm := rand.Perm(len(netAddrs))
346         for i := 0; i < len(perm); i++ {
347                 go func(i int) {
348                         time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
349                         j := perm[i]
350                         sw.dialSeed(netAddrs[j])
351                 }(i)
352         }
353         return nil
354 }
355
356 func (sw *Switch) dialSeed(addr *NetAddress) {
357         peer, err := sw.DialPeerWithAddress(addr, true)
358         if err != nil {
359                 log.WithField("error", err).Error("Error dialing seed")
360         } else {
361                 log.WithField("peer", peer).Info("Connected to seed")
362         }
363 }
364
365 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
366         if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
367                 return nil, err
368         }
369         if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
370                 return nil, ErrConnectSelf
371         }
372         for _, v := range sw.Peers().list {
373                 if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
374                         return nil, ErrPeerConnected
375                 }
376         }
377         sw.dialing.Set(addr.IP.String(), addr)
378         defer sw.dialing.Delete(addr.IP.String())
379
380         log.WithField("address", addr).Info("Dialing peer")
381         peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
382         if err != nil {
383                 log.WithFields(log.Fields{
384                         "address": addr,
385                         "error":   err,
386                 }).Info("Failed to dial peer")
387                 return nil, err
388         }
389         peer.SetLogger(sw.Logger.With("peer", addr))
390         if persistent {
391                 peer.makePersistent()
392         }
393         err = sw.AddPeer(peer)
394         if err != nil {
395                 log.WithFields(log.Fields{
396                         "address": addr,
397                         "error":   err,
398                 }).Info("Failed to add peer")
399                 peer.CloseConn()
400                 return nil, err
401         }
402         log.WithFields(log.Fields{
403                 "address": addr,
404         }).Info("Dialed and added peer")
405         return peer, nil
406 }
407
408 func (sw *Switch) IsDialing(addr *NetAddress) bool {
409         return sw.dialing.Has(addr.IP.String())
410 }
411
412 // Broadcast runs a go routine for each attempted send, which will block
413 // trying to send for defaultSendTimeoutSeconds. Returns a channel
414 // which receives success values for each attempted send (false if times out)
415 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
416 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
417         successChan := make(chan bool, len(sw.peers.List()))
418         log.WithFields(log.Fields{
419                 "chID": chID,
420                 "msg":  msg,
421         }).Debug("Broadcast")
422         for _, peer := range sw.peers.List() {
423                 go func(peer *Peer) {
424                         success := peer.Send(chID, msg)
425                         successChan <- success
426                 }(peer)
427         }
428         return successChan
429 }
430
431 // Returns the count of outbound/inbound and outbound-dialing peers.
432 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
433         peers := sw.peers.List()
434         for _, peer := range peers {
435                 if peer.outbound {
436                         outbound++
437                 } else {
438                         inbound++
439                 }
440         }
441         dialing = sw.dialing.Size()
442         return
443 }
444
445 func (sw *Switch) Peers() *PeerSet {
446         return sw.peers
447 }
448
449 // Disconnect from a peer due to external error, retry if it is a persistent peer.
450 // TODO: make record depending on reason.
451 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
452         addr := NewNetAddress(peer.Addr())
453         log.WithFields(log.Fields{
454                 "peer":  peer,
455                 "error": reason,
456         }).Info("Stopping peer due to error")
457         sw.stopAndRemovePeer(peer, reason)
458
459         if peer.IsPersistent() {
460                 log.WithField("peer", peer).Info("Reconnecting to peer")
461                 for i := 1; i < reconnectAttempts; i++ {
462                         if !sw.IsRunning() {
463                                 return
464                         }
465
466                         peer, err := sw.DialPeerWithAddress(addr, true)
467                         if err != nil {
468                                 if i == reconnectAttempts {
469                                         log.WithFields(log.Fields{
470                                                 "retries": i,
471                                                 "error":   err,
472                                         }).Info("Error reconnecting to peer. Giving up")
473                                         return
474                                 }
475
476                                 if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
477                                         log.WithField("error", err).Info("Error reconnecting to peer. ")
478                                         return
479                                 }
480
481                                 log.WithFields(log.Fields{
482                                         "retries": i,
483                                         "error":   err,
484                                 }).Info("Error reconnecting to peer. Trying again")
485                                 time.Sleep(reconnectInterval)
486                                 continue
487                         }
488
489                         log.WithField("peer", peer).Info("Reconnected to peer")
490                         return
491                 }
492         }
493 }
494
495 // Disconnect from a peer gracefully.
496 // TODO: handle graceful disconnects.
497 func (sw *Switch) StopPeerGracefully(peer *Peer) {
498         log.Info("Stopping peer gracefully")
499         sw.stopAndRemovePeer(peer, nil)
500 }
501
502 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
503         sw.peers.Remove(peer)
504         peer.Stop()
505         for _, reactor := range sw.reactors {
506                 reactor.RemovePeer(peer, reason)
507         }
508 }
509
510 func (sw *Switch) listenerRoutine(l Listener) {
511         for {
512                 inConn, ok := <-l.Connections()
513                 if !ok {
514                         break
515                 }
516
517                 // ignore connection if we already have enough
518                 maxPeers := sw.config.MaxNumPeers
519                 if maxPeers <= sw.peers.Size() {
520                         // close inConn
521                         inConn.Close()
522                         log.WithFields(log.Fields{
523                                 "address":  inConn.RemoteAddr().String(),
524                                 "numPeers": sw.peers.Size(),
525                                 "max":      maxPeers,
526                         }).Info("Ignoring inbound connection: already have enough peers")
527                         continue
528                 }
529
530                 // New inbound connection!
531                 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
532                 if err != nil {
533                         // conn close for returing err
534                         inConn.Close()
535                         log.WithFields(log.Fields{
536                                 "address": inConn.RemoteAddr().String(),
537                                 "error":   err,
538                         }).Info("Ignoring inbound connection: error while adding peer")
539                         continue
540                 }
541
542                 // NOTE: We don't yet have the listening port of the
543                 // remote (if they have a listener at all).
544                 // The peerHandshake will handle that
545         }
546
547         // cleanup
548 }
549
550 //-----------------------------------------------------------------------------
551
552 type SwitchEventNewPeer struct {
553         Peer *Peer
554 }
555
556 type SwitchEventDonePeer struct {
557         Peer  *Peer
558         Error interface{}
559 }
560
561 //------------------------------------------------------------------
562 // Switches connected via arbitrary net.Conn; useful for testing
563
564 // Returns n switches, connected according to the connect func.
565 // If connect==Connect2Switches, the switches will be fully connected.
566 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
567 // NOTE: panics if any switch fails to start.
568 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
569         switches := make([]*Switch, n)
570         for i := 0; i < n; i++ {
571                 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
572         }
573
574         if err := StartSwitches(switches); err != nil {
575                 panic(err)
576         }
577
578         for i := 0; i < n; i++ {
579                 for j := i; j < n; j++ {
580                         connect(switches, i, j)
581                 }
582         }
583
584         return switches
585 }
586
587 var PanicOnAddPeerErr = false
588
589 // Will connect switches i and j via net.Pipe()
590 // Blocks until a conection is established.
591 // NOTE: caller ensures i and j are within bounds
592 func Connect2Switches(switches []*Switch, i, j int) {
593         switchI := switches[i]
594         switchJ := switches[j]
595         c1, c2 := net.Pipe()
596         doneCh := make(chan struct{})
597         go func() {
598                 err := switchI.addPeerWithConnection(c1)
599                 if PanicOnAddPeerErr && err != nil {
600                         panic(err)
601                 }
602                 doneCh <- struct{}{}
603         }()
604         go func() {
605                 err := switchJ.addPeerWithConnection(c2)
606                 if PanicOnAddPeerErr && err != nil {
607                         panic(err)
608                 }
609                 doneCh <- struct{}{}
610         }()
611         <-doneCh
612         <-doneCh
613 }
614
615 func StartSwitches(switches []*Switch) error {
616         for _, s := range switches {
617                 _, err := s.Start() // start switch and reactors
618                 if err != nil {
619                         return err
620                 }
621         }
622         return nil
623 }
624
625 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
626         privKey := crypto.GenPrivKeyEd25519()
627         // new switch, add reactors
628         // TODO: let the config be passed in?
629         s := initSwitch(i, NewSwitch(cfg, nil))
630         s.SetNodeInfo(&NodeInfo{
631                 PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
632                 Moniker:    cmn.Fmt("switch%d", i),
633                 Network:    network,
634                 Version:    version,
635                 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
636                 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
637         })
638         s.SetNodePrivKey(privKey)
639         return s
640 }
641
642 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
643         peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
644         if err != nil {
645                 conn.Close()
646                 return err
647         }
648         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
649         if err = sw.AddPeer(peer); err != nil {
650                 conn.Close()
651                 return err
652         }
653
654         return nil
655 }
656
657 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
658         fullAddr := conn.RemoteAddr().String()
659         host, _, err := net.SplitHostPort(fullAddr)
660         if err != nil {
661                 return err
662         }
663
664         if err = sw.checkBannedPeer(host); err != nil {
665                 return err
666         }
667
668         peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
669         if err != nil {
670                 return err
671         }
672         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
673         if err = sw.AddPeer(peer); err != nil {
674                 return err
675         }
676
677         return nil
678 }
679
680 func (sw *Switch) AddBannedPeer(peer *Peer) error {
681         sw.mtx.Lock()
682         defer sw.mtx.Unlock()
683
684         key := peer.mconn.RemoteAddress.IP.String()
685         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
686         datajson, err := json.Marshal(sw.bannedPeer)
687         if err != nil {
688                 return err
689         }
690         sw.db.Set([]byte(bannedPeerKey), datajson)
691         return nil
692 }
693
694 func (sw *Switch) DelBannedPeer(addr string) error {
695         sw.mtx.Lock()
696         defer sw.mtx.Unlock()
697
698         delete(sw.bannedPeer, addr)
699         datajson, err := json.Marshal(sw.bannedPeer)
700         if err != nil {
701                 return err
702         }
703         sw.db.Set([]byte(bannedPeerKey), datajson)
704         return nil
705 }
706
707 func (sw *Switch) scamPeerHandler() {
708         for src := range sw.ScamPeerCh {
709                 var tm *trust.TrustMetric
710                 key := src.Connection().RemoteAddress.IP.String()
711                 if tm = sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
712                         log.Errorf("Can't get peer trust metric")
713                         continue
714                 }
715                 sw.delTrustMetric(tm, src)
716         }
717 }
718
719 func (sw *Switch) AddScamPeer(src *Peer) {
720         sw.ScamPeerCh <- src
721 }
722
723 func (sw *Switch) delTrustMetric(tm *trust.TrustMetric, src *Peer) {
724         key := src.Connection().RemoteAddress.IP.String()
725         tm.BadEvents(1)
726         if tm.TrustScore() < peerBannedTM {
727                 sw.AddBannedPeer(src)
728                 sw.TrustMetricStore.PeerDisconnected(key)
729                 sw.StopPeerGracefully(src)
730         }
731 }
732
733 func (sw *Switch) checkBannedPeer(peer string) error {
734         if banEnd, ok := sw.bannedPeer[peer]; ok {
735                 if time.Now().Before(banEnd) {
736                         return ErrConnectBannedPeer
737                 }
738                 sw.DelBannedPeer(peer)
739         }
740         return nil
741 }