11 log "github.com/sirupsen/logrus"
12 crypto "github.com/tendermint/go-crypto"
13 cmn "github.com/tendermint/tmlibs/common"
14 dbm "github.com/tendermint/tmlibs/db"
18 cfg "github.com/bytom/config"
19 "github.com/bytom/errors"
20 "github.com/bytom/p2p/trust"
24 reconnectAttempts = 10
25 reconnectInterval = 10 * time.Second
27 bannedPeerKey = "BannedPeer"
28 defaultBanDuration = time.Hour * 24
32 var ErrConnectBannedPeer = errors.New("Connect banned peer")
34 type Reactor interface {
35 cmn.Service // Start, Stop
38 GetChannels() []*ChannelDescriptor
39 AddPeer(peer *Peer) error
40 RemovePeer(peer *Peer, reason interface{})
41 Receive(chID byte, peer *Peer, msgBytes []byte)
44 //--------------------------------------
46 type BaseReactor struct {
47 cmn.BaseService // Provides Start, Stop, .Quit
51 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
53 BaseService: *cmn.NewBaseService(nil, name, impl),
58 func (br *BaseReactor) SetSwitch(sw *Switch) {
61 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
62 func (_ *BaseReactor) AddPeer(peer *Peer) {}
63 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
64 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
66 //-----------------------------------------------------------------------------
69 The `Switch` handles peer connections and exposes an API to receive incoming messages
70 on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
71 or more `Channels`. So while sending outgoing messages is typically performed on the peer,
72 incoming messages are received on the reactor.
78 peerConfig *PeerConfig
80 reactors map[string]Reactor
81 chDescs []*ChannelDescriptor
82 reactorsByCh map[byte]Reactor
85 nodeInfo *NodeInfo // our node info
86 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
87 bannedPeer map[string]time.Time
89 TrustMetricStore *trust.TrustMetricStore
93 filterConnByAddr func(net.Addr) error
94 filterConnByPubKey func(crypto.PubKeyEd25519) error
98 ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
99 ErrConnectSelf = errors.New("Connect self")
100 ErrPeerConnected = errors.New("Peer is connected")
103 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
106 peerConfig: DefaultPeerConfig(config),
107 reactors: make(map[string]Reactor),
108 chDescs: make([]*ChannelDescriptor, 0),
109 reactorsByCh: make(map[byte]Reactor),
111 dialing: cmn.NewCMap(),
114 ScamPeerCh: make(chan *Peer),
116 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
117 sw.TrustMetricStore = trust.NewTrustMetricStore(trustHistoryDB, trust.DefaultConfig())
118 sw.TrustMetricStore.Start()
120 sw.bannedPeer = make(map[string]time.Time)
121 if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
122 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
126 go sw.scamPeerHandler()
130 // Not goroutine safe.
131 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
132 // Validate the reactor.
133 // No two reactors can share the same channel.
134 reactorChannels := reactor.GetChannels()
135 for _, chDesc := range reactorChannels {
137 if sw.reactorsByCh[chID] != nil {
138 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
140 sw.chDescs = append(sw.chDescs, chDesc)
141 sw.reactorsByCh[chID] = reactor
143 sw.reactors[name] = reactor
144 reactor.SetSwitch(sw)
148 // Not goroutine safe.
149 func (sw *Switch) Reactors() map[string]Reactor {
153 // Not goroutine safe.
154 func (sw *Switch) Reactor(name string) Reactor {
155 return sw.reactors[name]
158 // Not goroutine safe.
159 func (sw *Switch) AddListener(l Listener) {
160 sw.listeners = append(sw.listeners, l)
163 // Not goroutine safe.
164 func (sw *Switch) Listeners() []Listener {
168 // Not goroutine safe.
169 func (sw *Switch) IsListening() bool {
170 return len(sw.listeners) > 0
173 // Not goroutine safe.
174 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
175 sw.nodeInfo = nodeInfo
178 // Not goroutine safe.
179 func (sw *Switch) NodeInfo() *NodeInfo {
183 // Not goroutine safe.
184 // NOTE: Overwrites sw.nodeInfo.PubKey
185 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
186 sw.nodePrivKey = nodePrivKey
187 if sw.nodeInfo != nil {
188 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
192 // Switch.Start() starts all the reactors, peers, and listeners.
193 func (sw *Switch) OnStart() error {
194 sw.BaseService.OnStart()
196 for _, reactor := range sw.reactors {
197 _, err := reactor.Start()
203 for _, peer := range sw.peers.List() {
204 sw.startInitPeer(peer)
207 for _, listener := range sw.listeners {
208 go sw.listenerRoutine(listener)
213 func (sw *Switch) OnStop() {
214 sw.BaseService.OnStop()
216 for _, listener := range sw.listeners {
221 for _, peer := range sw.peers.List() {
223 sw.peers.Remove(peer)
226 for _, reactor := range sw.reactors {
231 // NOTE: This performs a blocking handshake before the peer is added.
232 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
233 func (sw *Switch) AddPeer(peer *Peer) error {
234 if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
238 if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
242 if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
246 if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
251 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
252 return errors.New("Ignoring connection from self")
255 // Check version, chain id
256 if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
260 // Check for duplicate peer
261 if sw.peers.Has(peer.Key) {
262 return ErrSwitchDuplicatePeer
268 if err := sw.startInitPeer(peer); err != nil {
273 // Add the peer to .peers.
274 // We start it first so that a peer in the list is safe to Stop.
275 // It should not err since we already checked peers.Has()
276 if err := sw.peers.Add(peer); err != nil {
280 tm := trust.NewMetric()
283 sw.TrustMetricStore.AddPeerTrustMetric(peer.mconn.RemoteAddress.IP.String(), tm)
285 log.WithField("peer", peer).Info("Added peer")
289 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
290 if sw.filterConnByAddr != nil {
291 return sw.filterConnByAddr(addr)
296 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
297 if sw.filterConnByPubKey != nil {
298 return sw.filterConnByPubKey(pubkey)
304 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
305 sw.filterConnByAddr = f
308 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
309 sw.filterConnByPubKey = f
312 func (sw *Switch) startInitPeer(peer *Peer) error {
313 peer.Start() // spawn send/recv routines
314 for _, reactor := range sw.reactors {
315 if err := reactor.AddPeer(peer); err != nil {
322 // Dial a list of seeds asynchronously in random order
323 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
325 netAddrs, err := NewNetAddressStrings(seeds)
331 // add seeds to `addrBook`
332 ourAddrS := sw.nodeInfo.ListenAddr
333 ourAddr, _ := NewNetAddressString(ourAddrS)
334 for _, netAddr := range netAddrs {
335 // do not add ourselves
336 if netAddr.Equals(ourAddr) {
339 addrBook.AddAddress(netAddr, ourAddr)
344 // permute the list, dial them in random order.
345 perm := rand.Perm(len(netAddrs))
346 for i := 0; i < len(perm); i++ {
348 time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
350 sw.dialSeed(netAddrs[j])
356 func (sw *Switch) dialSeed(addr *NetAddress) {
357 peer, err := sw.DialPeerWithAddress(addr, true)
359 log.WithField("error", err).Error("Error dialing seed")
361 log.WithField("peer", peer).Info("Connected to seed")
365 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
366 if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
369 if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
370 return nil, ErrConnectSelf
372 for _, v := range sw.Peers().list {
373 if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
374 return nil, ErrPeerConnected
377 sw.dialing.Set(addr.IP.String(), addr)
378 defer sw.dialing.Delete(addr.IP.String())
380 log.WithField("address", addr).Info("Dialing peer")
381 peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
383 log.WithFields(log.Fields{
386 }).Info("Failed to dial peer")
389 peer.SetLogger(sw.Logger.With("peer", addr))
391 peer.makePersistent()
393 err = sw.AddPeer(peer)
395 log.WithFields(log.Fields{
398 }).Info("Failed to add peer")
402 log.WithFields(log.Fields{
404 }).Info("Dialed and added peer")
408 func (sw *Switch) IsDialing(addr *NetAddress) bool {
409 return sw.dialing.Has(addr.IP.String())
412 // Broadcast runs a go routine for each attempted send, which will block
413 // trying to send for defaultSendTimeoutSeconds. Returns a channel
414 // which receives success values for each attempted send (false if times out)
415 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
416 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
417 successChan := make(chan bool, len(sw.peers.List()))
418 log.WithFields(log.Fields{
421 }).Debug("Broadcast")
422 for _, peer := range sw.peers.List() {
423 go func(peer *Peer) {
424 success := peer.Send(chID, msg)
425 successChan <- success
431 // Returns the count of outbound/inbound and outbound-dialing peers.
432 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
433 peers := sw.peers.List()
434 for _, peer := range peers {
441 dialing = sw.dialing.Size()
445 func (sw *Switch) Peers() *PeerSet {
449 // Disconnect from a peer due to external error, retry if it is a persistent peer.
450 // TODO: make record depending on reason.
451 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
452 addr := NewNetAddress(peer.Addr())
453 log.WithFields(log.Fields{
456 }).Info("Stopping peer due to error")
457 sw.stopAndRemovePeer(peer, reason)
459 if peer.IsPersistent() {
460 log.WithField("peer", peer).Info("Reconnecting to peer")
461 for i := 1; i < reconnectAttempts; i++ {
466 peer, err := sw.DialPeerWithAddress(addr, true)
468 if i == reconnectAttempts {
469 log.WithFields(log.Fields{
472 }).Info("Error reconnecting to peer. Giving up")
476 if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
477 log.WithField("error", err).Info("Error reconnecting to peer. ")
481 log.WithFields(log.Fields{
484 }).Info("Error reconnecting to peer. Trying again")
485 time.Sleep(reconnectInterval)
489 log.WithField("peer", peer).Info("Reconnected to peer")
495 // Disconnect from a peer gracefully.
496 // TODO: handle graceful disconnects.
497 func (sw *Switch) StopPeerGracefully(peer *Peer) {
498 log.Info("Stopping peer gracefully")
499 sw.stopAndRemovePeer(peer, nil)
502 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
503 sw.peers.Remove(peer)
505 for _, reactor := range sw.reactors {
506 reactor.RemovePeer(peer, reason)
510 func (sw *Switch) listenerRoutine(l Listener) {
512 inConn, ok := <-l.Connections()
517 // ignore connection if we already have enough
518 maxPeers := sw.config.MaxNumPeers
519 if maxPeers <= sw.peers.Size() {
522 log.WithFields(log.Fields{
523 "address": inConn.RemoteAddr().String(),
524 "numPeers": sw.peers.Size(),
526 }).Info("Ignoring inbound connection: already have enough peers")
530 // New inbound connection!
531 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
533 // conn close for returning err
535 log.WithFields(log.Fields{
536 "address": inConn.RemoteAddr().String(),
538 }).Info("Ignoring inbound connection: error while adding peer")
542 // NOTE: We don't yet have the listening port of the
543 // remote (if they have a listener at all).
544 // The peerHandshake will handle that
550 //-----------------------------------------------------------------------------
552 type SwitchEventNewPeer struct {
556 type SwitchEventDonePeer struct {
561 //------------------------------------------------------------------
562 // Switches connected via arbitrary net.Conn; useful for testing
564 // Returns n switches, connected according to the connect func.
565 // If connect==Connect2Switches, the switches will be fully connected.
566 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
567 // NOTE: panics if any switch fails to start.
568 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
569 switches := make([]*Switch, n)
570 for i := 0; i < n; i++ {
571 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
574 if err := StartSwitches(switches); err != nil {
578 for i := 0; i < n; i++ {
579 for j := i; j < n; j++ {
580 connect(switches, i, j)
587 var PanicOnAddPeerErr = false
589 // Will connect switches i and j via net.Pipe()
590 // Blocks until a connection is established.
591 // NOTE: caller ensures i and j are within bounds
592 func Connect2Switches(switches []*Switch, i, j int) {
593 switchI := switches[i]
594 switchJ := switches[j]
596 doneCh := make(chan struct{})
598 err := switchI.addPeerWithConnection(c1)
599 if PanicOnAddPeerErr && err != nil {
605 err := switchJ.addPeerWithConnection(c2)
606 if PanicOnAddPeerErr && err != nil {
615 func StartSwitches(switches []*Switch) error {
616 for _, s := range switches {
617 _, err := s.Start() // start switch and reactors
625 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
626 privKey := crypto.GenPrivKeyEd25519()
627 // new switch, add reactors
628 // TODO: let the config be passed in?
629 s := initSwitch(i, NewSwitch(cfg, nil))
630 s.SetNodeInfo(&NodeInfo{
631 PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
632 Moniker: cmn.Fmt("switch%d", i),
635 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
636 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
638 s.SetNodePrivKey(privKey)
642 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
643 peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
648 peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
649 if err = sw.AddPeer(peer); err != nil {
657 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
658 fullAddr := conn.RemoteAddr().String()
659 host, _, err := net.SplitHostPort(fullAddr)
664 if err = sw.checkBannedPeer(host); err != nil {
668 peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
672 peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
673 if err = sw.AddPeer(peer); err != nil {
680 func (sw *Switch) AddBannedPeer(peer *Peer) error {
682 defer sw.mtx.Unlock()
684 key := peer.mconn.RemoteAddress.IP.String()
685 sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
686 datajson, err := json.Marshal(sw.bannedPeer)
690 sw.db.Set([]byte(bannedPeerKey), datajson)
694 func (sw *Switch) DelBannedPeer(addr string) error {
696 defer sw.mtx.Unlock()
698 delete(sw.bannedPeer, addr)
699 datajson, err := json.Marshal(sw.bannedPeer)
703 sw.db.Set([]byte(bannedPeerKey), datajson)
707 func (sw *Switch) scamPeerHandler() {
708 for src := range sw.ScamPeerCh {
709 var tm *trust.TrustMetric
710 key := src.Connection().RemoteAddress.IP.String()
711 if tm = sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
712 log.Errorf("Can't get peer trust metric")
715 sw.delTrustMetric(tm, src)
719 func (sw *Switch) AddScamPeer(src *Peer) {
723 func (sw *Switch) delTrustMetric(tm *trust.TrustMetric, src *Peer) {
724 key := src.Connection().RemoteAddress.IP.String()
726 if tm.TrustScore() < peerBannedTM {
727 sw.AddBannedPeer(src)
728 sw.TrustMetricStore.PeerDisconnected(key)
729 sw.StopPeerGracefully(src)
733 func (sw *Switch) checkBannedPeer(peer string) error {
734 if banEnd, ok := sw.bannedPeer[peer]; ok {
735 if time.Now().Before(banEnd) {
736 return ErrConnectBannedPeer
738 sw.DelBannedPeer(peer)