OSDN Git Service

a22f7ca7b4efb8cd1eda978f26ba35718d0cdb66
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "sync"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/go-crypto"
11         cmn "github.com/tendermint/tmlibs/common"
12
13         cfg "github.com/bytom/bytom/config"
14         "github.com/bytom/bytom/consensus"
15         "github.com/bytom/bytom/crypto/ed25519"
16         "github.com/bytom/bytom/errors"
17         "github.com/bytom/bytom/event"
18         "github.com/bytom/bytom/p2p/connection"
19         "github.com/bytom/bytom/p2p/discover/dht"
20         "github.com/bytom/bytom/p2p/discover/mdns"
21         "github.com/bytom/bytom/p2p/netutil"
22         "github.com/bytom/bytom/p2p/security"
23         "github.com/bytom/bytom/version"
24 )
25
26 const (
27         logModule = "p2p"
28
29         minNumOutboundPeers = 4
30         maxNumLANPeers      = 5
31 )
32
33 //pre-define errors for connecting fail
34 var (
35         ErrDuplicatePeer  = errors.New("Duplicate peer")
36         ErrConnectSelf    = errors.New("Connect self")
37         ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
38 )
39
40 type discv interface {
41         ReadRandomNodes(buf []*dht.Node) (n int)
42 }
43
44 type lanDiscv interface {
45         Subscribe() (*event.Subscription, error)
46         Stop()
47 }
48
49 type Security interface {
50         DoFilter(ip string, pubKey string) error
51         IsBanned(ip string, level byte, reason string) bool
52         RegisterFilter(filter security.Filter)
53         Start() error
54 }
55
56 // Switch handles peer connections and exposes an API to receive incoming messages
57 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
58 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
59 // incoming messages are received on the reactor.
60 type Switch struct {
61         cmn.BaseService
62
63         Config       *cfg.Config
64         peerConfig   *PeerConfig
65         listeners    []Listener
66         reactors     map[string]Reactor
67         chDescs      []*connection.ChannelDescriptor
68         reactorsByCh map[byte]Reactor
69         peers        *PeerSet
70         dialing      *cmn.CMap
71         nodeInfo     *NodeInfo             // our node info
72         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
73         discv        discv
74         lanDiscv     lanDiscv
75         security     Security
76 }
77
78 // NewSwitch create a new Switch and set discover.
79 func NewSwitch(config *cfg.Config) (*Switch, error) {
80         var err error
81         var l Listener
82         var listenAddr string
83         var discv *dht.Network
84         var lanDiscv *mdns.LANDiscover
85
86         bytes := config.PrivateKey().Bytes()
87         var newKey [64]byte
88         copy(newKey[:], bytes)
89         privKey := crypto.PrivKeyEd25519(newKey)
90         if !config.VaultMode {
91                 // Create listener
92                 l, listenAddr = GetListener(config.P2P)
93                 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
94                 if err != nil {
95                         return nil, err
96                 }
97                 if config.P2P.LANDiscover {
98                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port))
99                 }
100         }
101
102         return newSwitch(config, discv, lanDiscv, l, privKey, listenAddr)
103 }
104
105 // newSwitch creates a new Switch with the given config.
106 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
107         sw := &Switch{
108                 Config:       config,
109                 peerConfig:   DefaultPeerConfig(config.P2P),
110                 reactors:     make(map[string]Reactor),
111                 chDescs:      make([]*connection.ChannelDescriptor, 0),
112                 reactorsByCh: make(map[byte]Reactor),
113                 peers:        NewPeerSet(),
114                 dialing:      cmn.NewCMap(),
115                 nodePrivKey:  priv,
116                 discv:        discv,
117                 lanDiscv:     lanDiscv,
118                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
119                 security:     security.NewSecurity(config),
120         }
121
122         sw.AddListener(l)
123         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
124         return sw, nil
125 }
126
127 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
128 func (sw *Switch) OnStart() error {
129         for _, reactor := range sw.reactors {
130                 if err := reactor.Start(); err != nil {
131                         return err
132                 }
133         }
134
135         sw.security.RegisterFilter(sw.nodeInfo)
136         sw.security.RegisterFilter(sw.peers)
137         if err := sw.security.Start(); err != nil {
138                 return err
139         }
140
141         for _, listener := range sw.listeners {
142                 go sw.listenerRoutine(listener)
143         }
144         go sw.ensureOutboundPeersRoutine()
145         go sw.connectLANPeersRoutine()
146
147         return nil
148 }
149
150 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
151 func (sw *Switch) OnStop() {
152         if sw.Config.P2P.LANDiscover {
153                 sw.lanDiscv.Stop()
154         }
155
156         for _, listener := range sw.listeners {
157                 listener.Stop()
158         }
159         sw.listeners = nil
160
161         for _, peer := range sw.peers.List() {
162                 peer.Stop()
163                 sw.peers.Remove(peer)
164         }
165
166         for _, reactor := range sw.reactors {
167                 reactor.Stop()
168         }
169 }
170
171 // AddPeer performs the P2P handshake with a peer
172 // that already has a SecretConnection. If all goes well,
173 // it starts the peer and adds it to the switch.
174 // NOTE: This performs a blocking handshake before the peer is added.
175 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
176 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
177         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
178         if err != nil {
179                 return err
180         }
181
182         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
183                 return err
184         }
185         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
186                 return err
187         }
188
189         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
190         if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey().String()); err != nil {
191                 return err
192         }
193
194         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
195                 return ErrConnectSpvPeer
196         }
197
198         // Start peer
199         if sw.IsRunning() {
200                 if err := sw.startInitPeer(peer); err != nil {
201                         return err
202                 }
203         }
204
205         return sw.peers.Add(peer)
206 }
207
208 // AddReactor adds the given reactor to the switch.
209 // NOTE: Not goroutine safe.
210 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
211         // Validate the reactor.
212         // No two reactors can share the same channel.
213         for _, chDesc := range reactor.GetChannels() {
214                 chID := chDesc.ID
215                 if sw.reactorsByCh[chID] != nil {
216                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
217                 }
218                 sw.chDescs = append(sw.chDescs, chDesc)
219                 sw.reactorsByCh[chID] = reactor
220         }
221         sw.reactors[name] = reactor
222         reactor.SetSwitch(sw)
223         return reactor
224 }
225
226 // AddListener adds the given listener to the switch for listening to incoming peer connections.
227 // NOTE: Not goroutine safe.
228 func (sw *Switch) AddListener(l Listener) {
229         sw.listeners = append(sw.listeners, l)
230 }
231
232 //DialPeerWithAddress dial node from net address
233 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
234         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
235         sw.dialing.Set(addr.IP.String(), addr)
236         defer sw.dialing.Delete(addr.IP.String())
237         if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
238                 return err
239         }
240
241         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
242         if err != nil {
243                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on newOutboundPeerConn")
244                 return err
245         }
246
247         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
248                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on switch AddPeer")
249                 pc.CloseConn()
250                 return err
251         }
252         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
253         return nil
254 }
255
256 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
257         return sw.security.IsBanned(ip, level, reason)
258 }
259
260 //IsDialing prevent duplicate dialing
261 func (sw *Switch) IsDialing(addr *NetAddress) bool {
262         return sw.dialing.Has(addr.IP.String())
263 }
264
265 // IsListening returns true if the switch has at least one listener.
266 // NOTE: Not goroutine safe.
267 func (sw *Switch) IsListening() bool {
268         return len(sw.listeners) > 0
269 }
270
271 // Listeners returns the list of listeners the switch listens on.
272 // NOTE: Not goroutine safe.
273 func (sw *Switch) Listeners() []Listener {
274         return sw.listeners
275 }
276
277 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
278 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
279         peers := sw.peers.List()
280         for _, peer := range peers {
281                 if peer.outbound && !peer.isLAN {
282                         outbound++
283                 } else {
284                         inbound++
285                 }
286                 if peer.isLAN {
287                         lan++
288                 }
289         }
290         dialing = sw.dialing.Size()
291         return
292 }
293
294 // NodeInfo returns the switch's NodeInfo.
295 // NOTE: Not goroutine safe.
296 func (sw *Switch) NodeInfo() *NodeInfo {
297         return sw.nodeInfo
298 }
299
300 //Peers return switch peerset
301 func (sw *Switch) Peers() *PeerSet {
302         return sw.peers
303 }
304
305 // StopPeerForError disconnects from a peer due to external error.
306 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
307         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
308         sw.stopAndRemovePeer(peer, reason)
309 }
310
311 // StopPeerGracefully disconnect from a peer gracefully.
312 func (sw *Switch) StopPeerGracefully(peerID string) {
313         if peer := sw.peers.Get(peerID); peer != nil {
314                 sw.stopAndRemovePeer(peer, nil)
315         }
316 }
317
318 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
319         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
320         if err != nil {
321                 if err := conn.Close(); err != nil {
322                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
323                 }
324                 return err
325         }
326
327         if err = sw.AddPeer(peerConn, false); err != nil {
328                 if err := conn.Close(); err != nil {
329                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
330                 }
331                 return err
332         }
333
334         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
335         return nil
336 }
337
338 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
339         lanPeers, _, _, numDialing := sw.NumPeers()
340         numToDial := maxNumLANPeers - lanPeers
341         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
342         if numToDial <= 0 {
343                 return
344         }
345         addresses := make([]*NetAddress, 0)
346         for i := 0; i < len(lanPeer.IP); i++ {
347                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
348         }
349         sw.dialPeers(addresses)
350 }
351
352 func (sw *Switch) connectLANPeersRoutine() {
353         if !sw.Config.P2P.LANDiscover {
354                 return
355         }
356
357         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
358         if err != nil {
359                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
360                 return
361         }
362
363         for {
364                 select {
365                 case obj, ok := <-lanPeerEventSub.Chan():
366                         if !ok {
367                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
368                                 return
369                         }
370                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
371                         if !ok {
372                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
373                                 continue
374                         }
375                         sw.connectLANPeers(LANPeer)
376                 case <-sw.Quit():
377                         return
378                 }
379         }
380 }
381
382 func (sw *Switch) listenerRoutine(l Listener) {
383         for {
384                 inConn, ok := <-l.Connections()
385                 if !ok {
386                         break
387                 }
388
389                 // disconnect if we alrady have MaxNumPeers
390                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
391                         if err := inConn.Close(); err != nil {
392                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
393                         }
394                         log.Info("Ignoring inbound connection: already have enough peers.")
395                         continue
396                 }
397
398                 // New inbound connection!
399                 if err := sw.addPeerWithConnection(inConn); err != nil {
400                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
401                         continue
402                 }
403         }
404 }
405
406 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
407         if err := sw.DialPeerWithAddress(a); err != nil {
408                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Warn("dialPeerWorker fail on dial peer")
409         }
410         wg.Done()
411 }
412
413 func (sw *Switch) dialPeers(addresses []*NetAddress) {
414         connectedPeers := make(map[string]struct{})
415         for _, peer := range sw.Peers().List() {
416                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
417         }
418
419         var wg sync.WaitGroup
420         for _, address := range addresses {
421                 if sw.NodeInfo().ListenAddr == address.String() {
422                         continue
423                 }
424                 if dialling := sw.IsDialing(address); dialling {
425                         continue
426                 }
427                 if _, ok := connectedPeers[address.IP.String()]; ok {
428                         continue
429                 }
430
431                 wg.Add(1)
432                 go sw.dialPeerWorker(address, &wg)
433         }
434         wg.Wait()
435 }
436
437 func (sw *Switch) ensureKeepConnectPeers() {
438         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
439         addresses := make([]*NetAddress, 0)
440         for _, keepDial := range keepDials {
441                 address, err := NewNetAddressString(keepDial)
442                 if err != nil {
443                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
444                         continue
445                 }
446                 addresses = append(addresses, address)
447         }
448
449         sw.dialPeers(addresses)
450 }
451
452 func (sw *Switch) ensureOutboundPeers() {
453         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
454         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
455         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
456         if numToDial <= 0 {
457                 return
458         }
459
460         nodes := make([]*dht.Node, numToDial)
461         n := sw.discv.ReadRandomNodes(nodes)
462         addresses := make([]*NetAddress, 0)
463         for i := 0; i < n; i++ {
464                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
465                 addresses = append(addresses, address)
466         }
467         sw.dialPeers(addresses)
468 }
469
470 func (sw *Switch) ensureOutboundPeersRoutine() {
471         sw.ensureKeepConnectPeers()
472         sw.ensureOutboundPeers()
473
474         ticker := time.NewTicker(10 * time.Second)
475         defer ticker.Stop()
476
477         for {
478                 select {
479                 case <-ticker.C:
480                         sw.ensureKeepConnectPeers()
481                         sw.ensureOutboundPeers()
482                 case <-sw.Quit():
483                         return
484                 }
485         }
486 }
487
488 func (sw *Switch) startInitPeer(peer *Peer) error {
489         // spawn send/recv routines
490         if err := peer.Start(); err != nil {
491                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
492         }
493
494         for _, reactor := range sw.reactors {
495                 if err := reactor.AddPeer(peer); err != nil {
496                         return err
497                 }
498         }
499         return nil
500 }
501
502 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
503         sw.peers.Remove(peer)
504         for _, reactor := range sw.reactors {
505                 reactor.RemovePeer(peer, reason)
506         }
507         peer.Stop()
508
509         sentStatus, receivedStatus := peer.TrafficStatus()
510         log.WithFields(log.Fields{
511                 "module":                logModule,
512                 "address":               peer.Addr().String(),
513                 "reason":                reason,
514                 "duration":              sentStatus.Duration.String(),
515                 "total_sent":            sentStatus.Bytes,
516                 "total_received":        receivedStatus.Bytes,
517                 "average_sent_rate":     sentStatus.AvgRate,
518                 "average_received_rate": receivedStatus.AvgRate,
519                 "peer num":              sw.peers.Size(),
520         }).Info("disconnect with peer")
521 }