OSDN Git Service

Hulk did something
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/hex"
5         "encoding/json"
6         "fmt"
7         "net"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14
15         cfg "github.com/vapor/config"
16         "github.com/vapor/consensus"
17         "github.com/vapor/crypto/ed25519"
18         dbm "github.com/vapor/database/leveldb"
19         "github.com/vapor/errors"
20         "github.com/vapor/event"
21         "github.com/vapor/p2p/connection"
22         "github.com/vapor/p2p/discover/dht"
23         "github.com/vapor/p2p/discover/mdns"
24         "github.com/vapor/p2p/netutil"
25         "github.com/vapor/p2p/trust"
26         "github.com/vapor/version"
27 )
28
29 const (
30         bannedPeerKey      = "BannedPeer"
31         defaultBanDuration = time.Hour * 1
32         logModule          = "p2p"
33
34         minNumOutboundPeers = 4
35         maxNumLANPeers      = 5
36 )
37
38 //pre-define errors for connecting fail
39 var (
40         ErrDuplicatePeer     = errors.New("Duplicate peer")
41         ErrConnectSelf       = errors.New("Connect self")
42         ErrConnectBannedPeer = errors.New("Connect banned peer")
43         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
44 )
45
46 type discv interface {
47         ReadRandomNodes(buf []*dht.Node) (n int)
48 }
49
50 type lanDiscv interface {
51         Subscribe() (*event.Subscription, error)
52         Stop()
53 }
54
55 // Switch handles peer connections and exposes an API to receive incoming messages
56 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
57 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
58 // incoming messages are received on the reactor.
59 type Switch struct {
60         cmn.BaseService
61
62         Config       *cfg.Config
63         peerConfig   *PeerConfig
64         listeners    []Listener
65         reactors     map[string]Reactor
66         chDescs      []*connection.ChannelDescriptor
67         reactorsByCh map[byte]Reactor
68         peers        *PeerSet
69         dialing      *cmn.CMap
70         nodeInfo     *NodeInfo             // our node info
71         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
72         discv        discv
73         lanDiscv     lanDiscv
74         bannedPeer   map[string]time.Time
75         db           dbm.DB
76         mtx          sync.Mutex
77 }
78
79 // NewSwitch create a new Switch and set discover.
80 func NewSwitch(config *cfg.Config) (*Switch, error) {
81         var err error
82         var l Listener
83         var listenAddr string
84         var discv *dht.Network
85         var lanDiscv *mdns.LANDiscover
86
87         blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
88         config.P2P.PrivateKey, err = config.NodeKey()
89         if err != nil {
90                 return nil, err
91         }
92
93         bytes, err := hex.DecodeString(config.P2P.PrivateKey)
94         if err != nil {
95                 return nil, err
96         }
97
98         var newKey [64]byte
99         copy(newKey[:], bytes)
100         privKey := crypto.PrivKeyEd25519(newKey)
101         if !config.VaultMode {
102                 // Create listener
103                 l, listenAddr = GetListener(config.P2P)
104                 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
105                 if err != nil {
106                         return nil, err
107                 }
108                 if config.P2P.LANDiscover {
109                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
110                 }
111         }
112
113         return newSwitch(config, discv, lanDiscv, blacklistDB, l, privKey, listenAddr)
114 }
115
116 // newSwitch creates a new Switch with the given config.
117 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
118         sw := &Switch{
119                 Config:       config,
120                 peerConfig:   DefaultPeerConfig(config.P2P),
121                 reactors:     make(map[string]Reactor),
122                 chDescs:      make([]*connection.ChannelDescriptor, 0),
123                 reactorsByCh: make(map[byte]Reactor),
124                 peers:        NewPeerSet(),
125                 dialing:      cmn.NewCMap(),
126                 nodePrivKey:  priv,
127                 discv:        discv,
128                 lanDiscv:     lanDiscv,
129                 db:           blacklistDB,
130                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
131                 bannedPeer:   make(map[string]time.Time),
132         }
133         if err := sw.loadBannedPeers(); err != nil {
134                 return nil, err
135         }
136
137         sw.AddListener(l)
138         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
139         trust.Init()
140         return sw, nil
141 }
142
143 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
144 func (sw *Switch) OnStart() error {
145         for _, reactor := range sw.reactors {
146                 if _, err := reactor.Start(); err != nil {
147                         return err
148                 }
149         }
150         for _, listener := range sw.listeners {
151                 go sw.listenerRoutine(listener)
152         }
153         go sw.ensureOutboundPeersRoutine()
154         go sw.connectLANPeersRoutine()
155
156         return nil
157 }
158
159 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
160 func (sw *Switch) OnStop() {
161         if sw.Config.P2P.LANDiscover {
162                 sw.lanDiscv.Stop()
163         }
164
165         for _, listener := range sw.listeners {
166                 listener.Stop()
167         }
168         sw.listeners = nil
169
170         for _, peer := range sw.peers.List() {
171                 peer.Stop()
172                 sw.peers.Remove(peer)
173         }
174
175         for _, reactor := range sw.reactors {
176                 reactor.Stop()
177         }
178 }
179
180 //AddBannedPeer add peer to blacklist
181 func (sw *Switch) AddBannedPeer(ip string) error {
182         sw.mtx.Lock()
183         defer sw.mtx.Unlock()
184
185         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
186         dataJSON, err := json.Marshal(sw.bannedPeer)
187         if err != nil {
188                 return err
189         }
190
191         sw.db.Set([]byte(bannedPeerKey), dataJSON)
192         return nil
193 }
194
195 // AddPeer performs the P2P handshake with a peer
196 // that already has a SecretConnection. If all goes well,
197 // it starts the peer and adds it to the switch.
198 // NOTE: This performs a blocking handshake before the peer is added.
199 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
200 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
201         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
202         if err != nil {
203                 return err
204         }
205
206         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
207                 return err
208         }
209         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
210                 return err
211         }
212
213         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
214         if err := sw.filterConnByPeer(peer); err != nil {
215                 return err
216         }
217
218         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
219                 return ErrConnectSpvPeer
220         }
221
222         // Start peer
223         if sw.IsRunning() {
224                 if err := sw.startInitPeer(peer); err != nil {
225                         return err
226                 }
227         }
228
229         return sw.peers.Add(peer)
230 }
231
232 // AddReactor adds the given reactor to the switch.
233 // NOTE: Not goroutine safe.
234 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
235         // Validate the reactor.
236         // No two reactors can share the same channel.
237         for _, chDesc := range reactor.GetChannels() {
238                 chID := chDesc.ID
239                 if sw.reactorsByCh[chID] != nil {
240                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
241                 }
242                 sw.chDescs = append(sw.chDescs, chDesc)
243                 sw.reactorsByCh[chID] = reactor
244         }
245         sw.reactors[name] = reactor
246         reactor.SetSwitch(sw)
247         return reactor
248 }
249
250 // AddListener adds the given listener to the switch for listening to incoming peer connections.
251 // NOTE: Not goroutine safe.
252 func (sw *Switch) AddListener(l Listener) {
253         sw.listeners = append(sw.listeners, l)
254 }
255
256 //DialPeerWithAddress dial node from net address
257 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
258         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
259         sw.dialing.Set(addr.IP.String(), addr)
260         defer sw.dialing.Delete(addr.IP.String())
261         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
262                 return err
263         }
264
265         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
266         if err != nil {
267                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
268                 return err
269         }
270
271         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
272                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
273                 pc.CloseConn()
274                 return err
275         }
276         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
277         return nil
278 }
279
280 //IsDialing prevent duplicate dialing
281 func (sw *Switch) IsDialing(addr *NetAddress) bool {
282         return sw.dialing.Has(addr.IP.String())
283 }
284
285 // IsListening returns true if the switch has at least one listener.
286 // NOTE: Not goroutine safe.
287 func (sw *Switch) IsListening() bool {
288         return len(sw.listeners) > 0
289 }
290
291 // loadBannedPeers load banned peers from db
292 func (sw *Switch) loadBannedPeers() error {
293         if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
294                 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
295                         return err
296                 }
297         }
298
299         return nil
300 }
301
302 // Listeners returns the list of listeners the switch listens on.
303 // NOTE: Not goroutine safe.
304 func (sw *Switch) Listeners() []Listener {
305         return sw.listeners
306 }
307
308 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
309 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
310         peers := sw.peers.List()
311         for _, peer := range peers {
312                 if peer.outbound && !peer.isLAN {
313                         outbound++
314                 } else {
315                         inbound++
316                 }
317                 if peer.isLAN {
318                         lan++
319                 }
320         }
321         dialing = sw.dialing.Size()
322         return
323 }
324
325 // NodeInfo returns the switch's NodeInfo.
326 // NOTE: Not goroutine safe.
327 func (sw *Switch) NodeInfo() *NodeInfo {
328         return sw.nodeInfo
329 }
330
331 //Peers return switch peerset
332 func (sw *Switch) Peers() *PeerSet {
333         return sw.peers
334 }
335
336 // StopPeerForError disconnects from a peer due to external error.
337 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
338         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
339         sw.stopAndRemovePeer(peer, reason)
340 }
341
342 // StopPeerGracefully disconnect from a peer gracefully.
343 func (sw *Switch) StopPeerGracefully(peerID string) {
344         if peer := sw.peers.Get(peerID); peer != nil {
345                 sw.stopAndRemovePeer(peer, nil)
346         }
347 }
348
349 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
350         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
351         if err != nil {
352                 if err := conn.Close(); err != nil {
353                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
354                 }
355                 return err
356         }
357
358         if err = sw.AddPeer(peerConn, false); err != nil {
359                 if err := conn.Close(); err != nil {
360                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
361                 }
362                 return err
363         }
364
365         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
366         return nil
367 }
368
369 func (sw *Switch) checkBannedPeer(peer string) error {
370         sw.mtx.Lock()
371         defer sw.mtx.Unlock()
372
373         if banEnd, ok := sw.bannedPeer[peer]; ok {
374                 if time.Now().Before(banEnd) {
375                         return ErrConnectBannedPeer
376                 }
377
378                 if err := sw.delBannedPeer(peer); err != nil {
379                         return err
380                 }
381         }
382         return nil
383 }
384
385 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
386         lanPeers, _, _, numDialing := sw.NumPeers()
387         numToDial := maxNumLANPeers - lanPeers
388         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
389         if numToDial <= 0 {
390                 return
391         }
392         addresses := make([]*NetAddress, 0)
393         for i := 0; i < len(lanPeer.IP); i++ {
394                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
395         }
396         sw.dialPeers(addresses)
397 }
398
399 func (sw *Switch) connectLANPeersRoutine() {
400         if !sw.Config.P2P.LANDiscover {
401                 return
402         }
403
404         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
405         if err != nil {
406                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
407                 return
408         }
409
410         for {
411                 select {
412                 case obj, ok := <-lanPeerEventSub.Chan():
413                         if !ok {
414                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
415                                 return
416                         }
417                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
418                         if !ok {
419                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
420                                 continue
421                         }
422                         sw.connectLANPeers(LANPeer)
423                 case <-sw.Quit:
424                         return
425                 }
426         }
427 }
428
429 func (sw *Switch) delBannedPeer(addr string) error {
430         sw.mtx.Lock()
431         defer sw.mtx.Unlock()
432
433         delete(sw.bannedPeer, addr)
434         datajson, err := json.Marshal(sw.bannedPeer)
435         if err != nil {
436                 return err
437         }
438
439         sw.db.Set([]byte(bannedPeerKey), datajson)
440         return nil
441 }
442
443 func (sw *Switch) filterConnByIP(ip string) error {
444         if ip == sw.nodeInfo.listenHost() {
445                 return ErrConnectSelf
446         }
447         return sw.checkBannedPeer(ip)
448 }
449
450 func (sw *Switch) filterConnByPeer(peer *Peer) error {
451         if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
452                 return err
453         }
454
455         if sw.nodeInfo.getPubkey().Equals(peer.PubKey().Wrap()) {
456                 return ErrConnectSelf
457         }
458
459         if sw.peers.Has(peer.Key) {
460                 return ErrDuplicatePeer
461         }
462         return nil
463 }
464
465 func (sw *Switch) listenerRoutine(l Listener) {
466         for {
467                 inConn, ok := <-l.Connections()
468                 if !ok {
469                         break
470                 }
471
472                 // disconnect if we alrady have MaxNumPeers
473                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
474                         if err := inConn.Close(); err != nil {
475                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
476                         }
477                         log.Info("Ignoring inbound connection: already have enough peers.")
478                         continue
479                 }
480
481                 // New inbound connection!
482                 if err := sw.addPeerWithConnection(inConn); err != nil {
483                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
484                         continue
485                 }
486         }
487 }
488
489 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
490         if err := sw.DialPeerWithAddress(a); err != nil {
491                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
492         }
493         wg.Done()
494 }
495
496 func (sw *Switch) dialPeers(addresses []*NetAddress) {
497         connectedPeers := make(map[string]struct{})
498         for _, peer := range sw.Peers().List() {
499                 connectedPeers[peer.remoteAddrHost()] = struct{}{}
500         }
501
502         var wg sync.WaitGroup
503         for _, address := range addresses {
504                 if sw.NodeInfo().ListenAddr == address.String() {
505                         continue
506                 }
507                 if dialling := sw.IsDialing(address); dialling {
508                         continue
509                 }
510                 if _, ok := connectedPeers[address.IP.String()]; ok {
511                         continue
512                 }
513
514                 wg.Add(1)
515                 go sw.dialPeerWorker(address, &wg)
516         }
517         wg.Wait()
518 }
519
520 func (sw *Switch) ensureKeepConnectPeers() {
521         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
522         addresses := make([]*NetAddress, 0)
523         for _, keepDial := range keepDials {
524                 address, err := NewNetAddressString(keepDial)
525                 if err != nil {
526                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
527                         continue
528                 }
529                 addresses = append(addresses, address)
530         }
531
532         sw.dialPeers(addresses)
533 }
534
535 func (sw *Switch) ensureOutboundPeers() {
536         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
537         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
538         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
539         if numToDial <= 0 {
540                 return
541         }
542
543         nodes := make([]*dht.Node, numToDial)
544         n := sw.discv.ReadRandomNodes(nodes)
545         addresses := make([]*NetAddress, 0)
546         for i := 0; i < n; i++ {
547                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
548                 addresses = append(addresses, address)
549         }
550         sw.dialPeers(addresses)
551 }
552
553 func (sw *Switch) ensureOutboundPeersRoutine() {
554         sw.ensureKeepConnectPeers()
555         sw.ensureOutboundPeers()
556
557         ticker := time.NewTicker(10 * time.Second)
558         defer ticker.Stop()
559
560         for {
561                 select {
562                 case <-ticker.C:
563                         sw.ensureKeepConnectPeers()
564                         sw.ensureOutboundPeers()
565                 case <-sw.Quit:
566                         return
567                 }
568         }
569 }
570
571 func (sw *Switch) startInitPeer(peer *Peer) error {
572         // spawn send/recv routines
573         if _, err := peer.Start(); err != nil {
574                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
575         }
576
577         for _, reactor := range sw.reactors {
578                 if err := reactor.AddPeer(peer); err != nil {
579                         return err
580                 }
581         }
582         return nil
583 }
584
585 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
586         sw.peers.Remove(peer)
587         for _, reactor := range sw.reactors {
588                 reactor.RemovePeer(peer, reason)
589         }
590         peer.Stop()
591
592         sentStatus, receivedStatus := peer.TrafficStatus()
593         log.WithFields(log.Fields{
594                 "module":                logModule,
595                 "address":               peer.Addr().String(),
596                 "reason":                reason,
597                 "duration":              sentStatus.Duration.String(),
598                 "total_sent":            sentStatus.Bytes,
599                 "total_received":        receivedStatus.Bytes,
600                 "average_sent_rate":     sentStatus.AvgRate,
601                 "average_received_rate": receivedStatus.AvgRate,
602                 "peer num":              sw.peers.Size(),
603         }).Info("disconnect with peer")
604 }