OSDN Git Service

Merge pull request #4 from Bytom/develop
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "errors"
5         "fmt"
6         "math/rand"
7         "net"
8         "time"
9
10         crypto "github.com/tendermint/go-crypto"
11         cfg "github.com/bytom/config"
12         cmn "github.com/tendermint/tmlibs/common"
13 )
14
15 const (
16         reconnectAttempts = 30
17         reconnectInterval = 3 * time.Second
18 )
19
20 type Reactor interface {
21         cmn.Service // Start, Stop
22
23         SetSwitch(*Switch)
24         GetChannels() []*ChannelDescriptor
25         AddPeer(peer *Peer)
26         RemovePeer(peer *Peer, reason interface{})
27         Receive(chID byte, peer *Peer, msgBytes []byte)
28 }
29
30 //--------------------------------------
31
32 type BaseReactor struct {
33         cmn.BaseService // Provides Start, Stop, .Quit
34         Switch          *Switch
35 }
36
37 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
38         return &BaseReactor{
39                 BaseService: *cmn.NewBaseService(nil, name, impl),
40                 Switch:      nil,
41         }
42 }
43
44 func (br *BaseReactor) SetSwitch(sw *Switch) {
45         br.Switch = sw
46 }
47 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
48 func (_ *BaseReactor) AddPeer(peer *Peer)                             {}
49 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
50 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
51
52 //-----------------------------------------------------------------------------
53
54 /*
55 The `Switch` handles peer connections and exposes an API to receive incoming messages
56 on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
57 or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
58 incoming messages are received on the reactor.
59 */
60 type Switch struct {
61         cmn.BaseService
62
63         config       *cfg.P2PConfig
64         peerConfig   *PeerConfig
65         listeners    []Listener
66         reactors     map[string]Reactor
67         chDescs      []*ChannelDescriptor
68         reactorsByCh map[byte]Reactor
69         peers        *PeerSet
70         dialing      *cmn.CMap
71         nodeInfo     *NodeInfo             // our node info
72         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
73
74         filterConnByAddr   func(net.Addr) error
75         filterConnByPubKey func(crypto.PubKeyEd25519) error
76 }
77
78 var (
79         ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
80 )
81
82 func NewSwitch(config *cfg.P2PConfig) *Switch {
83         sw := &Switch{
84                 config:       config,
85                 peerConfig:   DefaultPeerConfig(),
86                 reactors:     make(map[string]Reactor),
87                 chDescs:      make([]*ChannelDescriptor, 0),
88                 reactorsByCh: make(map[byte]Reactor),
89                 peers:        NewPeerSet(),
90                 dialing:      cmn.NewCMap(),
91                 nodeInfo:     nil,
92         }
93         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
94         return sw
95 }
96
97 // Not goroutine safe.
98 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
99         // Validate the reactor.
100         // No two reactors can share the same channel.
101         reactorChannels := reactor.GetChannels()
102         for _, chDesc := range reactorChannels {
103                 chID := chDesc.ID
104                 if sw.reactorsByCh[chID] != nil {
105                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
106                 }
107                 sw.chDescs = append(sw.chDescs, chDesc)
108                 sw.reactorsByCh[chID] = reactor
109         }
110         sw.reactors[name] = reactor
111         reactor.SetSwitch(sw)
112         return reactor
113 }
114
115 // Not goroutine safe.
116 func (sw *Switch) Reactors() map[string]Reactor {
117         return sw.reactors
118 }
119
120 // Not goroutine safe.
121 func (sw *Switch) Reactor(name string) Reactor {
122         return sw.reactors[name]
123 }
124
125 // Not goroutine safe.
126 func (sw *Switch) AddListener(l Listener) {
127         sw.listeners = append(sw.listeners, l)
128 }
129
130 // Not goroutine safe.
131 func (sw *Switch) Listeners() []Listener {
132         return sw.listeners
133 }
134
135 // Not goroutine safe.
136 func (sw *Switch) IsListening() bool {
137         return len(sw.listeners) > 0
138 }
139
140 // Not goroutine safe.
141 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
142         sw.nodeInfo = nodeInfo
143 }
144
145 // Not goroutine safe.
146 func (sw *Switch) NodeInfo() *NodeInfo {
147         return sw.nodeInfo
148 }
149
150 // Not goroutine safe.
151 // NOTE: Overwrites sw.nodeInfo.PubKey
152 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
153         sw.nodePrivKey = nodePrivKey
154         if sw.nodeInfo != nil {
155                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
156         }
157 }
158
159 // Switch.Start() starts all the reactors, peers, and listeners.
160 func (sw *Switch) OnStart() error {
161         sw.BaseService.OnStart()
162         // Start reactors
163         for _, reactor := range sw.reactors {
164                 _, err := reactor.Start()
165                 if err != nil {
166                         return err
167                 }
168         }
169         // Start peers
170         for _, peer := range sw.peers.List() {
171                 sw.startInitPeer(peer)
172         }
173         // Start listeners
174         for _, listener := range sw.listeners {
175                 go sw.listenerRoutine(listener)
176         }
177         return nil
178 }
179
180 func (sw *Switch) OnStop() {
181         sw.BaseService.OnStop()
182         // Stop listeners
183         for _, listener := range sw.listeners {
184                 listener.Stop()
185         }
186         sw.listeners = nil
187         // Stop peers
188         for _, peer := range sw.peers.List() {
189                 peer.Stop()
190                 sw.peers.Remove(peer)
191         }
192         // Stop reactors
193         for _, reactor := range sw.reactors {
194                 reactor.Stop()
195         }
196 }
197
198 // NOTE: This performs a blocking handshake before the peer is added.
199 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
200 func (sw *Switch) AddPeer(peer *Peer) error {
201         if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
202                 return err
203         }
204
205         if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
206                 return err
207         }
208
209         if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
210                 return err
211         }
212
213         // Avoid self
214         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
215                 return errors.New("Ignoring connection from self")
216         }
217
218         // Check version, chain id
219         if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
220                 return err
221         }
222
223         // Check for duplicate peer
224         if sw.peers.Has(peer.Key) {
225                 return ErrSwitchDuplicatePeer
226
227         }
228
229         // Start peer
230         if sw.IsRunning() {
231                 sw.startInitPeer(peer)
232         }
233
234         // Add the peer to .peers.
235         // We start it first so that a peer in the list is safe to Stop.
236         // It should not err since we already checked peers.Has()
237         if err := sw.peers.Add(peer); err != nil {
238                 return err
239         }
240
241         sw.Logger.Info("Added peer", "peer", peer)
242         return nil
243 }
244
245 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
246         if sw.filterConnByAddr != nil {
247                 return sw.filterConnByAddr(addr)
248         }
249         return nil
250 }
251
252 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
253         if sw.filterConnByPubKey != nil {
254                 return sw.filterConnByPubKey(pubkey)
255         }
256         return nil
257
258 }
259
260 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
261         sw.filterConnByAddr = f
262 }
263
264 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
265         sw.filterConnByPubKey = f
266 }
267
268 func (sw *Switch) startInitPeer(peer *Peer) {
269         peer.Start() // spawn send/recv routines
270         for _, reactor := range sw.reactors {
271                 reactor.AddPeer(peer)
272         }
273 }
274
275 // Dial a list of seeds asynchronously in random order
276 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
277
278         netAddrs, err := NewNetAddressStrings(seeds)
279         if err != nil {
280                 return err
281         }
282
283         if addrBook != nil {
284                 // add seeds to `addrBook`
285                 ourAddrS := sw.nodeInfo.ListenAddr
286                 ourAddr, _ := NewNetAddressString(ourAddrS)
287                 for _, netAddr := range netAddrs {
288                         // do not add ourselves
289                         if netAddr.Equals(ourAddr) {
290                                 continue
291                         }
292                         addrBook.AddAddress(netAddr, ourAddr)
293                 }
294                 addrBook.Save()
295         }
296
297         // permute the list, dial them in random order.
298         perm := rand.Perm(len(netAddrs))
299         for i := 0; i < len(perm); i++ {
300                 go func(i int) {
301                         time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
302                         j := perm[i]
303                         sw.dialSeed(netAddrs[j])
304                 }(i)
305         }
306         return nil
307 }
308
309 func (sw *Switch) dialSeed(addr *NetAddress) {
310         peer, err := sw.DialPeerWithAddress(addr, true)
311         if err != nil {
312                 sw.Logger.Error("Error dialing seed", "error", err)
313         } else {
314                 sw.Logger.Info("Connected to seed", "peer", peer)
315         }
316 }
317
318 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
319         sw.dialing.Set(addr.IP.String(), addr)
320         defer sw.dialing.Delete(addr.IP.String())
321
322         sw.Logger.Info("Dialing peer", "address", addr)
323         peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
324         if err != nil {
325                 sw.Logger.Error("Failed to dial peer", "address", addr, "error", err)
326                 return nil, err
327         }
328         peer.SetLogger(sw.Logger.With("peer", addr))
329         if persistent {
330                 peer.makePersistent()
331         }
332         err = sw.AddPeer(peer)
333         if err != nil {
334                 sw.Logger.Error("Failed to add peer", "address", addr, "error", err)
335                 peer.CloseConn()
336                 return nil, err
337         }
338         sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer)
339         return peer, nil
340 }
341
342 func (sw *Switch) IsDialing(addr *NetAddress) bool {
343         return sw.dialing.Has(addr.IP.String())
344 }
345
346 // Broadcast runs a go routine for each attempted send, which will block
347 // trying to send for defaultSendTimeoutSeconds. Returns a channel
348 // which receives success values for each attempted send (false if times out)
349 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
350 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
351         successChan := make(chan bool, len(sw.peers.List()))
352         sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
353         for _, peer := range sw.peers.List() {
354                 go func(peer *Peer) {
355                         success := peer.Send(chID, msg)
356                         successChan <- success
357                 }(peer)
358         }
359         return successChan
360 }
361
362 // Returns the count of outbound/inbound and outbound-dialing peers.
363 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
364         peers := sw.peers.List()
365         for _, peer := range peers {
366                 if peer.outbound {
367                         outbound++
368                 } else {
369                         inbound++
370                 }
371         }
372         dialing = sw.dialing.Size()
373         return
374 }
375
376 func (sw *Switch) Peers() IPeerSet {
377         return sw.peers
378 }
379
380 // Disconnect from a peer due to external error, retry if it is a persistent peer.
381 // TODO: make record depending on reason.
382 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
383         addr := NewNetAddress(peer.Addr())
384         sw.Logger.Info("Stopping peer for error", "peer", peer, "error", reason)
385         sw.stopAndRemovePeer(peer, reason)
386
387         if peer.IsPersistent() {
388                 go func() {
389                         sw.Logger.Info("Reconnecting to peer", "peer", peer)
390                         for i := 1; i < reconnectAttempts; i++ {
391                                 if !sw.IsRunning() {
392                                         return
393                                 }
394
395                                 peer, err := sw.DialPeerWithAddress(addr, true)
396                                 if err != nil {
397                                         if i == reconnectAttempts {
398                                                 sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "error", err)
399                                                 return
400                                         }
401                                         sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "error", err)
402                                         time.Sleep(reconnectInterval)
403                                         continue
404                                 }
405
406                                 sw.Logger.Info("Reconnected to peer", "peer", peer)
407                                 return
408                         }
409                 }()
410         }
411 }
412
413 // Disconnect from a peer gracefully.
414 // TODO: handle graceful disconnects.
415 func (sw *Switch) StopPeerGracefully(peer *Peer) {
416         sw.Logger.Info("Stopping peer gracefully")
417         sw.stopAndRemovePeer(peer, nil)
418 }
419
420 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
421         sw.peers.Remove(peer)
422         peer.Stop()
423         for _, reactor := range sw.reactors {
424                 reactor.RemovePeer(peer, reason)
425         }
426 }
427
428 func (sw *Switch) listenerRoutine(l Listener) {
429         for {
430                 inConn, ok := <-l.Connections()
431                 if !ok {
432                         break
433                 }
434
435                 // ignore connection if we already have enough
436                 maxPeers := sw.config.MaxNumPeers
437                 if maxPeers <= sw.peers.Size() {
438                         sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
439                         continue
440                 }
441
442                 // New inbound connection!
443                 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
444                 if err != nil {
445                         sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
446                         continue
447                 }
448
449                 // NOTE: We don't yet have the listening port of the
450                 // remote (if they have a listener at all).
451                 // The peerHandshake will handle that
452         }
453
454         // cleanup
455 }
456
457 //-----------------------------------------------------------------------------
458
459 type SwitchEventNewPeer struct {
460         Peer *Peer
461 }
462
463 type SwitchEventDonePeer struct {
464         Peer  *Peer
465         Error interface{}
466 }
467
468 //------------------------------------------------------------------
469 // Switches connected via arbitrary net.Conn; useful for testing
470
471 // Returns n switches, connected according to the connect func.
472 // If connect==Connect2Switches, the switches will be fully connected.
473 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
474 // NOTE: panics if any switch fails to start.
475 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
476         switches := make([]*Switch, n)
477         for i := 0; i < n; i++ {
478                 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
479         }
480
481         if err := StartSwitches(switches); err != nil {
482                 panic(err)
483         }
484
485         for i := 0; i < n; i++ {
486                 for j := i; j < n; j++ {
487                         connect(switches, i, j)
488                 }
489         }
490
491         return switches
492 }
493
494 var PanicOnAddPeerErr = false
495
496 // Will connect switches i and j via net.Pipe()
497 // Blocks until a conection is established.
498 // NOTE: caller ensures i and j are within bounds
499 func Connect2Switches(switches []*Switch, i, j int) {
500         switchI := switches[i]
501         switchJ := switches[j]
502         c1, c2 := net.Pipe()
503         doneCh := make(chan struct{})
504         go func() {
505                 err := switchI.addPeerWithConnection(c1)
506                 if PanicOnAddPeerErr && err != nil {
507                         panic(err)
508                 }
509                 doneCh <- struct{}{}
510         }()
511         go func() {
512                 err := switchJ.addPeerWithConnection(c2)
513                 if PanicOnAddPeerErr && err != nil {
514                         panic(err)
515                 }
516                 doneCh <- struct{}{}
517         }()
518         <-doneCh
519         <-doneCh
520 }
521
522 func StartSwitches(switches []*Switch) error {
523         for _, s := range switches {
524                 _, err := s.Start() // start switch and reactors
525                 if err != nil {
526                         return err
527                 }
528         }
529         return nil
530 }
531
532 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
533         privKey := crypto.GenPrivKeyEd25519()
534         // new switch, add reactors
535         // TODO: let the config be passed in?
536         s := initSwitch(i, NewSwitch(cfg))
537         s.SetNodeInfo(&NodeInfo{
538                 PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
539                 Moniker:    cmn.Fmt("switch%d", i),
540                 Network:    network,
541                 Version:    version,
542                 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
543                 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
544         })
545         s.SetNodePrivKey(privKey)
546         return s
547 }
548
549 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
550         peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
551         if err != nil {
552                 conn.Close()
553                 return err
554         }
555         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
556         if err = sw.AddPeer(peer); err != nil {
557                 conn.Close()
558                 return err
559         }
560
561         return nil
562 }
563
564 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
565         peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
566         if err != nil {
567                 conn.Close()
568                 return err
569         }
570         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
571         if err = sw.AddPeer(peer); err != nil {
572                 conn.Close()
573                 return err
574         }
575
576         return nil
577 }