OSDN Git Service

5cd234b0181938aefc385c641e62401ec9c7b50d
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         cmn "github.com/tendermint/tmlibs/common"
12
13         cfg "github.com/vapor/config"
14         "github.com/vapor/consensus"
15         "github.com/vapor/crypto/sha3pool"
16         "github.com/vapor/errors"
17         "github.com/vapor/event"
18         "github.com/vapor/p2p/connection"
19         "github.com/vapor/p2p/discover/dht"
20         "github.com/vapor/p2p/discover/mdns"
21         "github.com/vapor/p2p/netutil"
22         security "github.com/vapor/p2p/security"
23         "github.com/vapor/p2p/signlib"
24         "github.com/vapor/version"
25 )
26
27 const (
28         logModule = "p2p"
29
30         minNumOutboundPeers = 4
31         maxNumLANPeers      = 5
32         //magicNumber used to generate unique netID
33         magicNumber = uint64(0x054c5638)
34 )
35
36 //pre-define errors for connecting fail
37 var (
38         ErrDuplicatePeer  = errors.New("Duplicate peer")
39         ErrConnectSelf    = errors.New("Connect self")
40         ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
41 )
42
43 type discv interface {
44         ReadRandomNodes(buf []*dht.Node) (n int)
45 }
46
47 type lanDiscv interface {
48         Subscribe() (*event.Subscription, error)
49         Stop()
50 }
51
52 type Security interface {
53         DoFilter(ip string, pubKey string) error
54         IsBanned(ip string, level byte, reason string) bool
55         RegisterFilter(filter security.Filter)
56         Start() error
57 }
58
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
63 type Switch struct {
64         cmn.BaseService
65
66         Config       *cfg.Config
67         peerConfig   *PeerConfig
68         listeners    []Listener
69         reactors     map[string]Reactor
70         chDescs      []*connection.ChannelDescriptor
71         reactorsByCh map[byte]Reactor
72         peers        *PeerSet
73         dialing      *cmn.CMap
74         nodeInfo     *NodeInfo       // our node info
75         nodePrivKey  signlib.PrivKey // our node privkey
76         discv        discv
77         lanDiscv     lanDiscv
78         security     Security
79 }
80
81 // NewSwitch create a new Switch and set discover.
82 func NewSwitch(config *cfg.Config) (*Switch, error) {
83         var err error
84         var l Listener
85         var listenAddr string
86         var discv *dht.Network
87         var lanDiscv *mdns.LANDiscover
88
89         //generate unique netID
90         var data []byte
91         var h [32]byte
92         data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
93         magic := make([]byte, 8)
94         binary.BigEndian.PutUint64(magic, magicNumber)
95         data = append(data, magic[:]...)
96         sha3pool.Sum256(h[:], data)
97         netID := binary.BigEndian.Uint64(h[:8])
98
99         privateKey := config.PrivateKey()
100         if !config.VaultMode {
101                 // Create listener
102                 l, listenAddr = GetListener(config.P2P)
103                 discv, err = dht.NewDiscover(config, *privateKey, l.ExternalAddress().Port, netID)
104                 if err != nil {
105                         return nil, err
106                 }
107                 if config.P2P.LANDiscover {
108                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
109                 }
110         }
111
112         return newSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
113 }
114
115 // newSwitch creates a new Switch with the given config.
116 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
117         sw := &Switch{
118                 Config:       config,
119                 peerConfig:   DefaultPeerConfig(config.P2P),
120                 reactors:     make(map[string]Reactor),
121                 chDescs:      make([]*connection.ChannelDescriptor, 0),
122                 reactorsByCh: make(map[byte]Reactor),
123                 peers:        NewPeerSet(),
124                 dialing:      cmn.NewCMap(),
125                 nodePrivKey:  privKey,
126                 discv:        discv,
127                 lanDiscv:     lanDiscv,
128                 nodeInfo:     NewNodeInfo(config, privKey.XPub(), listenAddr, netID),
129                 security:     security.NewSecurity(config),
130         }
131
132         sw.AddListener(l)
133         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
134         log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
135         return sw, nil
136 }
137
138 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
139 func (sw *Switch) OnStart() error {
140         for _, reactor := range sw.reactors {
141                 if _, err := reactor.Start(); err != nil {
142                         return err
143                 }
144         }
145
146         sw.security.RegisterFilter(sw.nodeInfo)
147         sw.security.RegisterFilter(sw.peers)
148         if err := sw.security.Start(); err != nil {
149                 return err
150         }
151
152         for _, listener := range sw.listeners {
153                 go sw.listenerRoutine(listener)
154         }
155         go sw.ensureOutboundPeersRoutine()
156         go sw.connectLANPeersRoutine()
157
158         return nil
159 }
160
161 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
162 func (sw *Switch) OnStop() {
163         if sw.Config.P2P.LANDiscover {
164                 sw.lanDiscv.Stop()
165         }
166
167         for _, listener := range sw.listeners {
168                 listener.Stop()
169         }
170         sw.listeners = nil
171
172         for _, peer := range sw.peers.List() {
173                 peer.Stop()
174                 sw.peers.Remove(peer)
175         }
176
177         for _, reactor := range sw.reactors {
178                 reactor.Stop()
179         }
180 }
181
182 // AddPeer performs the P2P handshake with a peer
183 // that already has a SecretConnection. If all goes well,
184 // it starts the peer and adds it to the switch.
185 // NOTE: This performs a blocking handshake before the peer is added.
186 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
187 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
188         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
189         if err != nil {
190                 return err
191         }
192
193         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
194                 return err
195         }
196
197         if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
198                 return err
199         }
200
201         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
202         if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey()); err != nil {
203                 return err
204         }
205
206         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
207                 return ErrConnectSpvPeer
208         }
209
210         // Start peer
211         if sw.IsRunning() {
212                 if err := sw.startInitPeer(peer); err != nil {
213                         return err
214                 }
215         }
216
217         return sw.peers.Add(peer)
218 }
219
220 // AddReactor adds the given reactor to the switch.
221 // NOTE: Not goroutine safe.
222 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
223         // Validate the reactor.
224         // No two reactors can share the same channel.
225         for _, chDesc := range reactor.GetChannels() {
226                 chID := chDesc.ID
227                 if sw.reactorsByCh[chID] != nil {
228                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
229                 }
230                 sw.chDescs = append(sw.chDescs, chDesc)
231                 sw.reactorsByCh[chID] = reactor
232         }
233         sw.reactors[name] = reactor
234         reactor.SetSwitch(sw)
235         return reactor
236 }
237
238 // AddListener adds the given listener to the switch for listening to incoming peer connections.
239 // NOTE: Not goroutine safe.
240 func (sw *Switch) AddListener(l Listener) {
241         sw.listeners = append(sw.listeners, l)
242 }
243
244 //DialPeerWithAddress dial node from net address
245 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
246         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
247         sw.dialing.Set(addr.IP.String(), addr)
248         defer sw.dialing.Delete(addr.IP.String())
249         if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
250                 return err
251         }
252
253         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
254         if err != nil {
255                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
256                 return err
257         }
258
259         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
260                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
261                 pc.CloseConn()
262                 return err
263         }
264         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
265         return nil
266 }
267
268 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
269         return sw.security.IsBanned(ip, level, reason)
270 }
271
272 //IsDialing prevent duplicate dialing
273 func (sw *Switch) IsDialing(addr *NetAddress) bool {
274         return sw.dialing.Has(addr.IP.String())
275 }
276
277 // IsListening returns true if the switch has at least one listener.
278 // NOTE: Not goroutine safe.
279 func (sw *Switch) IsListening() bool {
280         return len(sw.listeners) > 0
281 }
282
283 // Listeners returns the list of listeners the switch listens on.
284 // NOTE: Not goroutine safe.
285 func (sw *Switch) Listeners() []Listener {
286         return sw.listeners
287 }
288
289 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
290 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
291         peers := sw.peers.List()
292         for _, peer := range peers {
293                 if peer.outbound && !peer.isLAN {
294                         outbound++
295                 } else {
296                         inbound++
297                 }
298                 if peer.isLAN {
299                         lan++
300                 }
301         }
302         dialing = sw.dialing.Size()
303         return
304 }
305
306 //Peers return switch peerset
307 func (sw *Switch) Peers() *PeerSet {
308         return sw.peers
309 }
310
311 // StopPeerForError disconnects from a peer due to external error.
312 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
313         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
314         sw.stopAndRemovePeer(peer, reason)
315 }
316
317 // StopPeerGracefully disconnect from a peer gracefully.
318 func (sw *Switch) StopPeerGracefully(peerID string) {
319         if peer := sw.peers.Get(peerID); peer != nil {
320                 sw.stopAndRemovePeer(peer, nil)
321         }
322 }
323
324 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
325         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
326         if err != nil {
327                 if err := conn.Close(); err != nil {
328                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
329                 }
330                 return err
331         }
332
333         if err = sw.AddPeer(peerConn, false); err != nil {
334                 if err := conn.Close(); err != nil {
335                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
336                 }
337                 return err
338         }
339
340         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
341         return nil
342 }
343
344 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
345         lanPeers, _, _, numDialing := sw.NumPeers()
346         numToDial := maxNumLANPeers - lanPeers
347         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
348         if numToDial <= 0 {
349                 return
350         }
351         addresses := make([]*NetAddress, 0)
352         for i := 0; i < len(lanPeer.IP); i++ {
353                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
354         }
355         sw.dialPeers(addresses)
356 }
357
358 func (sw *Switch) connectLANPeersRoutine() {
359         if !sw.Config.P2P.LANDiscover {
360                 return
361         }
362
363         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
364         if err != nil {
365                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
366                 return
367         }
368
369         for {
370                 select {
371                 case obj, ok := <-lanPeerEventSub.Chan():
372                         if !ok {
373                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
374                                 return
375                         }
376                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
377                         if !ok {
378                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
379                                 continue
380                         }
381                         sw.connectLANPeers(LANPeer)
382                 case <-sw.Quit:
383                         return
384                 }
385         }
386 }
387
388 func (sw *Switch) listenerRoutine(l Listener) {
389         for {
390                 inConn, ok := <-l.Connections()
391                 if !ok {
392                         break
393                 }
394
395                 // disconnect if we alrady have MaxNumPeers
396                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
397                         if err := inConn.Close(); err != nil {
398                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
399                         }
400                         log.Info("Ignoring inbound connection: already have enough peers.")
401                         continue
402                 }
403
404                 // New inbound connection!
405                 if err := sw.addPeerWithConnection(inConn); err != nil {
406                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
407                         continue
408                 }
409         }
410 }
411
412 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
413         if err := sw.DialPeerWithAddress(a); err != nil {
414                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
415         }
416         wg.Done()
417 }
418
419 func (sw *Switch) dialPeers(addresses []*NetAddress) {
420         connectedPeers := make(map[string]struct{})
421         for _, peer := range sw.Peers().List() {
422                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
423         }
424
425         var wg sync.WaitGroup
426         for _, address := range addresses {
427                 if sw.nodeInfo.ListenAddr == address.String() {
428                         continue
429                 }
430                 if dialling := sw.IsDialing(address); dialling {
431                         continue
432                 }
433                 if _, ok := connectedPeers[address.IP.String()]; ok {
434                         continue
435                 }
436
437                 wg.Add(1)
438                 go sw.dialPeerWorker(address, &wg)
439         }
440         wg.Wait()
441 }
442
443 func (sw *Switch) ensureKeepConnectPeers() {
444         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
445         addresses := make([]*NetAddress, 0)
446         for _, keepDial := range keepDials {
447                 address, err := NewNetAddressString(keepDial)
448                 if err != nil {
449                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
450                         continue
451                 }
452                 addresses = append(addresses, address)
453         }
454
455         sw.dialPeers(addresses)
456 }
457
458 func (sw *Switch) ensureOutboundPeers() {
459         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
460         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
461         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
462         if numToDial <= 0 {
463                 return
464         }
465
466         nodes := make([]*dht.Node, numToDial)
467         n := sw.discv.ReadRandomNodes(nodes)
468         addresses := make([]*NetAddress, 0)
469         for i := 0; i < n; i++ {
470                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
471                 addresses = append(addresses, address)
472         }
473         sw.dialPeers(addresses)
474 }
475
476 func (sw *Switch) ensureOutboundPeersRoutine() {
477         sw.ensureKeepConnectPeers()
478         sw.ensureOutboundPeers()
479
480         ticker := time.NewTicker(10 * time.Second)
481         defer ticker.Stop()
482
483         for {
484                 select {
485                 case <-ticker.C:
486                         sw.ensureKeepConnectPeers()
487                         sw.ensureOutboundPeers()
488                 case <-sw.Quit:
489                         return
490                 }
491         }
492 }
493
494 func (sw *Switch) startInitPeer(peer *Peer) error {
495         // spawn send/recv routines
496         if _, err := peer.Start(); err != nil {
497                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
498         }
499
500         for _, reactor := range sw.reactors {
501                 if err := reactor.AddPeer(peer); err != nil {
502                         return err
503                 }
504         }
505         return nil
506 }
507
508 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
509         sw.peers.Remove(peer)
510         for _, reactor := range sw.reactors {
511                 reactor.RemovePeer(peer, reason)
512         }
513         peer.Stop()
514
515         sentStatus, receivedStatus := peer.TrafficStatus()
516         log.WithFields(log.Fields{
517                 "module":                logModule,
518                 "address":               peer.Addr().String(),
519                 "reason":                reason,
520                 "duration":              sentStatus.Duration.String(),
521                 "total_sent":            sentStatus.Bytes,
522                 "total_received":        receivedStatus.Bytes,
523                 "average_sent_rate":     sentStatus.AvgRate,
524                 "average_received_rate": receivedStatus.AvgRate,
525                 "peer num":              sw.peers.Size(),
526         }).Info("disconnect with peer")
527 }