OSDN Git Service

075498f35fda30011167dd405f99c532cb7ceb1e
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         cmn "github.com/tendermint/tmlibs/common"
12
13         cfg "github.com/vapor/config"
14         "github.com/vapor/consensus"
15         "github.com/vapor/crypto/sha3pool"
16         "github.com/vapor/errors"
17         "github.com/vapor/event"
18         "github.com/vapor/p2p/connection"
19         "github.com/vapor/p2p/discover/dht"
20         "github.com/vapor/p2p/discover/mdns"
21         "github.com/vapor/p2p/netutil"
22         security "github.com/vapor/p2p/security"
23         "github.com/vapor/p2p/signlib"
24         "github.com/vapor/version"
25 )
26
27 const (
28         logModule = "p2p"
29
30         minNumOutboundPeers = 3
31         maxNumLANPeers      = 5
32         //magicNumber used to generate unique netID
33         magicNumber = uint64(0x054c5638)
34 )
35
36 //pre-define errors for connecting fail
37 var (
38         ErrDuplicatePeer  = errors.New("Duplicate peer")
39         ErrConnectSelf    = errors.New("Connect self")
40         ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
41 )
42
43 type discv interface {
44         ReadRandomNodes(buf []*dht.Node) (n int)
45 }
46
47 type lanDiscv interface {
48         Subscribe() (*event.Subscription, error)
49         Stop()
50 }
51
52 type Security interface {
53         DoFilter(ip string, pubKey string) error
54         IsBanned(ip string, level byte, reason string) bool
55         RegisterFilter(filter security.Filter)
56         Start() error
57 }
58
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
63 type Switch struct {
64         cmn.BaseService
65
66         Config       *cfg.Config
67         peerConfig   *PeerConfig
68         listeners    []Listener
69         reactors     map[string]Reactor
70         chDescs      []*connection.ChannelDescriptor
71         reactorsByCh map[byte]Reactor
72         peers        *PeerSet
73         dialing      *cmn.CMap
74         nodeInfo     *NodeInfo       // our node info
75         nodePrivKey  signlib.PrivKey // our node privkey
76         discv        discv
77         lanDiscv     lanDiscv
78         security     Security
79 }
80
81 // NewSwitchMaybeDiscover create a new Switch and set discover.
82 func NewSwitchMaybeDiscover(config *cfg.Config) (*Switch, error) {
83         var err error
84         var l Listener
85         var listenAddr string
86         var discv *dht.Network
87         var lanDiscv *mdns.LANDiscover
88
89         //generate unique netID
90         var data []byte
91         var h [32]byte
92         data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
93         magic := make([]byte, 8)
94         binary.BigEndian.PutUint64(magic, magicNumber)
95         data = append(data, magic[:]...)
96         sha3pool.Sum256(h[:], data)
97         netID := binary.BigEndian.Uint64(h[:8])
98
99         privateKey := config.PrivateKey()
100         if !config.VaultMode {
101                 // Create listener
102                 l, listenAddr = GetListener(config.P2P)
103                 discv, err = dht.NewDiscover(config, *privateKey, l.ExternalAddress().Port, netID)
104                 if err != nil {
105                         return nil, err
106                 }
107                 if config.P2P.LANDiscover {
108                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
109                 }
110         }
111
112         return NewSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
113 }
114
115 // newSwitch creates a new Switch with the given config.
116 func NewSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
117         sw := &Switch{
118                 Config:       config,
119                 peerConfig:   DefaultPeerConfig(config.P2P),
120                 reactors:     make(map[string]Reactor),
121                 chDescs:      make([]*connection.ChannelDescriptor, 0),
122                 reactorsByCh: make(map[byte]Reactor),
123                 peers:        NewPeerSet(),
124                 dialing:      cmn.NewCMap(),
125                 nodePrivKey:  privKey,
126                 discv:        discv,
127                 lanDiscv:     lanDiscv,
128                 nodeInfo:     NewNodeInfo(config, privKey.XPub(), listenAddr, netID),
129                 security:     security.NewSecurity(config),
130         }
131
132         sw.AddListener(l)
133         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
134         log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
135         return sw, nil
136 }
137
138 func (sw *Switch) GetDiscv() discv {
139         return sw.discv
140 }
141
142 func (sw *Switch) GetNodeInfo() *NodeInfo {
143         return sw.nodeInfo
144 }
145
146 func (sw *Switch) GetPeers() *PeerSet {
147         return sw.peers
148 }
149
150 func (sw *Switch) GetReactors() map[string]Reactor {
151         return sw.reactors
152 }
153
154 func (sw *Switch) GetSecurity() Security {
155         return sw.security
156 }
157
158 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
159 func (sw *Switch) OnStart() error {
160         for _, reactor := range sw.reactors {
161                 if _, err := reactor.Start(); err != nil {
162                         return err
163                 }
164         }
165
166         sw.security.RegisterFilter(sw.nodeInfo)
167         sw.security.RegisterFilter(sw.peers)
168         if err := sw.security.Start(); err != nil {
169                 return err
170         }
171
172         for _, listener := range sw.listeners {
173                 go sw.listenerRoutine(listener)
174         }
175         go sw.ensureOutboundPeersRoutine()
176         go sw.connectLANPeersRoutine()
177
178         return nil
179 }
180
181 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
182 func (sw *Switch) OnStop() {
183         if sw.Config.P2P.LANDiscover {
184                 sw.lanDiscv.Stop()
185         }
186
187         for _, listener := range sw.listeners {
188                 listener.Stop()
189         }
190         sw.listeners = nil
191
192         for _, peer := range sw.peers.List() {
193                 peer.Stop()
194                 sw.peers.Remove(peer)
195         }
196
197         for _, reactor := range sw.reactors {
198                 reactor.Stop()
199         }
200 }
201
202 // AddPeer performs the P2P handshake with a peer
203 // that already has a SecretConnection. If all goes well,
204 // it starts the peer and adds it to the switch.
205 // NOTE: This performs a blocking handshake before the peer is added.
206 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
207 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
208         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
209         if err != nil {
210                 return err
211         }
212
213         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
214                 return err
215         }
216
217         if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
218                 return err
219         }
220
221         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
222         if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey()); err != nil {
223                 return err
224         }
225
226         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
227                 return ErrConnectSpvPeer
228         }
229
230         // Start peer
231         if sw.IsRunning() {
232                 if err := sw.startInitPeer(peer); err != nil {
233                         return err
234                 }
235         }
236
237         return sw.peers.Add(peer)
238 }
239
240 // AddReactor adds the given reactor to the switch.
241 // NOTE: Not goroutine safe.
242 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
243         // Validate the reactor.
244         // No two reactors can share the same channel.
245         for _, chDesc := range reactor.GetChannels() {
246                 chID := chDesc.ID
247                 if sw.reactorsByCh[chID] != nil {
248                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
249                 }
250                 sw.chDescs = append(sw.chDescs, chDesc)
251                 sw.reactorsByCh[chID] = reactor
252         }
253         sw.reactors[name] = reactor
254         reactor.SetSwitch(sw)
255         return reactor
256 }
257
258 // AddListener adds the given listener to the switch for listening to incoming peer connections.
259 // NOTE: Not goroutine safe.
260 func (sw *Switch) AddListener(l Listener) {
261         sw.listeners = append(sw.listeners, l)
262 }
263
264 //DialPeerWithAddress dial node from net address
265 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
266         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
267         sw.dialing.Set(addr.IP.String(), addr)
268         defer sw.dialing.Delete(addr.IP.String())
269         if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
270                 return err
271         }
272
273         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
274         if err != nil {
275                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
276                 return err
277         }
278
279         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
280                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
281                 pc.CloseConn()
282                 return err
283         }
284         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
285         return nil
286 }
287
288 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
289         return sw.security.IsBanned(ip, level, reason)
290 }
291
292 //IsDialing prevent duplicate dialing
293 func (sw *Switch) IsDialing(addr *NetAddress) bool {
294         return sw.dialing.Has(addr.IP.String())
295 }
296
297 // IsListening returns true if the switch has at least one listener.
298 // NOTE: Not goroutine safe.
299 func (sw *Switch) IsListening() bool {
300         return len(sw.listeners) > 0
301 }
302
303 // Listeners returns the list of listeners the switch listens on.
304 // NOTE: Not goroutine safe.
305 func (sw *Switch) Listeners() []Listener {
306         return sw.listeners
307 }
308
309 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
310 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
311         peers := sw.peers.List()
312         for _, peer := range peers {
313                 if peer.outbound && !peer.isLAN {
314                         outbound++
315                 } else {
316                         inbound++
317                 }
318                 if peer.isLAN {
319                         lan++
320                 }
321         }
322         dialing = sw.dialing.Size()
323         return
324 }
325
326 //Peers return switch peerset
327 func (sw *Switch) Peers() *PeerSet {
328         return sw.peers
329 }
330
331 // StopPeerForError disconnects from a peer due to external error.
332 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
333         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
334         sw.stopAndRemovePeer(peer, reason)
335 }
336
337 // StopPeerGracefully disconnect from a peer gracefully.
338 func (sw *Switch) StopPeerGracefully(peerID string) {
339         if peer := sw.peers.Get(peerID); peer != nil {
340                 sw.stopAndRemovePeer(peer, nil)
341         }
342 }
343
344 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
345         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
346         if err != nil {
347                 if err := conn.Close(); err != nil {
348                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
349                 }
350                 return err
351         }
352
353         if err = sw.AddPeer(peerConn, false); err != nil {
354                 if err := conn.Close(); err != nil {
355                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
356                 }
357                 return err
358         }
359
360         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
361         return nil
362 }
363
364 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
365         lanPeers, _, _, numDialing := sw.NumPeers()
366         numToDial := maxNumLANPeers - lanPeers
367         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
368         if numToDial <= 0 {
369                 return
370         }
371         addresses := make([]*NetAddress, 0)
372         for i := 0; i < len(lanPeer.IP); i++ {
373                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
374         }
375         sw.DialPeers(addresses)
376 }
377
378 func (sw *Switch) connectLANPeersRoutine() {
379         if !sw.Config.P2P.LANDiscover {
380                 return
381         }
382
383         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
384         if err != nil {
385                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
386                 return
387         }
388
389         for {
390                 select {
391                 case obj, ok := <-lanPeerEventSub.Chan():
392                         if !ok {
393                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
394                                 return
395                         }
396                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
397                         if !ok {
398                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
399                                 continue
400                         }
401                         sw.connectLANPeers(LANPeer)
402                 case <-sw.Quit:
403                         return
404                 }
405         }
406 }
407
408 func (sw *Switch) listenerRoutine(l Listener) {
409         for {
410                 inConn, ok := <-l.Connections()
411                 if !ok {
412                         break
413                 }
414
415                 // disconnect if we already have MaxNumPeers
416                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
417                         if err := inConn.Close(); err != nil {
418                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
419                         }
420                         log.Info("Ignoring inbound connection: already have enough peers.")
421                         continue
422                 }
423
424                 // New inbound connection!
425                 if err := sw.addPeerWithConnection(inConn); err != nil {
426                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
427                         continue
428                 }
429         }
430 }
431
432 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
433         if err := sw.DialPeerWithAddress(a); err != nil {
434                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
435         }
436         wg.Done()
437 }
438
439 func (sw *Switch) DialPeers(addresses []*NetAddress) {
440         connectedPeers := make(map[string]struct{})
441         for _, peer := range sw.Peers().List() {
442                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
443         }
444
445         var wg sync.WaitGroup
446         for _, address := range addresses {
447                 if sw.nodeInfo.ListenAddr == address.String() {
448                         continue
449                 }
450                 if dialling := sw.IsDialing(address); dialling {
451                         continue
452                 }
453                 if _, ok := connectedPeers[address.IP.String()]; ok {
454                         continue
455                 }
456
457                 wg.Add(1)
458                 go sw.dialPeerWorker(address, &wg)
459         }
460         wg.Wait()
461 }
462
463 func (sw *Switch) ensureKeepConnectPeers() {
464         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
465         addresses := make([]*NetAddress, 0)
466         for _, keepDial := range keepDials {
467                 address, err := NewNetAddressString(keepDial)
468                 if err != nil {
469                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
470                         continue
471                 }
472                 addresses = append(addresses, address)
473         }
474
475         sw.DialPeers(addresses)
476 }
477
478 func (sw *Switch) ensureOutboundPeers() {
479         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
480         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
481         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
482         if numToDial <= 0 {
483                 return
484         }
485
486         nodes := make([]*dht.Node, numToDial)
487         n := sw.discv.ReadRandomNodes(nodes)
488         addresses := make([]*NetAddress, 0)
489         for i := 0; i < n; i++ {
490                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
491                 addresses = append(addresses, address)
492         }
493         sw.DialPeers(addresses)
494 }
495
496 func (sw *Switch) ensureOutboundPeersRoutine() {
497         sw.ensureKeepConnectPeers()
498         sw.ensureOutboundPeers()
499
500         ticker := time.NewTicker(10 * time.Second)
501         defer ticker.Stop()
502
503         for {
504                 select {
505                 case <-ticker.C:
506                         sw.ensureKeepConnectPeers()
507                         sw.ensureOutboundPeers()
508                 case <-sw.Quit:
509                         return
510                 }
511         }
512 }
513
514 func (sw *Switch) startInitPeer(peer *Peer) error {
515         // spawn send/recv routines
516         if _, err := peer.Start(); err != nil {
517                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
518         }
519
520         for _, reactor := range sw.reactors {
521                 if err := reactor.AddPeer(peer); err != nil {
522                         return err
523                 }
524         }
525         return nil
526 }
527
528 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
529         sw.peers.Remove(peer)
530         for _, reactor := range sw.reactors {
531                 reactor.RemovePeer(peer, reason)
532         }
533         peer.Stop()
534
535         sentStatus, receivedStatus := peer.TrafficStatus()
536         log.WithFields(log.Fields{
537                 "module":                logModule,
538                 "address":               peer.Addr().String(),
539                 "reason":                reason,
540                 "duration":              sentStatus.Duration.String(),
541                 "total_sent":            sentStatus.Bytes,
542                 "total_received":        receivedStatus.Bytes,
543                 "average_sent_rate":     sentStatus.AvgRate,
544                 "average_received_rate": receivedStatus.AvgRate,
545                 "peer num":              sw.peers.Size(),
546         }).Info("disconnect with peer")
547 }