OSDN Git Service

Nodeinfo handshake information modification (#68)
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/binary"
5         "encoding/hex"
6         "encoding/json"
7         "fmt"
8         "net"
9         "sync"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/vapor/config"
17         "github.com/vapor/consensus"
18         "github.com/vapor/crypto/ed25519"
19         "github.com/vapor/crypto/sha3pool"
20         dbm "github.com/vapor/database/leveldb"
21         "github.com/vapor/errors"
22         "github.com/vapor/event"
23         "github.com/vapor/p2p/connection"
24         "github.com/vapor/p2p/discover/dht"
25         "github.com/vapor/p2p/discover/mdns"
26         "github.com/vapor/p2p/netutil"
27         "github.com/vapor/p2p/trust"
28         "github.com/vapor/version"
29 )
30
31 const (
32         bannedPeerKey      = "BannedPeer"
33         defaultBanDuration = time.Hour * 1
34         logModule          = "p2p"
35
36         minNumOutboundPeers = 4
37         maxNumLANPeers      = 5
38         //magicNumber used to generate unique netID
39         magicNumber = uint64(0x054c5638)
40 )
41
42 //pre-define errors for connecting fail
43 var (
44         ErrDuplicatePeer     = errors.New("Duplicate peer")
45         ErrConnectSelf       = errors.New("Connect self")
46         ErrConnectBannedPeer = errors.New("Connect banned peer")
47         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
48 )
49
50 type discv interface {
51         ReadRandomNodes(buf []*dht.Node) (n int)
52 }
53
54 type lanDiscv interface {
55         Subscribe() (*event.Subscription, error)
56         Stop()
57 }
58
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
63 type Switch struct {
64         cmn.BaseService
65
66         Config       *cfg.Config
67         peerConfig   *PeerConfig
68         listeners    []Listener
69         reactors     map[string]Reactor
70         chDescs      []*connection.ChannelDescriptor
71         reactorsByCh map[byte]Reactor
72         peers        *PeerSet
73         dialing      *cmn.CMap
74         nodeInfo     *NodeInfo             // our node info
75         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
76         discv        discv
77         lanDiscv     lanDiscv
78         bannedPeer   map[string]time.Time
79         db           dbm.DB
80         mtx          sync.Mutex
81 }
82
83 // NewSwitch create a new Switch and set discover.
84 func NewSwitch(config *cfg.Config) (*Switch, error) {
85         var err error
86         var l Listener
87         var listenAddr string
88         var discv *dht.Network
89         var lanDiscv *mdns.LANDiscover
90
91         //generate unique netID
92         var data []byte
93         var h [32]byte
94         data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
95         magic := make([]byte, 8)
96         binary.BigEndian.PutUint64(magic, magicNumber)
97         data = append(data, magic[:]...)
98         sha3pool.Sum256(h[:], data)
99         netID := binary.BigEndian.Uint64(h[:8])
100
101         blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
102         config.P2P.PrivateKey, err = config.NodeKey()
103         if err != nil {
104                 return nil, err
105         }
106
107         bytes, err := hex.DecodeString(config.P2P.PrivateKey)
108         if err != nil {
109                 return nil, err
110         }
111
112         var newKey [64]byte
113         copy(newKey[:], bytes)
114         privKey := crypto.PrivKeyEd25519(newKey)
115         if !config.VaultMode {
116                 // Create listener
117                 l, listenAddr = GetListener(config.P2P)
118                 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port, netID)
119                 if err != nil {
120                         return nil, err
121                 }
122                 if config.P2P.LANDiscover {
123                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
124                 }
125         }
126
127         return newSwitch(config, discv, lanDiscv, blacklistDB, l, privKey, listenAddr, netID)
128 }
129
130 // newSwitch creates a new Switch with the given config.
131 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string, netID uint64) (*Switch, error) {
132         sw := &Switch{
133                 Config:       config,
134                 peerConfig:   DefaultPeerConfig(config.P2P),
135                 reactors:     make(map[string]Reactor),
136                 chDescs:      make([]*connection.ChannelDescriptor, 0),
137                 reactorsByCh: make(map[byte]Reactor),
138                 peers:        NewPeerSet(),
139                 dialing:      cmn.NewCMap(),
140                 nodePrivKey:  priv,
141                 discv:        discv,
142                 lanDiscv:     lanDiscv,
143                 db:           blacklistDB,
144                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr, netID),
145                 bannedPeer:   make(map[string]time.Time),
146         }
147         if err := sw.loadBannedPeers(); err != nil {
148                 return nil, err
149         }
150
151         sw.AddListener(l)
152         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
153         trust.Init()
154         log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
155         return sw, nil
156 }
157
158 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
159 func (sw *Switch) OnStart() error {
160         for _, reactor := range sw.reactors {
161                 if _, err := reactor.Start(); err != nil {
162                         return err
163                 }
164         }
165         for _, listener := range sw.listeners {
166                 go sw.listenerRoutine(listener)
167         }
168         go sw.ensureOutboundPeersRoutine()
169         go sw.connectLANPeersRoutine()
170
171         return nil
172 }
173
174 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
175 func (sw *Switch) OnStop() {
176         if sw.Config.P2P.LANDiscover {
177                 sw.lanDiscv.Stop()
178         }
179
180         for _, listener := range sw.listeners {
181                 listener.Stop()
182         }
183         sw.listeners = nil
184
185         for _, peer := range sw.peers.List() {
186                 peer.Stop()
187                 sw.peers.Remove(peer)
188         }
189
190         for _, reactor := range sw.reactors {
191                 reactor.Stop()
192         }
193 }
194
195 //AddBannedPeer add peer to blacklist
196 func (sw *Switch) AddBannedPeer(ip string) error {
197         sw.mtx.Lock()
198         defer sw.mtx.Unlock()
199
200         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
201         dataJSON, err := json.Marshal(sw.bannedPeer)
202         if err != nil {
203                 return err
204         }
205
206         sw.db.Set([]byte(bannedPeerKey), dataJSON)
207         return nil
208 }
209
210 // AddPeer performs the P2P handshake with a peer
211 // that already has a SecretConnection. If all goes well,
212 // it starts the peer and adds it to the switch.
213 // NOTE: This performs a blocking handshake before the peer is added.
214 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
215 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
216         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
217         if err != nil {
218                 return err
219         }
220
221         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
222                 return err
223         }
224
225         if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
226                 return err
227         }
228
229         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
230         if err := sw.filterConnByPeer(peer); err != nil {
231                 return err
232         }
233
234         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
235                 return ErrConnectSpvPeer
236         }
237
238         // Start peer
239         if sw.IsRunning() {
240                 if err := sw.startInitPeer(peer); err != nil {
241                         return err
242                 }
243         }
244
245         return sw.peers.Add(peer)
246 }
247
248 // AddReactor adds the given reactor to the switch.
249 // NOTE: Not goroutine safe.
250 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
251         // Validate the reactor.
252         // No two reactors can share the same channel.
253         for _, chDesc := range reactor.GetChannels() {
254                 chID := chDesc.ID
255                 if sw.reactorsByCh[chID] != nil {
256                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
257                 }
258                 sw.chDescs = append(sw.chDescs, chDesc)
259                 sw.reactorsByCh[chID] = reactor
260         }
261         sw.reactors[name] = reactor
262         reactor.SetSwitch(sw)
263         return reactor
264 }
265
266 // AddListener adds the given listener to the switch for listening to incoming peer connections.
267 // NOTE: Not goroutine safe.
268 func (sw *Switch) AddListener(l Listener) {
269         sw.listeners = append(sw.listeners, l)
270 }
271
272 //DialPeerWithAddress dial node from net address
273 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
274         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
275         sw.dialing.Set(addr.IP.String(), addr)
276         defer sw.dialing.Delete(addr.IP.String())
277         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
278                 return err
279         }
280
281         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
282         if err != nil {
283                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
284                 return err
285         }
286
287         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
288                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
289                 pc.CloseConn()
290                 return err
291         }
292         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
293         return nil
294 }
295
296 //IsDialing prevent duplicate dialing
297 func (sw *Switch) IsDialing(addr *NetAddress) bool {
298         return sw.dialing.Has(addr.IP.String())
299 }
300
301 // IsListening returns true if the switch has at least one listener.
302 // NOTE: Not goroutine safe.
303 func (sw *Switch) IsListening() bool {
304         return len(sw.listeners) > 0
305 }
306
307 // loadBannedPeers load banned peers from db
308 func (sw *Switch) loadBannedPeers() error {
309         if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
310                 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
311                         return err
312                 }
313         }
314
315         return nil
316 }
317
318 // Listeners returns the list of listeners the switch listens on.
319 // NOTE: Not goroutine safe.
320 func (sw *Switch) Listeners() []Listener {
321         return sw.listeners
322 }
323
324 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
325 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
326         peers := sw.peers.List()
327         for _, peer := range peers {
328                 if peer.outbound && !peer.isLAN {
329                         outbound++
330                 } else {
331                         inbound++
332                 }
333                 if peer.isLAN {
334                         lan++
335                 }
336         }
337         dialing = sw.dialing.Size()
338         return
339 }
340
341 //Peers return switch peerset
342 func (sw *Switch) Peers() *PeerSet {
343         return sw.peers
344 }
345
346 // StopPeerForError disconnects from a peer due to external error.
347 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
348         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
349         sw.stopAndRemovePeer(peer, reason)
350 }
351
352 // StopPeerGracefully disconnect from a peer gracefully.
353 func (sw *Switch) StopPeerGracefully(peerID string) {
354         if peer := sw.peers.Get(peerID); peer != nil {
355                 sw.stopAndRemovePeer(peer, nil)
356         }
357 }
358
359 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
360         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
361         if err != nil {
362                 if err := conn.Close(); err != nil {
363                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
364                 }
365                 return err
366         }
367
368         if err = sw.AddPeer(peerConn, false); err != nil {
369                 if err := conn.Close(); err != nil {
370                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
371                 }
372                 return err
373         }
374
375         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
376         return nil
377 }
378
379 func (sw *Switch) checkBannedPeer(peer string) error {
380         sw.mtx.Lock()
381         defer sw.mtx.Unlock()
382
383         if banEnd, ok := sw.bannedPeer[peer]; ok {
384                 if time.Now().Before(banEnd) {
385                         return ErrConnectBannedPeer
386                 }
387
388                 if err := sw.delBannedPeer(peer); err != nil {
389                         return err
390                 }
391         }
392         return nil
393 }
394
395 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
396         lanPeers, _, _, numDialing := sw.NumPeers()
397         numToDial := maxNumLANPeers - lanPeers
398         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
399         if numToDial <= 0 {
400                 return
401         }
402         addresses := make([]*NetAddress, 0)
403         for i := 0; i < len(lanPeer.IP); i++ {
404                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
405         }
406         sw.dialPeers(addresses)
407 }
408
409 func (sw *Switch) connectLANPeersRoutine() {
410         if !sw.Config.P2P.LANDiscover {
411                 return
412         }
413
414         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
415         if err != nil {
416                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
417                 return
418         }
419
420         for {
421                 select {
422                 case obj, ok := <-lanPeerEventSub.Chan():
423                         if !ok {
424                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
425                                 return
426                         }
427                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
428                         if !ok {
429                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
430                                 continue
431                         }
432                         sw.connectLANPeers(LANPeer)
433                 case <-sw.Quit:
434                         return
435                 }
436         }
437 }
438
439 func (sw *Switch) delBannedPeer(addr string) error {
440         sw.mtx.Lock()
441         defer sw.mtx.Unlock()
442
443         delete(sw.bannedPeer, addr)
444         datajson, err := json.Marshal(sw.bannedPeer)
445         if err != nil {
446                 return err
447         }
448
449         sw.db.Set([]byte(bannedPeerKey), datajson)
450         return nil
451 }
452
453 func (sw *Switch) filterConnByIP(ip string) error {
454         if ip == sw.nodeInfo.listenHost() {
455                 return ErrConnectSelf
456         }
457         return sw.checkBannedPeer(ip)
458 }
459
460 func (sw *Switch) filterConnByPeer(peer *Peer) error {
461         if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
462                 return err
463         }
464
465         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
466                 return ErrConnectSelf
467         }
468
469         if sw.peers.Has(peer.Key) {
470                 return ErrDuplicatePeer
471         }
472         return nil
473 }
474
475 func (sw *Switch) listenerRoutine(l Listener) {
476         for {
477                 inConn, ok := <-l.Connections()
478                 if !ok {
479                         break
480                 }
481
482                 // disconnect if we alrady have MaxNumPeers
483                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
484                         if err := inConn.Close(); err != nil {
485                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
486                         }
487                         log.Info("Ignoring inbound connection: already have enough peers.")
488                         continue
489                 }
490
491                 // New inbound connection!
492                 if err := sw.addPeerWithConnection(inConn); err != nil {
493                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
494                         continue
495                 }
496         }
497 }
498
499 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
500         if err := sw.DialPeerWithAddress(a); err != nil {
501                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
502         }
503         wg.Done()
504 }
505
506 func (sw *Switch) dialPeers(addresses []*NetAddress) {
507         connectedPeers := make(map[string]struct{})
508         for _, peer := range sw.Peers().List() {
509                 connectedPeers[peer.remoteAddrHost()] = struct{}{}
510         }
511
512         var wg sync.WaitGroup
513         for _, address := range addresses {
514                 if sw.nodeInfo.ListenAddr == address.String() {
515                         continue
516                 }
517                 if dialling := sw.IsDialing(address); dialling {
518                         continue
519                 }
520                 if _, ok := connectedPeers[address.IP.String()]; ok {
521                         continue
522                 }
523
524                 wg.Add(1)
525                 go sw.dialPeerWorker(address, &wg)
526         }
527         wg.Wait()
528 }
529
530 func (sw *Switch) ensureKeepConnectPeers() {
531         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
532         addresses := make([]*NetAddress, 0)
533         for _, keepDial := range keepDials {
534                 address, err := NewNetAddressString(keepDial)
535                 if err != nil {
536                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
537                         continue
538                 }
539                 addresses = append(addresses, address)
540         }
541
542         sw.dialPeers(addresses)
543 }
544
545 func (sw *Switch) ensureOutboundPeers() {
546         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
547         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
548         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
549         if numToDial <= 0 {
550                 return
551         }
552
553         nodes := make([]*dht.Node, numToDial)
554         n := sw.discv.ReadRandomNodes(nodes)
555         addresses := make([]*NetAddress, 0)
556         for i := 0; i < n; i++ {
557                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
558                 addresses = append(addresses, address)
559         }
560         sw.dialPeers(addresses)
561 }
562
563 func (sw *Switch) ensureOutboundPeersRoutine() {
564         sw.ensureKeepConnectPeers()
565         sw.ensureOutboundPeers()
566
567         ticker := time.NewTicker(10 * time.Second)
568         defer ticker.Stop()
569
570         for {
571                 select {
572                 case <-ticker.C:
573                         sw.ensureKeepConnectPeers()
574                         sw.ensureOutboundPeers()
575                 case <-sw.Quit:
576                         return
577                 }
578         }
579 }
580
581 func (sw *Switch) startInitPeer(peer *Peer) error {
582         // spawn send/recv routines
583         if _, err := peer.Start(); err != nil {
584                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
585         }
586
587         for _, reactor := range sw.reactors {
588                 if err := reactor.AddPeer(peer); err != nil {
589                         return err
590                 }
591         }
592         return nil
593 }
594
595 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
596         sw.peers.Remove(peer)
597         for _, reactor := range sw.reactors {
598                 reactor.RemovePeer(peer, reason)
599         }
600         peer.Stop()
601
602         sentStatus, receivedStatus := peer.TrafficStatus()
603         log.WithFields(log.Fields{
604                 "module":                logModule,
605                 "address":               peer.Addr().String(),
606                 "reason":                reason,
607                 "duration":              sentStatus.Duration.String(),
608                 "total_sent":            sentStatus.Bytes,
609                 "total_received":        receivedStatus.Bytes,
610                 "average_sent_rate":     sentStatus.AvgRate,
611                 "average_received_rate": receivedStatus.AvgRate,
612                 "peer num":              sw.peers.Size(),
613         }).Info("disconnect with peer")
614 }