OSDN Git Service

Merge branch 'dev' into bvm
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "errors"
5         "fmt"
6         "math/rand"
7         "net"
8         "time"
9
10         cfg "github.com/bytom/config"
11         log "github.com/sirupsen/logrus"
12         crypto "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14 )
15
16 const (
17         reconnectAttempts = 30
18         reconnectInterval = 3 * time.Second
19 )
20
21 type Reactor interface {
22         cmn.Service // Start, Stop
23
24         SetSwitch(*Switch)
25         GetChannels() []*ChannelDescriptor
26         AddPeer(peer *Peer)
27         RemovePeer(peer *Peer, reason interface{})
28         Receive(chID byte, peer *Peer, msgBytes []byte)
29 }
30
31 //--------------------------------------
32
33 type BaseReactor struct {
34         cmn.BaseService // Provides Start, Stop, .Quit
35         Switch          *Switch
36 }
37
38 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
39         return &BaseReactor{
40                 BaseService: *cmn.NewBaseService(nil, name, impl),
41                 Switch:      nil,
42         }
43 }
44
45 func (br *BaseReactor) SetSwitch(sw *Switch) {
46         br.Switch = sw
47 }
48 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
49 func (_ *BaseReactor) AddPeer(peer *Peer)                             {}
50 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
51 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
52
53 //-----------------------------------------------------------------------------
54
55 /*
56 The `Switch` handles peer connections and exposes an API to receive incoming messages
57 on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
58 or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
59 incoming messages are received on the reactor.
60 */
61 type Switch struct {
62         cmn.BaseService
63
64         config       *cfg.P2PConfig
65         peerConfig   *PeerConfig
66         listeners    []Listener
67         reactors     map[string]Reactor
68         chDescs      []*ChannelDescriptor
69         reactorsByCh map[byte]Reactor
70         peers        *PeerSet
71         dialing      *cmn.CMap
72         nodeInfo     *NodeInfo             // our node info
73         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
74
75         filterConnByAddr   func(net.Addr) error
76         filterConnByPubKey func(crypto.PubKeyEd25519) error
77 }
78
79 var (
80         ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
81 )
82
83 func NewSwitch(config *cfg.P2PConfig) *Switch {
84         sw := &Switch{
85                 config:       config,
86                 peerConfig:   DefaultPeerConfig(),
87                 reactors:     make(map[string]Reactor),
88                 chDescs:      make([]*ChannelDescriptor, 0),
89                 reactorsByCh: make(map[byte]Reactor),
90                 peers:        NewPeerSet(),
91                 dialing:      cmn.NewCMap(),
92                 nodeInfo:     nil,
93         }
94         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
95         return sw
96 }
97
98 // Not goroutine safe.
99 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
100         // Validate the reactor.
101         // No two reactors can share the same channel.
102         reactorChannels := reactor.GetChannels()
103         for _, chDesc := range reactorChannels {
104                 chID := chDesc.ID
105                 if sw.reactorsByCh[chID] != nil {
106                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
107                 }
108                 sw.chDescs = append(sw.chDescs, chDesc)
109                 sw.reactorsByCh[chID] = reactor
110         }
111         sw.reactors[name] = reactor
112         reactor.SetSwitch(sw)
113         return reactor
114 }
115
116 // Not goroutine safe.
117 func (sw *Switch) Reactors() map[string]Reactor {
118         return sw.reactors
119 }
120
121 // Not goroutine safe.
122 func (sw *Switch) Reactor(name string) Reactor {
123         return sw.reactors[name]
124 }
125
126 // Not goroutine safe.
127 func (sw *Switch) AddListener(l Listener) {
128         sw.listeners = append(sw.listeners, l)
129 }
130
131 // Not goroutine safe.
132 func (sw *Switch) Listeners() []Listener {
133         return sw.listeners
134 }
135
136 // Not goroutine safe.
137 func (sw *Switch) IsListening() bool {
138         return len(sw.listeners) > 0
139 }
140
141 // Not goroutine safe.
142 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
143         sw.nodeInfo = nodeInfo
144 }
145
146 // Not goroutine safe.
147 func (sw *Switch) NodeInfo() *NodeInfo {
148         return sw.nodeInfo
149 }
150
151 // Not goroutine safe.
152 // NOTE: Overwrites sw.nodeInfo.PubKey
153 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
154         sw.nodePrivKey = nodePrivKey
155         if sw.nodeInfo != nil {
156                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
157         }
158 }
159
160 // Switch.Start() starts all the reactors, peers, and listeners.
161 func (sw *Switch) OnStart() error {
162         sw.BaseService.OnStart()
163         // Start reactors
164         for _, reactor := range sw.reactors {
165                 _, err := reactor.Start()
166                 if err != nil {
167                         return err
168                 }
169         }
170         // Start peers
171         for _, peer := range sw.peers.List() {
172                 sw.startInitPeer(peer)
173         }
174         // Start listeners
175         for _, listener := range sw.listeners {
176                 go sw.listenerRoutine(listener)
177         }
178         return nil
179 }
180
181 func (sw *Switch) OnStop() {
182         sw.BaseService.OnStop()
183         // Stop listeners
184         for _, listener := range sw.listeners {
185                 listener.Stop()
186         }
187         sw.listeners = nil
188         // Stop peers
189         for _, peer := range sw.peers.List() {
190                 peer.Stop()
191                 sw.peers.Remove(peer)
192         }
193         // Stop reactors
194         for _, reactor := range sw.reactors {
195                 reactor.Stop()
196         }
197 }
198
199 // NOTE: This performs a blocking handshake before the peer is added.
200 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
201 func (sw *Switch) AddPeer(peer *Peer) error {
202         if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
203                 return err
204         }
205
206         if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
207                 return err
208         }
209
210         if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
211                 return err
212         }
213
214         // Avoid self
215         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
216                 return errors.New("Ignoring connection from self")
217         }
218
219         // Check version, chain id
220         if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
221                 return err
222         }
223
224         // Check for duplicate peer
225         if sw.peers.Has(peer.Key) {
226                 return ErrSwitchDuplicatePeer
227
228         }
229
230         // Start peer
231         if sw.IsRunning() {
232                 sw.startInitPeer(peer)
233         }
234
235         // Add the peer to .peers.
236         // We start it first so that a peer in the list is safe to Stop.
237         // It should not err since we already checked peers.Has()
238         if err := sw.peers.Add(peer); err != nil {
239                 return err
240         }
241
242         log.WithField("peer", peer).Info("Added peer")
243         return nil
244 }
245
246 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
247         if sw.filterConnByAddr != nil {
248                 return sw.filterConnByAddr(addr)
249         }
250         return nil
251 }
252
253 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
254         if sw.filterConnByPubKey != nil {
255                 return sw.filterConnByPubKey(pubkey)
256         }
257         return nil
258
259 }
260
261 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
262         sw.filterConnByAddr = f
263 }
264
265 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
266         sw.filterConnByPubKey = f
267 }
268
269 func (sw *Switch) startInitPeer(peer *Peer) {
270         peer.Start() // spawn send/recv routines
271         for _, reactor := range sw.reactors {
272                 reactor.AddPeer(peer)
273         }
274 }
275
276 // Dial a list of seeds asynchronously in random order
277 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
278
279         netAddrs, err := NewNetAddressStrings(seeds)
280         if err != nil {
281                 return err
282         }
283
284         if addrBook != nil {
285                 // add seeds to `addrBook`
286                 ourAddrS := sw.nodeInfo.ListenAddr
287                 ourAddr, _ := NewNetAddressString(ourAddrS)
288                 for _, netAddr := range netAddrs {
289                         // do not add ourselves
290                         if netAddr.Equals(ourAddr) {
291                                 continue
292                         }
293                         addrBook.AddAddress(netAddr, ourAddr)
294                 }
295                 addrBook.Save()
296         }
297
298         // permute the list, dial them in random order.
299         perm := rand.Perm(len(netAddrs))
300         for i := 0; i < len(perm); i++ {
301                 go func(i int) {
302                         time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
303                         j := perm[i]
304                         sw.dialSeed(netAddrs[j])
305                 }(i)
306         }
307         return nil
308 }
309
310 func (sw *Switch) dialSeed(addr *NetAddress) {
311         peer, err := sw.DialPeerWithAddress(addr, true)
312         if err != nil {
313                 log.WithField("error", err).Error("Error dialing seed")
314         } else {
315                 log.WithField("peer", peer).Info("Connected to seed")
316         }
317 }
318
319 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
320         sw.dialing.Set(addr.IP.String(), addr)
321         defer sw.dialing.Delete(addr.IP.String())
322
323         log.WithField("address", addr).Info("Dialing peer")
324         peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
325         if err != nil {
326                 log.WithFields(log.Fields{
327                         "address": addr,
328                         "error":   err,
329                 }).Info("Failed to dial peer")
330                 return nil, err
331         }
332         peer.SetLogger(sw.Logger.With("peer", addr))
333         if persistent {
334                 peer.makePersistent()
335         }
336         err = sw.AddPeer(peer)
337         if err != nil {
338                 log.WithFields(log.Fields{
339                         "address": addr,
340                         "error":   err,
341                 }).Info("Failed to add peer")
342                 peer.CloseConn()
343                 return nil, err
344         }
345         log.WithFields(log.Fields{
346                 "address": addr,
347                 "error":   err,
348         }).Info("Dialed and added peer")
349         return peer, nil
350 }
351
352 func (sw *Switch) IsDialing(addr *NetAddress) bool {
353         return sw.dialing.Has(addr.IP.String())
354 }
355
356 // Broadcast runs a go routine for each attempted send, which will block
357 // trying to send for defaultSendTimeoutSeconds. Returns a channel
358 // which receives success values for each attempted send (false if times out)
359 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
360 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
361         successChan := make(chan bool, len(sw.peers.List()))
362         log.WithFields(log.Fields{
363                 "chID": chID,
364                 "msg":  msg,
365         }).Debug("Broadcast")
366         for _, peer := range sw.peers.List() {
367                 go func(peer *Peer) {
368                         success := peer.Send(chID, msg)
369                         successChan <- success
370                 }(peer)
371         }
372         return successChan
373 }
374
375 // Returns the count of outbound/inbound and outbound-dialing peers.
376 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
377         peers := sw.peers.List()
378         for _, peer := range peers {
379                 if peer.outbound {
380                         outbound++
381                 } else {
382                         inbound++
383                 }
384         }
385         dialing = sw.dialing.Size()
386         return
387 }
388
389 func (sw *Switch) Peers() IPeerSet {
390         return sw.peers
391 }
392
393 // Disconnect from a peer due to external error, retry if it is a persistent peer.
394 // TODO: make record depending on reason.
395 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
396         addr := NewNetAddress(peer.Addr())
397         log.WithFields(log.Fields{
398                 "peer":  peer,
399                 "error": reason,
400         }).Info("Stopping peer due to error")
401         sw.stopAndRemovePeer(peer, reason)
402
403         if peer.IsPersistent() {
404                 go func() {
405                         log.WithField("peer", peer).Info("Reconnecting to peer")
406                         for i := 1; i < reconnectAttempts; i++ {
407                                 if !sw.IsRunning() {
408                                         return
409                                 }
410
411                                 peer, err := sw.DialPeerWithAddress(addr, true)
412                                 if err != nil {
413                                         if i == reconnectAttempts {
414                                                 log.WithFields(log.Fields{
415                                                         "retries": i,
416                                                         "error":   err,
417                                                 }).Info("Error reconnecting to peer. Giving up")
418                                                 return
419                                         }
420                                         log.WithFields(log.Fields{
421                                                 "retries": i,
422                                                 "error":   err,
423                                         }).Info("Error reconnecting to peer. Trying again")
424                                         time.Sleep(reconnectInterval)
425                                         continue
426                                 }
427
428                                 log.WithField("peer", peer).Info("Reconnected to peer")
429                                 return
430                         }
431                 }()
432         }
433 }
434
435 // Disconnect from a peer gracefully.
436 // TODO: handle graceful disconnects.
437 func (sw *Switch) StopPeerGracefully(peer *Peer) {
438         log.Info("Stopping peer gracefully")
439         sw.stopAndRemovePeer(peer, nil)
440 }
441
442 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
443         sw.peers.Remove(peer)
444         peer.Stop()
445         for _, reactor := range sw.reactors {
446                 reactor.RemovePeer(peer, reason)
447         }
448 }
449
450 func (sw *Switch) listenerRoutine(l Listener) {
451         for {
452                 inConn, ok := <-l.Connections()
453                 if !ok {
454                         break
455                 }
456
457                 // ignore connection if we already have enough
458                 maxPeers := sw.config.MaxNumPeers
459                 if maxPeers <= sw.peers.Size() {
460                         log.WithFields(log.Fields{
461                                 "address":  inConn.RemoteAddr().String(),
462                                 "numPeers": sw.peers.Size(),
463                                 "max":      maxPeers,
464                         }).Info("Ignoring inbound connection: already have enough peers")
465                         continue
466                 }
467
468                 // New inbound connection!
469                 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
470                 if err != nil {
471                         log.WithFields(log.Fields{
472                                 "address": inConn.RemoteAddr().String(),
473                                 "error":   err,
474                         }).Info("Ignoring inbound connection: error while adding peer")
475                         continue
476                 }
477
478                 // NOTE: We don't yet have the listening port of the
479                 // remote (if they have a listener at all).
480                 // The peerHandshake will handle that
481         }
482
483         // cleanup
484 }
485
486 //-----------------------------------------------------------------------------
487
488 type SwitchEventNewPeer struct {
489         Peer *Peer
490 }
491
492 type SwitchEventDonePeer struct {
493         Peer  *Peer
494         Error interface{}
495 }
496
497 //------------------------------------------------------------------
498 // Switches connected via arbitrary net.Conn; useful for testing
499
500 // Returns n switches, connected according to the connect func.
501 // If connect==Connect2Switches, the switches will be fully connected.
502 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
503 // NOTE: panics if any switch fails to start.
504 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
505         switches := make([]*Switch, n)
506         for i := 0; i < n; i++ {
507                 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
508         }
509
510         if err := StartSwitches(switches); err != nil {
511                 panic(err)
512         }
513
514         for i := 0; i < n; i++ {
515                 for j := i; j < n; j++ {
516                         connect(switches, i, j)
517                 }
518         }
519
520         return switches
521 }
522
523 var PanicOnAddPeerErr = false
524
525 // Will connect switches i and j via net.Pipe()
526 // Blocks until a conection is established.
527 // NOTE: caller ensures i and j are within bounds
528 func Connect2Switches(switches []*Switch, i, j int) {
529         switchI := switches[i]
530         switchJ := switches[j]
531         c1, c2 := net.Pipe()
532         doneCh := make(chan struct{})
533         go func() {
534                 err := switchI.addPeerWithConnection(c1)
535                 if PanicOnAddPeerErr && err != nil {
536                         panic(err)
537                 }
538                 doneCh <- struct{}{}
539         }()
540         go func() {
541                 err := switchJ.addPeerWithConnection(c2)
542                 if PanicOnAddPeerErr && err != nil {
543                         panic(err)
544                 }
545                 doneCh <- struct{}{}
546         }()
547         <-doneCh
548         <-doneCh
549 }
550
551 func StartSwitches(switches []*Switch) error {
552         for _, s := range switches {
553                 _, err := s.Start() // start switch and reactors
554                 if err != nil {
555                         return err
556                 }
557         }
558         return nil
559 }
560
561 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
562         privKey := crypto.GenPrivKeyEd25519()
563         // new switch, add reactors
564         // TODO: let the config be passed in?
565         s := initSwitch(i, NewSwitch(cfg))
566         s.SetNodeInfo(&NodeInfo{
567                 PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
568                 Moniker:    cmn.Fmt("switch%d", i),
569                 Network:    network,
570                 Version:    version,
571                 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
572                 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
573         })
574         s.SetNodePrivKey(privKey)
575         return s
576 }
577
578 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
579         peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
580         if err != nil {
581                 conn.Close()
582                 return err
583         }
584         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
585         if err = sw.AddPeer(peer); err != nil {
586                 conn.Close()
587                 return err
588         }
589
590         return nil
591 }
592
593 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
594         peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
595         if err != nil {
596                 conn.Close()
597                 return err
598         }
599         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
600         if err = sw.AddPeer(peer); err != nil {
601                 conn.Close()
602                 return err
603         }
604
605         return nil
606 }