10 crypto "github.com/tendermint/go-crypto"
11 cfg "github.com/bytom/config"
12 cmn "github.com/tendermint/tmlibs/common"
16 reconnectAttempts = 30
17 reconnectInterval = 3 * time.Second
20 type Reactor interface {
21 cmn.Service // Start, Stop
24 GetChannels() []*ChannelDescriptor
26 RemovePeer(peer *Peer, reason interface{})
27 Receive(chID byte, peer *Peer, msgBytes []byte)
30 //--------------------------------------
32 type BaseReactor struct {
33 cmn.BaseService // Provides Start, Stop, .Quit
37 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
39 BaseService: *cmn.NewBaseService(nil, name, impl),
44 func (br *BaseReactor) SetSwitch(sw *Switch) {
47 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
48 func (_ *BaseReactor) AddPeer(peer *Peer) {}
49 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
50 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
52 //-----------------------------------------------------------------------------
55 The `Switch` handles peer connections and exposes an API to receive incoming messages
56 on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
57 or more `Channels`. So while sending outgoing messages is typically performed on the peer,
58 incoming messages are received on the reactor.
64 peerConfig *PeerConfig
66 reactors map[string]Reactor
67 chDescs []*ChannelDescriptor
68 reactorsByCh map[byte]Reactor
71 nodeInfo *NodeInfo // our node info
72 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
74 filterConnByAddr func(net.Addr) error
75 filterConnByPubKey func(crypto.PubKeyEd25519) error
79 ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
82 func NewSwitch(config *cfg.P2PConfig) *Switch {
85 peerConfig: DefaultPeerConfig(),
86 reactors: make(map[string]Reactor),
87 chDescs: make([]*ChannelDescriptor, 0),
88 reactorsByCh: make(map[byte]Reactor),
90 dialing: cmn.NewCMap(),
93 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
97 // Not goroutine safe.
98 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
99 // Validate the reactor.
100 // No two reactors can share the same channel.
101 reactorChannels := reactor.GetChannels()
102 for _, chDesc := range reactorChannels {
104 if sw.reactorsByCh[chID] != nil {
105 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
107 sw.chDescs = append(sw.chDescs, chDesc)
108 sw.reactorsByCh[chID] = reactor
110 sw.reactors[name] = reactor
111 reactor.SetSwitch(sw)
115 // Not goroutine safe.
116 func (sw *Switch) Reactors() map[string]Reactor {
120 // Not goroutine safe.
121 func (sw *Switch) Reactor(name string) Reactor {
122 return sw.reactors[name]
125 // Not goroutine safe.
126 func (sw *Switch) AddListener(l Listener) {
127 sw.listeners = append(sw.listeners, l)
130 // Not goroutine safe.
131 func (sw *Switch) Listeners() []Listener {
135 // Not goroutine safe.
136 func (sw *Switch) IsListening() bool {
137 return len(sw.listeners) > 0
140 // Not goroutine safe.
141 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
142 sw.nodeInfo = nodeInfo
145 // Not goroutine safe.
146 func (sw *Switch) NodeInfo() *NodeInfo {
150 // Not goroutine safe.
151 // NOTE: Overwrites sw.nodeInfo.PubKey
152 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
153 sw.nodePrivKey = nodePrivKey
154 if sw.nodeInfo != nil {
155 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
159 // Switch.Start() starts all the reactors, peers, and listeners.
160 func (sw *Switch) OnStart() error {
161 sw.BaseService.OnStart()
163 for _, reactor := range sw.reactors {
164 _, err := reactor.Start()
170 for _, peer := range sw.peers.List() {
171 sw.startInitPeer(peer)
174 for _, listener := range sw.listeners {
175 go sw.listenerRoutine(listener)
180 func (sw *Switch) OnStop() {
181 sw.BaseService.OnStop()
183 for _, listener := range sw.listeners {
188 for _, peer := range sw.peers.List() {
190 sw.peers.Remove(peer)
193 for _, reactor := range sw.reactors {
198 // NOTE: This performs a blocking handshake before the peer is added.
199 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
200 func (sw *Switch) AddPeer(peer *Peer) error {
201 if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
205 if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
209 if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
214 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
215 return errors.New("Ignoring connection from self")
218 // Check version, chain id
219 if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
223 // Check for duplicate peer
224 if sw.peers.Has(peer.Key) {
225 return ErrSwitchDuplicatePeer
231 sw.startInitPeer(peer)
234 // Add the peer to .peers.
235 // We start it first so that a peer in the list is safe to Stop.
236 // It should not err since we already checked peers.Has()
237 if err := sw.peers.Add(peer); err != nil {
241 sw.Logger.Info("Added peer", "peer", peer)
245 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
246 if sw.filterConnByAddr != nil {
247 return sw.filterConnByAddr(addr)
252 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
253 if sw.filterConnByPubKey != nil {
254 return sw.filterConnByPubKey(pubkey)
260 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
261 sw.filterConnByAddr = f
264 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
265 sw.filterConnByPubKey = f
268 func (sw *Switch) startInitPeer(peer *Peer) {
269 peer.Start() // spawn send/recv routines
270 for _, reactor := range sw.reactors {
271 reactor.AddPeer(peer)
275 // Dial a list of seeds asynchronously in random order
276 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
278 netAddrs, err := NewNetAddressStrings(seeds)
284 // add seeds to `addrBook`
285 ourAddrS := sw.nodeInfo.ListenAddr
286 ourAddr, _ := NewNetAddressString(ourAddrS)
287 for _, netAddr := range netAddrs {
288 // do not add ourselves
289 if netAddr.Equals(ourAddr) {
292 addrBook.AddAddress(netAddr, ourAddr)
297 // permute the list, dial them in random order.
298 perm := rand.Perm(len(netAddrs))
299 for i := 0; i < len(perm); i++ {
301 time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
303 sw.dialSeed(netAddrs[j])
309 func (sw *Switch) dialSeed(addr *NetAddress) {
310 peer, err := sw.DialPeerWithAddress(addr, true)
312 sw.Logger.Error("Error dialing seed", "error", err)
314 sw.Logger.Info("Connected to seed", "peer", peer)
318 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
319 sw.dialing.Set(addr.IP.String(), addr)
320 defer sw.dialing.Delete(addr.IP.String())
322 sw.Logger.Info("Dialing peer", "address", addr)
323 peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
325 sw.Logger.Error("Failed to dial peer", "address", addr, "error", err)
328 peer.SetLogger(sw.Logger.With("peer", addr))
330 peer.makePersistent()
332 err = sw.AddPeer(peer)
334 sw.Logger.Error("Failed to add peer", "address", addr, "error", err)
338 sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer)
342 func (sw *Switch) IsDialing(addr *NetAddress) bool {
343 return sw.dialing.Has(addr.IP.String())
346 // Broadcast runs a go routine for each attempted send, which will block
347 // trying to send for defaultSendTimeoutSeconds. Returns a channel
348 // which receives success values for each attempted send (false if times out)
349 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
350 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
351 successChan := make(chan bool, len(sw.peers.List()))
352 sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
353 for _, peer := range sw.peers.List() {
354 go func(peer *Peer) {
355 success := peer.Send(chID, msg)
356 successChan <- success
362 // Returns the count of outbound/inbound and outbound-dialing peers.
363 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
364 peers := sw.peers.List()
365 for _, peer := range peers {
372 dialing = sw.dialing.Size()
376 func (sw *Switch) Peers() IPeerSet {
380 // Disconnect from a peer due to external error, retry if it is a persistent peer.
381 // TODO: make record depending on reason.
382 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
383 addr := NewNetAddress(peer.Addr())
384 sw.Logger.Info("Stopping peer for error", "peer", peer, "error", reason)
385 sw.stopAndRemovePeer(peer, reason)
387 if peer.IsPersistent() {
389 sw.Logger.Info("Reconnecting to peer", "peer", peer)
390 for i := 1; i < reconnectAttempts; i++ {
395 peer, err := sw.DialPeerWithAddress(addr, true)
397 if i == reconnectAttempts {
398 sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "error", err)
401 sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "error", err)
402 time.Sleep(reconnectInterval)
406 sw.Logger.Info("Reconnected to peer", "peer", peer)
413 // Disconnect from a peer gracefully.
414 // TODO: handle graceful disconnects.
415 func (sw *Switch) StopPeerGracefully(peer *Peer) {
416 sw.Logger.Info("Stopping peer gracefully")
417 sw.stopAndRemovePeer(peer, nil)
420 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
421 sw.peers.Remove(peer)
423 for _, reactor := range sw.reactors {
424 reactor.RemovePeer(peer, reason)
428 func (sw *Switch) listenerRoutine(l Listener) {
430 inConn, ok := <-l.Connections()
435 // ignore connection if we already have enough
436 maxPeers := sw.config.MaxNumPeers
437 if maxPeers <= sw.peers.Size() {
438 sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
442 // New inbound connection!
443 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
445 sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
449 // NOTE: We don't yet have the listening port of the
450 // remote (if they have a listener at all).
451 // The peerHandshake will handle that
457 //-----------------------------------------------------------------------------
459 type SwitchEventNewPeer struct {
463 type SwitchEventDonePeer struct {
468 //------------------------------------------------------------------
469 // Switches connected via arbitrary net.Conn; useful for testing
471 // Returns n switches, connected according to the connect func.
472 // If connect==Connect2Switches, the switches will be fully connected.
473 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
474 // NOTE: panics if any switch fails to start.
475 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
476 switches := make([]*Switch, n)
477 for i := 0; i < n; i++ {
478 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
481 if err := StartSwitches(switches); err != nil {
485 for i := 0; i < n; i++ {
486 for j := i; j < n; j++ {
487 connect(switches, i, j)
494 var PanicOnAddPeerErr = false
496 // Will connect switches i and j via net.Pipe()
497 // Blocks until a conection is established.
498 // NOTE: caller ensures i and j are within bounds
499 func Connect2Switches(switches []*Switch, i, j int) {
500 switchI := switches[i]
501 switchJ := switches[j]
503 doneCh := make(chan struct{})
505 err := switchI.addPeerWithConnection(c1)
506 if PanicOnAddPeerErr && err != nil {
512 err := switchJ.addPeerWithConnection(c2)
513 if PanicOnAddPeerErr && err != nil {
522 func StartSwitches(switches []*Switch) error {
523 for _, s := range switches {
524 _, err := s.Start() // start switch and reactors
532 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
533 privKey := crypto.GenPrivKeyEd25519()
534 // new switch, add reactors
535 // TODO: let the config be passed in?
536 s := initSwitch(i, NewSwitch(cfg))
537 s.SetNodeInfo(&NodeInfo{
538 PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
539 Moniker: cmn.Fmt("switch%d", i),
542 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
543 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
545 s.SetNodePrivKey(privKey)
549 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
550 peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
555 peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
556 if err = sw.AddPeer(peer); err != nil {
564 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
565 peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
570 peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
571 if err = sw.AddPeer(peer); err != nil {