OSDN Git Service

Merge pull request #936 from Bytom/prod
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "math/rand"
7         "net"
8         "strings"
9         "sync"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         crypto "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15         dbm "github.com/tendermint/tmlibs/db"
16
17         cfg "github.com/bytom/config"
18         "github.com/bytom/errors"
19         "github.com/bytom/p2p/trust"
20 )
21
22 const (
23         reconnectAttempts = 5
24         reconnectInterval = 10 * time.Second
25
26         bannedPeerKey      = "BannedPeer"
27         defaultBanDuration = time.Hour * 1
28 )
29
30 var ErrConnectBannedPeer = errors.New("Connect banned peer")
31
32 type Reactor interface {
33         cmn.Service // Start, Stop
34
35         SetSwitch(*Switch)
36         GetChannels() []*ChannelDescriptor
37         AddPeer(peer *Peer) error
38         RemovePeer(peer *Peer, reason interface{})
39         Receive(chID byte, peer *Peer, msgBytes []byte)
40 }
41
42 //--------------------------------------
43
44 type BaseReactor struct {
45         cmn.BaseService // Provides Start, Stop, .Quit
46         Switch          *Switch
47 }
48
49 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
50         return &BaseReactor{
51                 BaseService: *cmn.NewBaseService(nil, name, impl),
52                 Switch:      nil,
53         }
54 }
55
56 func (br *BaseReactor) SetSwitch(sw *Switch) {
57         br.Switch = sw
58 }
59 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
60 func (_ *BaseReactor) AddPeer(peer *Peer)                             {}
61 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
62 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
63
64 //-----------------------------------------------------------------------------
65
66 /*
67 The `Switch` handles peer connections and exposes an API to receive incoming messages
68 on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
69 or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
70 incoming messages are received on the reactor.
71 */
72 type Switch struct {
73         cmn.BaseService
74
75         config       *cfg.P2PConfig
76         peerConfig   *PeerConfig
77         listeners    []Listener
78         reactors     map[string]Reactor
79         chDescs      []*ChannelDescriptor
80         reactorsByCh map[byte]Reactor
81         peers        *PeerSet
82         dialing      *cmn.CMap
83         nodeInfo     *NodeInfo             // our node info
84         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
85         bannedPeer   map[string]time.Time
86         db           dbm.DB
87         mtx          sync.Mutex
88
89         filterConnByAddr   func(net.Addr) error
90         filterConnByPubKey func(crypto.PubKeyEd25519) error
91 }
92
93 var (
94         ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
95         ErrConnectSelf         = errors.New("Connect self")
96         ErrPeerConnected       = errors.New("Peer is connected")
97 )
98
99 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
100         sw := &Switch{
101                 config:       config,
102                 peerConfig:   DefaultPeerConfig(config),
103                 reactors:     make(map[string]Reactor),
104                 chDescs:      make([]*ChannelDescriptor, 0),
105                 reactorsByCh: make(map[byte]Reactor),
106                 peers:        NewPeerSet(),
107                 dialing:      cmn.NewCMap(),
108                 nodeInfo:     nil,
109                 db:           trustHistoryDB,
110         }
111         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
112
113         sw.bannedPeer = make(map[string]time.Time)
114         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
115                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
116                         return nil
117                 }
118         }
119         trust.Init()
120         return sw
121 }
122
123 // Not goroutine safe.
124 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
125         // Validate the reactor.
126         // No two reactors can share the same channel.
127         reactorChannels := reactor.GetChannels()
128         for _, chDesc := range reactorChannels {
129                 chID := chDesc.ID
130                 if sw.reactorsByCh[chID] != nil {
131                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
132                 }
133                 sw.chDescs = append(sw.chDescs, chDesc)
134                 sw.reactorsByCh[chID] = reactor
135         }
136         sw.reactors[name] = reactor
137         reactor.SetSwitch(sw)
138         return reactor
139 }
140
141 // Not goroutine safe.
142 func (sw *Switch) Reactors() map[string]Reactor {
143         return sw.reactors
144 }
145
146 // Not goroutine safe.
147 func (sw *Switch) Reactor(name string) Reactor {
148         return sw.reactors[name]
149 }
150
151 // Not goroutine safe.
152 func (sw *Switch) AddListener(l Listener) {
153         sw.listeners = append(sw.listeners, l)
154 }
155
156 // Not goroutine safe.
157 func (sw *Switch) Listeners() []Listener {
158         return sw.listeners
159 }
160
161 // Not goroutine safe.
162 func (sw *Switch) IsListening() bool {
163         return len(sw.listeners) > 0
164 }
165
166 // Not goroutine safe.
167 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
168         sw.nodeInfo = nodeInfo
169 }
170
171 // Not goroutine safe.
172 func (sw *Switch) NodeInfo() *NodeInfo {
173         return sw.nodeInfo
174 }
175
176 // Not goroutine safe.
177 // NOTE: Overwrites sw.nodeInfo.PubKey
178 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
179         sw.nodePrivKey = nodePrivKey
180         if sw.nodeInfo != nil {
181                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
182         }
183 }
184
185 // Switch.Start() starts all the reactors, peers, and listeners.
186 func (sw *Switch) OnStart() error {
187         sw.BaseService.OnStart()
188         // Start reactors
189         for _, reactor := range sw.reactors {
190                 _, err := reactor.Start()
191                 if err != nil {
192                         return err
193                 }
194         }
195         // Start peers
196         for _, peer := range sw.peers.List() {
197                 sw.startInitPeer(peer)
198         }
199         // Start listeners
200         for _, listener := range sw.listeners {
201                 go sw.listenerRoutine(listener)
202         }
203         return nil
204 }
205
206 func (sw *Switch) OnStop() {
207         sw.BaseService.OnStop()
208         // Stop listeners
209         for _, listener := range sw.listeners {
210                 listener.Stop()
211         }
212         sw.listeners = nil
213         // Stop peers
214         for _, peer := range sw.peers.List() {
215                 peer.Stop()
216                 sw.peers.Remove(peer)
217         }
218         // Stop reactors
219         for _, reactor := range sw.reactors {
220                 reactor.Stop()
221         }
222 }
223
224 // NOTE: This performs a blocking handshake before the peer is added.
225 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
226 func (sw *Switch) AddPeer(peer *Peer) error {
227         if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
228                 return err
229         }
230
231         if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
232                 return err
233         }
234
235         if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
236                 return err
237         }
238
239         if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
240                 return err
241         }
242
243         // Avoid self
244         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
245                 return errors.New("Ignoring connection from self")
246         }
247
248         // Check version, chain id
249         if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
250                 return err
251         }
252
253         // Check for duplicate peer
254         if sw.peers.Has(peer.Key) {
255                 return ErrSwitchDuplicatePeer
256
257         }
258
259         // Start peer
260         if sw.IsRunning() {
261                 if err := sw.startInitPeer(peer); err != nil {
262                         return err
263                 }
264         }
265
266         // Add the peer to .peers.
267         // We start it first so that a peer in the list is safe to Stop.
268         // It should not err since we already checked peers.Has()
269         if err := sw.peers.Add(peer); err != nil {
270                 return err
271         }
272
273         log.WithField("peer", peer).Info("Added peer")
274         return nil
275 }
276
277 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
278         if sw.filterConnByAddr != nil {
279                 return sw.filterConnByAddr(addr)
280         }
281         return nil
282 }
283
284 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
285         if sw.filterConnByPubKey != nil {
286                 return sw.filterConnByPubKey(pubkey)
287         }
288         return nil
289
290 }
291
292 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
293         sw.filterConnByAddr = f
294 }
295
296 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
297         sw.filterConnByPubKey = f
298 }
299
300 func (sw *Switch) startInitPeer(peer *Peer) error {
301         peer.Start() // spawn send/recv routines
302         for _, reactor := range sw.reactors {
303                 if err := reactor.AddPeer(peer); err != nil {
304                         return err
305                 }
306         }
307         return nil
308 }
309
310 // Dial a list of seeds asynchronously in random order
311 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
312
313         netAddrs, err := NewNetAddressStrings(seeds)
314         if err != nil {
315                 return err
316         }
317
318         if addrBook != nil {
319                 // add seeds to `addrBook`
320                 ourAddrS := sw.nodeInfo.ListenAddr
321                 ourAddr, _ := NewNetAddressString(ourAddrS)
322                 for _, netAddr := range netAddrs {
323                         // do not add ourselves
324                         if netAddr.Equals(ourAddr) {
325                                 continue
326                         }
327                         addrBook.AddAddress(netAddr, ourAddr)
328                 }
329
330                 addrBook.Save()
331         }
332         //permute the list, dial them in random order.
333         perm := rand.Perm(len(netAddrs))
334         for i := 0; i < len(perm); i += 2 {
335                 j := perm[i]
336                 sw.dialSeed(netAddrs[j])
337         }
338
339         return nil
340 }
341
342 func (sw *Switch) dialSeed(addr *NetAddress) {
343         peer, err := sw.DialPeerWithAddress(addr, false)
344         if err != nil {
345                 log.WithField("error", err).Error("Error dialing seed")
346         } else {
347                 log.WithField("peer", peer).Info("Connected to seed")
348         }
349 }
350
351 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
352         if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
353                 return nil, err
354         }
355         if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
356                 return nil, ErrConnectSelf
357         }
358         for _, v := range sw.Peers().list {
359                 if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
360                         return nil, ErrPeerConnected
361                 }
362         }
363         sw.dialing.Set(addr.IP.String(), addr)
364         defer sw.dialing.Delete(addr.IP.String())
365
366         log.Debug("Dialing peer address:", addr)
367         peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
368         if err != nil {
369                 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
370                 return nil, err
371         }
372         peer.SetLogger(sw.Logger.With("peer", addr))
373         if persistent {
374                 peer.makePersistent()
375         }
376         err = sw.AddPeer(peer)
377         if err != nil {
378                 log.WithFields(log.Fields{
379                         "address": addr,
380                         "error":   err,
381                 }).Info("Failed to add peer")
382                 peer.CloseConn()
383                 return nil, err
384         }
385         log.WithFields(log.Fields{
386                 "address": addr,
387         }).Info("Dialed and added peer")
388         return peer, nil
389 }
390
391 func (sw *Switch) IsDialing(addr *NetAddress) bool {
392         return sw.dialing.Has(addr.IP.String())
393 }
394
395 // Broadcast runs a go routine for each attempted send, which will block
396 // trying to send for defaultSendTimeoutSeconds. Returns a channel
397 // which receives success values for each attempted send (false if times out)
398 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
399 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
400         successChan := make(chan bool, len(sw.peers.List()))
401         log.WithFields(log.Fields{
402                 "chID": chID,
403                 "msg":  msg,
404         }).Debug("Broadcast")
405         for _, peer := range sw.peers.List() {
406                 go func(peer *Peer) {
407                         success := peer.Send(chID, msg)
408                         successChan <- success
409                 }(peer)
410         }
411         return successChan
412 }
413
414 // Returns the count of outbound/inbound and outbound-dialing peers.
415 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
416         peers := sw.peers.List()
417         for _, peer := range peers {
418                 if peer.outbound {
419                         outbound++
420                 } else {
421                         inbound++
422                 }
423         }
424         dialing = sw.dialing.Size()
425         return
426 }
427
428 func (sw *Switch) Peers() *PeerSet {
429         return sw.peers
430 }
431
432 // Disconnect from a peer due to external error, retry if it is a persistent peer.
433 // TODO: make record depending on reason.
434 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
435         addr := NewNetAddress(peer.Addr())
436         log.WithFields(log.Fields{
437                 "peer":  peer,
438                 "error": reason,
439         }).Info("Stopping peer due to error")
440         sw.stopAndRemovePeer(peer, reason)
441
442         if peer.IsPersistent() {
443                 log.WithField("peer", peer).Info("Reconnecting to peer")
444                 for i := 1; i < reconnectAttempts; i++ {
445                         if !sw.IsRunning() {
446                                 return
447                         }
448
449                         peer, err := sw.DialPeerWithAddress(addr, false)
450                         if err != nil {
451                                 if i == reconnectAttempts {
452                                         log.WithFields(log.Fields{
453                                                 "retries": i,
454                                                 "error":   err,
455                                         }).Info("Error reconnecting to peer. Giving up")
456                                         return
457                                 }
458
459                                 if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
460                                         log.WithField("error", err).Info("Error reconnecting to peer. ")
461                                         return
462                                 }
463
464                                 log.WithFields(log.Fields{
465                                         "retries": i,
466                                         "error":   err,
467                                 }).Info("Error reconnecting to peer. Trying again")
468                                 time.Sleep(reconnectInterval)
469                                 continue
470                         }
471
472                         log.WithField("peer", peer).Info("Reconnected to peer")
473                         return
474                 }
475         }
476 }
477
478 // Disconnect from a peer gracefully.
479 // TODO: handle graceful disconnects.
480 func (sw *Switch) StopPeerGracefully(peer *Peer) {
481         log.Info("Stopping peer gracefully")
482         sw.stopAndRemovePeer(peer, nil)
483 }
484
485 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
486         for _, reactor := range sw.reactors {
487                 reactor.RemovePeer(peer, reason)
488         }
489         sw.peers.Remove(peer)
490         log.Info("Del peer from switch.")
491         peer.Stop()
492         log.Info("Peer connection is closed.")
493 }
494
495 func (sw *Switch) listenerRoutine(l Listener) {
496         for {
497                 inConn, ok := <-l.Connections()
498                 if !ok {
499                         break
500                 }
501
502                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
503                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
504                 // be double of MaxNumPeers
505                 if sw.config.MaxNumPeers*2 <= sw.peers.Size() {
506                         // close inConn
507                         inConn.Close()
508                         log.WithFields(log.Fields{
509                                 "address":  inConn.RemoteAddr().String(),
510                                 "numPeers": sw.peers.Size(),
511                         }).Info("Ignoring inbound connection: already have enough peers")
512                         continue
513                 }
514
515                 // New inbound connection!
516                 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
517                 if err != nil {
518                         // conn close for returing err
519                         inConn.Close()
520                         log.WithFields(log.Fields{
521                                 "address": inConn.RemoteAddr().String(),
522                                 "error":   err,
523                         }).Info("Ignoring inbound connection: error while adding peer")
524                         continue
525                 }
526
527                 // NOTE: We don't yet have the listening port of the
528                 // remote (if they have a listener at all).
529                 // The peerHandshake will handle that
530         }
531
532         // cleanup
533 }
534
535 //-----------------------------------------------------------------------------
536
537 type SwitchEventNewPeer struct {
538         Peer *Peer
539 }
540
541 type SwitchEventDonePeer struct {
542         Peer  *Peer
543         Error interface{}
544 }
545
546 //------------------------------------------------------------------
547 // Switches connected via arbitrary net.Conn; useful for testing
548
549 // Returns n switches, connected according to the connect func.
550 // If connect==Connect2Switches, the switches will be fully connected.
551 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
552 // NOTE: panics if any switch fails to start.
553 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
554         switches := make([]*Switch, n)
555         for i := 0; i < n; i++ {
556                 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
557         }
558
559         if err := StartSwitches(switches); err != nil {
560                 panic(err)
561         }
562
563         for i := 0; i < n; i++ {
564                 for j := i; j < n; j++ {
565                         connect(switches, i, j)
566                 }
567         }
568
569         return switches
570 }
571
572 var PanicOnAddPeerErr = false
573
574 // Will connect switches i and j via net.Pipe()
575 // Blocks until a conection is established.
576 // NOTE: caller ensures i and j are within bounds
577 func Connect2Switches(switches []*Switch, i, j int) {
578         switchI := switches[i]
579         switchJ := switches[j]
580         c1, c2 := net.Pipe()
581         doneCh := make(chan struct{})
582         go func() {
583                 err := switchI.addPeerWithConnection(c1)
584                 if PanicOnAddPeerErr && err != nil {
585                         panic(err)
586                 }
587                 doneCh <- struct{}{}
588         }()
589         go func() {
590                 err := switchJ.addPeerWithConnection(c2)
591                 if PanicOnAddPeerErr && err != nil {
592                         panic(err)
593                 }
594                 doneCh <- struct{}{}
595         }()
596         <-doneCh
597         <-doneCh
598 }
599
600 func StartSwitches(switches []*Switch) error {
601         for _, s := range switches {
602                 _, err := s.Start() // start switch and reactors
603                 if err != nil {
604                         return err
605                 }
606         }
607         return nil
608 }
609
610 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
611         privKey := crypto.GenPrivKeyEd25519()
612         // new switch, add reactors
613         // TODO: let the config be passed in?
614         s := initSwitch(i, NewSwitch(cfg, nil))
615         s.SetNodeInfo(&NodeInfo{
616                 PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
617                 Moniker:    cmn.Fmt("switch%d", i),
618                 Network:    network,
619                 Version:    version,
620                 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
621                 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
622         })
623         s.SetNodePrivKey(privKey)
624         return s
625 }
626
627 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
628         peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
629         if err != nil {
630                 conn.Close()
631                 return err
632         }
633         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
634         if err = sw.AddPeer(peer); err != nil {
635                 conn.Close()
636                 return err
637         }
638
639         return nil
640 }
641
642 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
643         fullAddr := conn.RemoteAddr().String()
644         host, _, err := net.SplitHostPort(fullAddr)
645         if err != nil {
646                 return err
647         }
648
649         if err = sw.checkBannedPeer(host); err != nil {
650                 return err
651         }
652
653         peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
654         if err != nil {
655                 return err
656         }
657         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
658         if err = sw.AddPeer(peer); err != nil {
659                 return err
660         }
661
662         return nil
663 }
664
665 func (sw *Switch) AddBannedPeer(peer *Peer) error {
666         sw.mtx.Lock()
667         defer sw.mtx.Unlock()
668         if peer == nil {
669                 return nil
670         }
671         key := peer.mconn.RemoteAddress.IP.String()
672         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
673         datajson, err := json.Marshal(sw.bannedPeer)
674         if err != nil {
675                 return err
676         }
677         sw.db.Set([]byte(bannedPeerKey), datajson)
678         return nil
679 }
680
681 func (sw *Switch) delBannedPeer(addr string) error {
682         delete(sw.bannedPeer, addr)
683         datajson, err := json.Marshal(sw.bannedPeer)
684         if err != nil {
685                 return err
686         }
687         sw.db.Set([]byte(bannedPeerKey), datajson)
688         return nil
689 }
690
691 func (sw *Switch) checkBannedPeer(peer string) error {
692         sw.mtx.Lock()
693         defer sw.mtx.Unlock()
694
695         if banEnd, ok := sw.bannedPeer[peer]; ok {
696                 if time.Now().Before(banEnd) {
697                         return ErrConnectBannedPeer
698                 }
699                 sw.delBannedPeer(peer)
700         }
701         return nil
702 }