OSDN Git Service

Format netsync module code directory (#88)
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/binary"
5         "encoding/hex"
6         "encoding/json"
7         "fmt"
8         "net"
9         "sync"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/vapor/config"
17         "github.com/vapor/consensus"
18         "github.com/vapor/crypto/ed25519"
19         "github.com/vapor/crypto/sha3pool"
20         dbm "github.com/vapor/database/leveldb"
21         "github.com/vapor/errors"
22         "github.com/vapor/event"
23         "github.com/vapor/p2p/connection"
24         "github.com/vapor/p2p/discover/dht"
25         "github.com/vapor/p2p/discover/mdns"
26         "github.com/vapor/p2p/netutil"
27         "github.com/vapor/p2p/trust"
28         "github.com/vapor/version"
29 )
30
31 const (
32         bannedPeerKey      = "BannedPeer"
33         defaultBanDuration = time.Hour * 1
34         logModule          = "p2p"
35
36         minNumOutboundPeers = 4
37         maxNumLANPeers      = 5
38         //magicNumber used to generate unique netID
39         magicNumber = uint64(0x054c5638)
40 )
41
42 //pre-define errors for connecting fail
43 var (
44         ErrDuplicatePeer     = errors.New("Duplicate peer")
45         ErrConnectSelf       = errors.New("Connect self")
46         ErrConnectBannedPeer = errors.New("Connect banned peer")
47         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
48 )
49
50 type discv interface {
51         ReadRandomNodes(buf []*dht.Node) (n int)
52 }
53
54 type lanDiscv interface {
55         Subscribe() (*event.Subscription, error)
56         Stop()
57 }
58
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
63 type Switch struct {
64         cmn.BaseService
65
66         Config       *cfg.Config
67         peerConfig   *PeerConfig
68         listeners    []Listener
69         reactors     map[string]Reactor
70         chDescs      []*connection.ChannelDescriptor
71         reactorsByCh map[byte]Reactor
72         peers        *PeerSet
73         dialing      *cmn.CMap
74         nodeInfo     *NodeInfo             // our node info
75         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
76         discv        discv
77         lanDiscv     lanDiscv
78         bannedPeer   map[string]time.Time
79         db           dbm.DB
80         mtx          sync.Mutex
81 }
82
83 // NewSwitch create a new Switch and set discover.
84 func NewSwitch(config *cfg.Config) (*Switch, error) {
85         var err error
86         var l Listener
87         var listenAddr string
88         var discv *dht.Network
89         var lanDiscv *mdns.LANDiscover
90
91         //generate unique netID
92         var data []byte
93         var h [32]byte
94         data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
95         magic := make([]byte, 8)
96         binary.BigEndian.PutUint64(magic, magicNumber)
97         data = append(data, magic[:]...)
98         sha3pool.Sum256(h[:], data)
99         netID := binary.BigEndian.Uint64(h[:8])
100
101         blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
102         config.P2P.PrivateKey, err = config.NodeKey()
103         if err != nil {
104                 return nil, err
105         }
106
107         bytes, err := hex.DecodeString(config.P2P.PrivateKey)
108         if err != nil {
109                 return nil, err
110         }
111
112         var newKey [64]byte
113         copy(newKey[:], bytes)
114         privKey := crypto.PrivKeyEd25519(newKey)
115         if !config.VaultMode {
116                 // Create listener
117                 l, listenAddr = GetListener(config.P2P)
118                 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port, netID)
119                 if err != nil {
120                         return nil, err
121                 }
122                 if config.P2P.LANDiscover {
123                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
124                 }
125         }
126
127         return newSwitch(config, discv, lanDiscv, blacklistDB, l, privKey, listenAddr, netID)
128 }
129
130 // newSwitch creates a new Switch with the given config.
131 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string, netID uint64) (*Switch, error) {
132         sw := &Switch{
133                 Config:       config,
134                 peerConfig:   DefaultPeerConfig(config.P2P),
135                 reactors:     make(map[string]Reactor),
136                 chDescs:      make([]*connection.ChannelDescriptor, 0),
137                 reactorsByCh: make(map[byte]Reactor),
138                 peers:        NewPeerSet(),
139                 dialing:      cmn.NewCMap(),
140                 nodePrivKey:  priv,
141                 discv:        discv,
142                 lanDiscv:     lanDiscv,
143                 db:           blacklistDB,
144                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr, netID),
145                 bannedPeer:   make(map[string]time.Time),
146         }
147         if err := sw.loadBannedPeers(); err != nil {
148                 return nil, err
149         }
150
151         sw.AddListener(l)
152         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
153         trust.Init()
154         log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
155         return sw, nil
156 }
157
158 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
159 func (sw *Switch) OnStart() error {
160         for _, reactor := range sw.reactors {
161                 if _, err := reactor.Start(); err != nil {
162                         return err
163                 }
164         }
165         for _, listener := range sw.listeners {
166                 go sw.listenerRoutine(listener)
167         }
168         go sw.ensureOutboundPeersRoutine()
169         go sw.connectLANPeersRoutine()
170
171         return nil
172 }
173
174 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
175 func (sw *Switch) OnStop() {
176         if sw.Config.P2P.LANDiscover {
177                 sw.lanDiscv.Stop()
178         }
179
180         for _, listener := range sw.listeners {
181                 listener.Stop()
182         }
183         sw.listeners = nil
184
185         for _, peer := range sw.peers.List() {
186                 peer.Stop()
187                 sw.peers.Remove(peer)
188         }
189
190         for _, reactor := range sw.reactors {
191                 reactor.Stop()
192         }
193 }
194
195 //AddBannedPeer add peer to blacklist
196 func (sw *Switch) AddBannedPeer(ip string) error {
197         sw.mtx.Lock()
198         defer sw.mtx.Unlock()
199
200         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
201         dataJSON, err := json.Marshal(sw.bannedPeer)
202         if err != nil {
203                 return err
204         }
205
206         sw.db.Set([]byte(bannedPeerKey), dataJSON)
207         return nil
208 }
209
210 // AddPeer performs the P2P handshake with a peer
211 // that already has a SecretConnection. If all goes well,
212 // it starts the peer and adds it to the switch.
213 // NOTE: This performs a blocking handshake before the peer is added.
214 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
215 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
216         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
217         if err != nil {
218                 return err
219         }
220
221         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
222                 return err
223         }
224
225         if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
226                 return err
227         }
228
229         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
230         if err := sw.filterConnByPeer(peer); err != nil {
231                 return err
232         }
233
234         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
235                 return ErrConnectSpvPeer
236         }
237
238         // Start peer
239         if sw.IsRunning() {
240                 if err := sw.startInitPeer(peer); err != nil {
241                         return err
242                 }
243         }
244
245         return sw.peers.Add(peer)
246 }
247
248 // AddReactor adds the given reactor to the switch.
249 // NOTE: Not goroutine safe.
250 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
251         // Validate the reactor.
252         // No two reactors can share the same channel.
253         for _, chDesc := range reactor.GetChannels() {
254                 chID := chDesc.ID
255                 if sw.reactorsByCh[chID] != nil {
256                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
257                 }
258                 sw.chDescs = append(sw.chDescs, chDesc)
259                 sw.reactorsByCh[chID] = reactor
260         }
261         sw.reactors[name] = reactor
262         reactor.SetSwitch(sw)
263         return reactor
264 }
265
266 // AddListener adds the given listener to the switch for listening to incoming peer connections.
267 // NOTE: Not goroutine safe.
268 func (sw *Switch) AddListener(l Listener) {
269         sw.listeners = append(sw.listeners, l)
270 }
271
272 //DialPeerWithAddress dial node from net address
273 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
274         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
275         sw.dialing.Set(addr.IP.String(), addr)
276         defer sw.dialing.Delete(addr.IP.String())
277         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
278                 return err
279         }
280
281         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
282         if err != nil {
283                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
284                 return err
285         }
286
287         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
288                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
289                 pc.CloseConn()
290                 return err
291         }
292         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
293         return nil
294 }
295
296 func (sw *Switch) ID() [32]byte {
297         return sw.nodeInfo.PubKey
298 }
299
300 //IsDialing prevent duplicate dialing
301 func (sw *Switch) IsDialing(addr *NetAddress) bool {
302         return sw.dialing.Has(addr.IP.String())
303 }
304
305 // IsListening returns true if the switch has at least one listener.
306 // NOTE: Not goroutine safe.
307 func (sw *Switch) IsListening() bool {
308         return len(sw.listeners) > 0
309 }
310
311 // loadBannedPeers load banned peers from db
312 func (sw *Switch) loadBannedPeers() error {
313         if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
314                 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
315                         return err
316                 }
317         }
318
319         return nil
320 }
321
322 // Listeners returns the list of listeners the switch listens on.
323 // NOTE: Not goroutine safe.
324 func (sw *Switch) Listeners() []Listener {
325         return sw.listeners
326 }
327
328 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
329 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
330         peers := sw.peers.List()
331         for _, peer := range peers {
332                 if peer.outbound && !peer.isLAN {
333                         outbound++
334                 } else {
335                         inbound++
336                 }
337                 if peer.isLAN {
338                         lan++
339                 }
340         }
341         dialing = sw.dialing.Size()
342         return
343 }
344
345 //Peers return switch peerset
346 func (sw *Switch) Peers() *PeerSet {
347         return sw.peers
348 }
349
350 // StopPeerForError disconnects from a peer due to external error.
351 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
352         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
353         sw.stopAndRemovePeer(peer, reason)
354 }
355
356 // StopPeerGracefully disconnect from a peer gracefully.
357 func (sw *Switch) StopPeerGracefully(peerID string) {
358         if peer := sw.peers.Get(peerID); peer != nil {
359                 sw.stopAndRemovePeer(peer, nil)
360         }
361 }
362
363 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
364         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
365         if err != nil {
366                 if err := conn.Close(); err != nil {
367                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
368                 }
369                 return err
370         }
371
372         if err = sw.AddPeer(peerConn, false); err != nil {
373                 if err := conn.Close(); err != nil {
374                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
375                 }
376                 return err
377         }
378
379         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
380         return nil
381 }
382
383 func (sw *Switch) checkBannedPeer(peer string) error {
384         sw.mtx.Lock()
385         defer sw.mtx.Unlock()
386
387         if banEnd, ok := sw.bannedPeer[peer]; ok {
388                 if time.Now().Before(banEnd) {
389                         return ErrConnectBannedPeer
390                 }
391
392                 if err := sw.delBannedPeer(peer); err != nil {
393                         return err
394                 }
395         }
396         return nil
397 }
398
399 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
400         lanPeers, _, _, numDialing := sw.NumPeers()
401         numToDial := maxNumLANPeers - lanPeers
402         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
403         if numToDial <= 0 {
404                 return
405         }
406         addresses := make([]*NetAddress, 0)
407         for i := 0; i < len(lanPeer.IP); i++ {
408                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
409         }
410         sw.dialPeers(addresses)
411 }
412
413 func (sw *Switch) connectLANPeersRoutine() {
414         if !sw.Config.P2P.LANDiscover {
415                 return
416         }
417
418         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
419         if err != nil {
420                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
421                 return
422         }
423
424         for {
425                 select {
426                 case obj, ok := <-lanPeerEventSub.Chan():
427                         if !ok {
428                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
429                                 return
430                         }
431                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
432                         if !ok {
433                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
434                                 continue
435                         }
436                         sw.connectLANPeers(LANPeer)
437                 case <-sw.Quit:
438                         return
439                 }
440         }
441 }
442
443 func (sw *Switch) delBannedPeer(addr string) error {
444         sw.mtx.Lock()
445         defer sw.mtx.Unlock()
446
447         delete(sw.bannedPeer, addr)
448         datajson, err := json.Marshal(sw.bannedPeer)
449         if err != nil {
450                 return err
451         }
452
453         sw.db.Set([]byte(bannedPeerKey), datajson)
454         return nil
455 }
456
457 func (sw *Switch) filterConnByIP(ip string) error {
458         if ip == sw.nodeInfo.listenHost() {
459                 return ErrConnectSelf
460         }
461         return sw.checkBannedPeer(ip)
462 }
463
464 func (sw *Switch) filterConnByPeer(peer *Peer) error {
465         if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
466                 return err
467         }
468
469         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
470                 return ErrConnectSelf
471         }
472
473         if sw.peers.Has(peer.Key) {
474                 return ErrDuplicatePeer
475         }
476         return nil
477 }
478
479 func (sw *Switch) listenerRoutine(l Listener) {
480         for {
481                 inConn, ok := <-l.Connections()
482                 if !ok {
483                         break
484                 }
485
486                 // disconnect if we alrady have MaxNumPeers
487                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
488                         if err := inConn.Close(); err != nil {
489                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
490                         }
491                         log.Info("Ignoring inbound connection: already have enough peers.")
492                         continue
493                 }
494
495                 // New inbound connection!
496                 if err := sw.addPeerWithConnection(inConn); err != nil {
497                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
498                         continue
499                 }
500         }
501 }
502
503 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
504         if err := sw.DialPeerWithAddress(a); err != nil {
505                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
506         }
507         wg.Done()
508 }
509
510 func (sw *Switch) dialPeers(addresses []*NetAddress) {
511         connectedPeers := make(map[string]struct{})
512         for _, peer := range sw.Peers().List() {
513                 connectedPeers[peer.remoteAddrHost()] = struct{}{}
514         }
515
516         var wg sync.WaitGroup
517         for _, address := range addresses {
518                 if sw.nodeInfo.ListenAddr == address.String() {
519                         continue
520                 }
521                 if dialling := sw.IsDialing(address); dialling {
522                         continue
523                 }
524                 if _, ok := connectedPeers[address.IP.String()]; ok {
525                         continue
526                 }
527
528                 wg.Add(1)
529                 go sw.dialPeerWorker(address, &wg)
530         }
531         wg.Wait()
532 }
533
534 func (sw *Switch) ensureKeepConnectPeers() {
535         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
536         addresses := make([]*NetAddress, 0)
537         for _, keepDial := range keepDials {
538                 address, err := NewNetAddressString(keepDial)
539                 if err != nil {
540                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
541                         continue
542                 }
543                 addresses = append(addresses, address)
544         }
545
546         sw.dialPeers(addresses)
547 }
548
549 func (sw *Switch) ensureOutboundPeers() {
550         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
551         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
552         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
553         if numToDial <= 0 {
554                 return
555         }
556
557         nodes := make([]*dht.Node, numToDial)
558         n := sw.discv.ReadRandomNodes(nodes)
559         addresses := make([]*NetAddress, 0)
560         for i := 0; i < n; i++ {
561                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
562                 addresses = append(addresses, address)
563         }
564         sw.dialPeers(addresses)
565 }
566
567 func (sw *Switch) ensureOutboundPeersRoutine() {
568         sw.ensureKeepConnectPeers()
569         sw.ensureOutboundPeers()
570
571         ticker := time.NewTicker(10 * time.Second)
572         defer ticker.Stop()
573
574         for {
575                 select {
576                 case <-ticker.C:
577                         sw.ensureKeepConnectPeers()
578                         sw.ensureOutboundPeers()
579                 case <-sw.Quit:
580                         return
581                 }
582         }
583 }
584
585 func (sw *Switch) startInitPeer(peer *Peer) error {
586         // spawn send/recv routines
587         if _, err := peer.Start(); err != nil {
588                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
589         }
590
591         for _, reactor := range sw.reactors {
592                 if err := reactor.AddPeer(peer); err != nil {
593                         return err
594                 }
595         }
596         return nil
597 }
598
599 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
600         sw.peers.Remove(peer)
601         for _, reactor := range sw.reactors {
602                 reactor.RemovePeer(peer, reason)
603         }
604         peer.Stop()
605
606         sentStatus, receivedStatus := peer.TrafficStatus()
607         log.WithFields(log.Fields{
608                 "module":                logModule,
609                 "address":               peer.Addr().String(),
610                 "reason":                reason,
611                 "duration":              sentStatus.Duration.String(),
612                 "total_sent":            sentStatus.Bytes,
613                 "total_received":        receivedStatus.Bytes,
614                 "average_sent_rate":     sentStatus.AvgRate,
615                 "average_received_rate": receivedStatus.AvgRate,
616                 "peer num":              sw.peers.Size(),
617         }).Info("disconnect with peer")
618 }