10 log "github.com/sirupsen/logrus"
11 "github.com/tendermint/go-crypto"
12 cmn "github.com/tendermint/tmlibs/common"
13 dbm "github.com/tendermint/tmlibs/db"
15 cfg "github.com/vapor/config"
16 "github.com/vapor/consensus"
17 "github.com/vapor/errors"
18 "github.com/vapor/p2p/connection"
19 "github.com/vapor/p2p/discover"
20 "github.com/vapor/p2p/trust"
21 "github.com/vapor/version"
// bannedPeerKey is the database key under which the JSON-encoded ban table is stored.
25 bannedPeerKey = "BannedPeer"
// defaultBanDuration is how long an address stays banned after AddBannedPeer.
26 defaultBanDuration = time.Hour * 1
// minNumOutboundPeers is the outbound (connected + dialing) peer count that
// ensureOutboundPeers tries to reach.
27 minNumOutboundPeers = 3
30 // Predefined errors returned when a connection attempt is rejected.
32 ErrDuplicatePeer = errors.New("Duplicate peer")
33 ErrConnectSelf = errors.New("Connect self")
34 ErrConnectBannedPeer = errors.New("Connect banned peer")
35 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
38 // Switch handles peer connections and exposes an API to receive incoming messages
39 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
40 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
41 // incoming messages are received on the reactor.
46 peerConfig *PeerConfig // peer settings derived from the P2P section of the config
48 reactors map[string]Reactor // registered reactors, keyed by the name passed to AddReactor
49 chDescs []*connection.ChannelDescriptor // channel descriptors of every registered reactor
50 reactorsByCh map[byte]Reactor // reactor that owns each channel ID
53 nodeInfo *NodeInfo // our node info
54 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
55 discv *discover.Network // discovery network queried for dial candidates
56 bannedPeer map[string]time.Time // ban expiry time keyed by peer host; persisted under bannedPeerKey
61 // NewSwitch creates a new Switch with the given config.
// It opens the "trusthistory" database using config.DBBackend and
// config.DBDir(), and, when a value is stored under bannedPeerKey, decodes it
// as JSON into the in-memory ban table.
62 func NewSwitch(config *cfg.Config) *Switch {
65 peerConfig: DefaultPeerConfig(config.P2P),
66 reactors: make(map[string]Reactor),
67 chDescs: make([]*connection.ChannelDescriptor, 0),
68 reactorsByCh: make(map[byte]Reactor),
70 dialing: cmn.NewCMap(),
72 db: dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()),
74 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
75 sw.bannedPeer = make(map[string]time.Time)
// Restore bans persisted by an earlier run, if any were saved.
76 if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
// NOTE(review): the handling of a decode failure is not visible here; confirm.
77 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
85 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
// Every registered reactor is started first; then one listenerRoutine
// goroutine is spawned per listener, plus a single goroutine that keeps the
// number of outbound peers topped up.
86 func (sw *Switch) OnStart() error {
87 for _, reactor := range sw.reactors {
88 if _, err := reactor.Start(); err != nil {
92 for _, listener := range sw.listeners {
93 go sw.listenerRoutine(listener)
95 go sw.ensureOutboundPeersRoutine()
99 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
// It walks the listeners, then the current peers (each is removed from the
// peer set), then the reactors.
// NOTE(review): the per-item stop calls are not visible in this view; confirm.
100 func (sw *Switch) OnStop() {
101 for _, listener := range sw.listeners {
106 for _, peer := range sw.peers.List() {
108 sw.peers.Remove(peer)
111 for _, reactor := range sw.reactors {
116 // AddBannedPeer adds ip to the blacklist for defaultBanDuration, starting now,
// and persists the whole ban table as JSON under bannedPeerKey.
117 func (sw *Switch) AddBannedPeer(ip string) error {
// NOTE(review): the matching sw.mtx.Lock() is presumably on the line elided above; confirm.
119 defer sw.mtx.Unlock()
// Store the moment the ban ends, not the moment it started.
121 sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
122 datajson, err := json.Marshal(sw.bannedPeer)
127 sw.db.Set([]byte(bannedPeerKey), datajson)
131 // AddPeer performs the P2P handshake with a peer
132 // that already has a SecretConnection. If all goes well,
133 // it starts the peer and adds it to the switch.
134 // NOTE: This performs a blocking handshake before the peer is added.
135 // CONTRACT: If error is returned, the peer has not been added to the switch.
// NOTE(review): the older wording promised "peer is nil, and conn is immediately
// closed", but this function returns only an error and the outbound-SPV branch
// below returns without closing pc itself; confirm that callers close the connection.
136 func (sw *Switch) AddPeer(pc *peerConn) error {
// Exchange node info with the remote side, bounded by the configured handshake timeout.
137 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout))
// Compare our version with the peer's version (update check).
142 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
145 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
149 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Reject banned hosts, ourselves and peers we are already connected to.
150 if err := sw.filterConnByPeer(peer); err != nil {
// Outbound connections are only kept when the remote advertises the full-node service flag.
154 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
155 return ErrConnectSpvPeer
160 if err := sw.startInitPeer(peer); err != nil {
164 return sw.peers.Add(peer)
167 // AddReactor adds the given reactor to the switch.
// The reactor's channel descriptors are appended to the switch's list, each of
// its channel IDs is mapped to the reactor, the reactor is stored under name,
// and it is handed a reference back to the switch through SetSwitch.
// Registering a channel ID that already has a reactor panics.
168 // NOTE: Not goroutine safe.
169 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
170 // Validate the reactor.
171 // No two reactors can share the same channel.
172 for _, chDesc := range reactor.GetChannels() {
174 if sw.reactorsByCh[chID] != nil {
175 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
177 sw.chDescs = append(sw.chDescs, chDesc)
178 sw.reactorsByCh[chID] = reactor
180 sw.reactors[name] = reactor
181 reactor.SetSwitch(sw)
185 // AddListener adds the given listener to the switch for listening to incoming peer connections.
186 // NOTE: Not goroutine safe.
187 func (sw *Switch) AddListener(l Listener) {
188 sw.listeners = append(sw.listeners, l)
191 // DialPeerWithAddress dials the node at addr and, on success, adds it as an
// outbound peer through AddPeer.
192 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
193 log.Debug("Dialing peer address:", addr)
// Mark the IP as being dialed for the duration of this call so that IsDialing
// can report it; the mark is cleared on every return path.
194 sw.dialing.Set(addr.IP.String(), addr)
195 defer sw.dialing.Delete(addr.IP.String())
// Refuse to dial ourselves or a banned host before opening a connection.
196 if err := sw.filterConnByIP(addr.IP.String()); err != nil {
200 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
202 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on newOutboundPeerConn")
206 if err = sw.AddPeer(pc); err != nil {
207 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on switch AddPeer")
211 log.Debug("DialPeer added peer:", addr)
215 //IsDialing prevent duplicate dialing
216 func (sw *Switch) IsDialing(addr *NetAddress) bool {
217 return sw.dialing.Has(addr.IP.String())
220 // IsListening returns true if the switch has at least one listener.
221 // NOTE: Not goroutine safe.
222 func (sw *Switch) IsListening() bool {
223 return len(sw.listeners) > 0
226 // Listeners returns the list of listeners the switch listens on.
// NOTE(review): the body is not visible here; if sw.listeners is returned
// directly, callers share the switch's backing slice — confirm.
227 // NOTE: Not goroutine safe.
228 func (sw *Switch) Listeners() []Listener {
232 // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
// The dialing count is the current size of the in-progress dial set.
233 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
234 peers := sw.peers.List()
// NOTE(review): the loop body that splits peers into outbound/inbound is not visible here.
235 for _, peer := range peers {
242 dialing = sw.dialing.Size()
246 // NodeInfo returns the switch's NodeInfo.
// The result is a pointer, so changes made through it are visible to the switch
// if it is the same value installed by SetNodeInfo (body not visible here; confirm).
247 // NOTE: Not goroutine safe.
248 func (sw *Switch) NodeInfo() *NodeInfo {
252 // Peers returns the switch's peer set.
253 func (sw *Switch) Peers() *PeerSet {
257 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
258 // NOTE: Not goroutine safe.
259 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
260 sw.nodeInfo = nodeInfo
263 // SetNodePrivKey sets the switch's private key for authenticated encryption.
264 // NOTE: Not goroutine safe.
265 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
266 sw.nodePrivKey = nodePrivKey
267 if sw.nodeInfo != nil {
268 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
272 // StopPeerForError disconnects from a peer due to external error.
273 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
274 log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
275 sw.stopAndRemovePeer(peer, reason)
278 // StopPeerGracefully disconnect from a peer gracefully.
279 func (sw *Switch) StopPeerGracefully(peerID string) {
280 if peer := sw.peers.Get(peerID); peer != nil {
281 sw.stopAndRemovePeer(peer, nil)
// addPeerWithConnection wraps an accepted network connection as an inbound
// peer connection (using our private key and the P2P config) and passes it to
// AddPeer.
// NOTE(review): cleanup of conn on the error paths is not visible here; confirm.
285 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
286 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
292 if err = sw.AddPeer(peerConn); err != nil {
299 func (sw *Switch) checkBannedPeer(peer string) error {
301 defer sw.mtx.Unlock()
303 if banEnd, ok := sw.bannedPeer[peer]; ok {
304 if time.Now().Before(banEnd) {
305 return ErrConnectBannedPeer
307 sw.delBannedPeer(peer)
312 func (sw *Switch) delBannedPeer(addr string) error {
314 defer sw.mtx.Unlock()
316 delete(sw.bannedPeer, addr)
317 datajson, err := json.Marshal(sw.bannedPeer)
322 sw.db.Set([]byte(bannedPeerKey), datajson)
326 func (sw *Switch) filterConnByIP(ip string) error {
327 if ip == sw.nodeInfo.ListenHost() {
328 return ErrConnectSelf
330 return sw.checkBannedPeer(ip)
// filterConnByPeer rejects a handshaken peer in three cases, checked in order:
// its remote host is banned, its public key equals ours (ErrConnectSelf), or a
// peer with the same key is already in the peer set (ErrDuplicatePeer).
333 func (sw *Switch) filterConnByPeer(peer *Peer) error {
334 if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
// Same public key as ours means we have connected to ourselves.
338 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
339 return ErrConnectSelf
342 if sw.peers.Has(peer.Key) {
343 return ErrDuplicatePeer
// listenerRoutine receives inbound connections from l.Connections(). A
// connection is ignored when the peer set has already reached
// Config.P2P.MaxNumPeers; otherwise it is handed to addPeerWithConnection,
// and a failure there is logged.
// NOTE(review): the enclosing loop and the handling of a closed channel
// (ok == false) are not visible in this view.
348 func (sw *Switch) listenerRoutine(l Listener) {
350 inConn, ok := <-l.Connections()
355 // disconnect if we already have MaxNumPeers
356 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
358 log.Info("Ignoring inbound connection: already have enough peers.")
362 // New inbound connection!
363 if err := sw.addPeerWithConnection(inConn); err != nil {
364 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
370 // SetDiscv connects the discovery network to the switch; ensureOutboundPeers
// reads candidate nodes from sw.discv.
371 func (sw *Switch) SetDiscv(discv *discover.Network) {
// dialPeerWorker dials a and logs an error when the dial fails. It is started
// as a goroutine by ensureOutboundPeers.
// NOTE(review): wg is presumably signalled (wg.Done) on a line not visible here; confirm.
375 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
376 if err := sw.DialPeerWithAddress(a); err != nil {
377 log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
// ensureOutboundPeers tops up outbound connections. It computes how many peers
// are missing relative to minNumOutboundPeers (dials in progress count as
// peers), asks the discovery network for up to that many random nodes, and
// starts a dial goroutine for every candidate that is not our own listen
// address, not already being dialed and not already connected.
382 func (sw *Switch) ensureOutboundPeers() {
383 numOutPeers, _, numDialing := sw.NumPeers()
384 numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
385 log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
// NOTE(review): numToDial goes negative once enough peers exist, and the make()
// below panics on a negative length — presumably guarded on lines not visible
// here; confirm.
// Hosts of the peers we are already connected to.
390 connectedPeers := make(map[string]struct{})
391 for _, peer := range sw.Peers().List() {
392 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
395 var wg sync.WaitGroup
396 nodes := make([]*discover.Node, numToDial)
// n is the number of entries of nodes that were actually filled in.
397 n := sw.discv.ReadRandomNodes(nodes)
398 for i := 0; i < n; i++ {
399 try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
// Skip our own listen address.
400 if sw.NodeInfo().ListenAddr == try.String() {
// Skip hosts with a dial already in flight.
403 if dialling := sw.IsDialing(try); dialling {
// Skip hosts we are already connected to.
406 if _, ok := connectedPeers[try.IP.String()]; ok {
411 go sw.dialPeerWorker(try, &wg)
// ensureOutboundPeersRoutine runs ensureOutboundPeers once immediately and
// then again under a 10-second ticker. It is started as a goroutine by OnStart.
// NOTE(review): the loop, ticker cleanup and exit condition are not visible here.
416 func (sw *Switch) ensureOutboundPeersRoutine() {
417 sw.ensureOutboundPeers()
419 ticker := time.NewTicker(10 * time.Second)
425 sw.ensureOutboundPeers()
// startInitPeer starts the peer and then announces it to every registered
// reactor through reactor.AddPeer.
// NOTE(review): what happens to the already-started peer when a reactor
// rejects it is not visible here; confirm.
432 func (sw *Switch) startInitPeer(peer *Peer) error {
// NOTE(review): any result returned by peer.Start() is discarded here.
433 peer.Start() // spawn send/recv routines
434 for _, reactor := range sw.reactors {
435 if err := reactor.AddPeer(peer); err != nil {
// stopAndRemovePeer removes peer from the peer set, tells every reactor that
// the peer is gone (passing reason along), and logs the connection's traffic
// statistics.
// NOTE(review): the call that actually stops the peer is not visible in this view.
442 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
// Remove from the set first so the peer is no longer returned by peer lookups.
443 sw.peers.Remove(peer)
444 for _, reactor := range sw.reactors {
445 reactor.RemovePeer(peer, reason)
// Traffic totals and average rates for the connection, logged for diagnostics.
449 sentStatus, receivedStatus := peer.TrafficStatus()
450 log.WithFields(log.Fields{
451 "address": peer.Addr().String(),
453 "duration": sentStatus.Duration.String(),
454 "total_sent": sentStatus.Bytes,
455 "total_received": receivedStatus.Bytes,
456 "average_sent_rate": sentStatus.AvgRate,
457 "average_received_rate": receivedStatus.AvgRate,
458 }).Info("disconnect with peer")