OSDN Git Service

临时排坑 (#111)
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/binary"
5         "encoding/hex"
6         "encoding/json"
7         "fmt"
8         "net"
9         "sync"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         crypto "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/vapor/config"
17         "github.com/vapor/consensus"
18         "github.com/vapor/crypto/ed25519"
19         "github.com/vapor/crypto/sha3pool"
20         dbm "github.com/vapor/database/leveldb"
21         "github.com/vapor/errors"
22         "github.com/vapor/event"
23         "github.com/vapor/p2p/connection"
24         "github.com/vapor/p2p/discover/dht"
25         "github.com/vapor/p2p/discover/mdns"
26         "github.com/vapor/p2p/netutil"
27         "github.com/vapor/p2p/trust"
28         "github.com/vapor/version"
29 )
30
31 const (
32         bannedPeerKey      = "BannedPeer"
33         defaultBanDuration = time.Hour * 1
34         logModule          = "p2p"
35
36         minNumOutboundPeers = 4
37         maxNumLANPeers      = 5
38         //magicNumber used to generate unique netID
39         magicNumber = uint64(0x054c5638)
40 )
41
42 //pre-define errors for connecting fail
43 var (
44         ErrDuplicatePeer     = errors.New("Duplicate peer")
45         ErrConnectSelf       = errors.New("Connect self")
46         ErrConnectBannedPeer = errors.New("Connect banned peer")
47         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
48 )
49
50 type discv interface {
51         ReadRandomNodes(buf []*dht.Node) (n int)
52 }
53
54 type lanDiscv interface {
55         Subscribe() (*event.Subscription, error)
56         Stop()
57 }
58
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
63 type Switch struct {
64         cmn.BaseService
65
66         Config       *cfg.Config
67         peerConfig   *PeerConfig
68         listeners    []Listener
69         reactors     map[string]Reactor
70         chDescs      []*connection.ChannelDescriptor
71         reactorsByCh map[byte]Reactor
72         peers        *PeerSet
73         dialing      *cmn.CMap
74         nodeInfo     *NodeInfo             // our node info
75         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
76         discv        discv
77         lanDiscv     lanDiscv
78         bannedPeer   map[string]time.Time
79         db           dbm.DB
80         mtx          sync.Mutex
81 }
82
83 // NewSwitch create a new Switch and set discover.
84 func NewSwitch(config *cfg.Config) (*Switch, error) {
85         var err error
86         var l Listener
87         var listenAddr string
88         var discv *dht.Network
89         var lanDiscv *mdns.LANDiscover
90
91         //generate unique netID
92         var data []byte
93         var h [32]byte
94         data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
95         magic := make([]byte, 8)
96         binary.BigEndian.PutUint64(magic, magicNumber)
97         data = append(data, magic[:]...)
98         sha3pool.Sum256(h[:], data)
99         netID := binary.BigEndian.Uint64(h[:8])
100
101         blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
102
103         _, yyy, _ := ed25519.GenerateKey(nil)
104         zzz := yyy.String()
105
106         bytes, err := hex.DecodeString(zzz)
107         if err != nil {
108                 return nil, err
109         }
110         var newKey [64]byte
111         copy(newKey[:], bytes)
112         privKey := crypto.PrivKeyEd25519(newKey)
113         if !config.VaultMode {
114                 // Create listener
115                 l, listenAddr = GetListener(config.P2P)
116                 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port, netID)
117                 if err != nil {
118                         return nil, err
119                 }
120                 if config.P2P.LANDiscover {
121                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
122                 }
123         }
124
125         return newSwitch(config, discv, lanDiscv, blacklistDB, l, privKey, listenAddr, netID)
126 }
127
128 // newSwitch creates a new Switch with the given config.
129 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string, netID uint64) (*Switch, error) {
130         sw := &Switch{
131                 Config:       config,
132                 peerConfig:   DefaultPeerConfig(config.P2P),
133                 reactors:     make(map[string]Reactor),
134                 chDescs:      make([]*connection.ChannelDescriptor, 0),
135                 reactorsByCh: make(map[byte]Reactor),
136                 peers:        NewPeerSet(),
137                 dialing:      cmn.NewCMap(),
138                 nodePrivKey:  priv,
139                 discv:        discv,
140                 lanDiscv:     lanDiscv,
141                 db:           blacklistDB,
142                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr, netID),
143                 bannedPeer:   make(map[string]time.Time),
144         }
145         if err := sw.loadBannedPeers(); err != nil {
146                 return nil, err
147         }
148
149         sw.AddListener(l)
150         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
151         trust.Init()
152         log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
153         return sw, nil
154 }
155
156 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
157 func (sw *Switch) OnStart() error {
158         for _, reactor := range sw.reactors {
159                 if _, err := reactor.Start(); err != nil {
160                         return err
161                 }
162         }
163         for _, listener := range sw.listeners {
164                 go sw.listenerRoutine(listener)
165         }
166         go sw.ensureOutboundPeersRoutine()
167         go sw.connectLANPeersRoutine()
168
169         return nil
170 }
171
172 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
173 func (sw *Switch) OnStop() {
174         if sw.Config.P2P.LANDiscover {
175                 sw.lanDiscv.Stop()
176         }
177
178         for _, listener := range sw.listeners {
179                 listener.Stop()
180         }
181         sw.listeners = nil
182
183         for _, peer := range sw.peers.List() {
184                 peer.Stop()
185                 sw.peers.Remove(peer)
186         }
187
188         for _, reactor := range sw.reactors {
189                 reactor.Stop()
190         }
191 }
192
193 //AddBannedPeer add peer to blacklist
194 func (sw *Switch) AddBannedPeer(ip string) error {
195         sw.mtx.Lock()
196         defer sw.mtx.Unlock()
197
198         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
199         dataJSON, err := json.Marshal(sw.bannedPeer)
200         if err != nil {
201                 return err
202         }
203
204         sw.db.Set([]byte(bannedPeerKey), dataJSON)
205         return nil
206 }
207
208 // AddPeer performs the P2P handshake with a peer
209 // that already has a SecretConnection. If all goes well,
210 // it starts the peer and adds it to the switch.
211 // NOTE: This performs a blocking handshake before the peer is added.
212 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
213 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
214         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
215         if err != nil {
216                 return err
217         }
218
219         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
220                 return err
221         }
222
223         if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
224                 return err
225         }
226
227         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
228         if err := sw.filterConnByPeer(peer); err != nil {
229                 return err
230         }
231
232         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
233                 return ErrConnectSpvPeer
234         }
235
236         // Start peer
237         if sw.IsRunning() {
238                 if err := sw.startInitPeer(peer); err != nil {
239                         return err
240                 }
241         }
242
243         return sw.peers.Add(peer)
244 }
245
246 // AddReactor adds the given reactor to the switch.
247 // NOTE: Not goroutine safe.
248 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
249         // Validate the reactor.
250         // No two reactors can share the same channel.
251         for _, chDesc := range reactor.GetChannels() {
252                 chID := chDesc.ID
253                 if sw.reactorsByCh[chID] != nil {
254                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
255                 }
256                 sw.chDescs = append(sw.chDescs, chDesc)
257                 sw.reactorsByCh[chID] = reactor
258         }
259         sw.reactors[name] = reactor
260         reactor.SetSwitch(sw)
261         return reactor
262 }
263
264 // AddListener adds the given listener to the switch for listening to incoming peer connections.
265 // NOTE: Not goroutine safe.
266 func (sw *Switch) AddListener(l Listener) {
267         sw.listeners = append(sw.listeners, l)
268 }
269
270 //DialPeerWithAddress dial node from net address
271 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
272         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
273         sw.dialing.Set(addr.IP.String(), addr)
274         defer sw.dialing.Delete(addr.IP.String())
275         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
276                 return err
277         }
278
279         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
280         if err != nil {
281                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
282                 return err
283         }
284
285         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
286                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
287                 pc.CloseConn()
288                 return err
289         }
290         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
291         return nil
292 }
293
294 func (sw *Switch) ID() [32]byte {
295         return sw.nodeInfo.PubKey
296 }
297
298 //IsDialing prevent duplicate dialing
299 func (sw *Switch) IsDialing(addr *NetAddress) bool {
300         return sw.dialing.Has(addr.IP.String())
301 }
302
303 // IsListening returns true if the switch has at least one listener.
304 // NOTE: Not goroutine safe.
305 func (sw *Switch) IsListening() bool {
306         return len(sw.listeners) > 0
307 }
308
309 // loadBannedPeers load banned peers from db
310 func (sw *Switch) loadBannedPeers() error {
311         if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
312                 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
313                         return err
314                 }
315         }
316
317         return nil
318 }
319
320 // Listeners returns the list of listeners the switch listens on.
321 // NOTE: Not goroutine safe.
322 func (sw *Switch) Listeners() []Listener {
323         return sw.listeners
324 }
325
326 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
327 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
328         peers := sw.peers.List()
329         for _, peer := range peers {
330                 if peer.outbound && !peer.isLAN {
331                         outbound++
332                 } else {
333                         inbound++
334                 }
335                 if peer.isLAN {
336                         lan++
337                 }
338         }
339         dialing = sw.dialing.Size()
340         return
341 }
342
343 //Peers return switch peerset
344 func (sw *Switch) Peers() *PeerSet {
345         return sw.peers
346 }
347
348 // StopPeerForError disconnects from a peer due to external error.
349 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
350         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
351         sw.stopAndRemovePeer(peer, reason)
352 }
353
354 // StopPeerGracefully disconnect from a peer gracefully.
355 func (sw *Switch) StopPeerGracefully(peerID string) {
356         if peer := sw.peers.Get(peerID); peer != nil {
357                 sw.stopAndRemovePeer(peer, nil)
358         }
359 }
360
361 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
362         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
363         if err != nil {
364                 if err := conn.Close(); err != nil {
365                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
366                 }
367                 return err
368         }
369
370         if err = sw.AddPeer(peerConn, false); err != nil {
371                 if err := conn.Close(); err != nil {
372                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
373                 }
374                 return err
375         }
376
377         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
378         return nil
379 }
380
381 func (sw *Switch) checkBannedPeer(peer string) error {
382         sw.mtx.Lock()
383         defer sw.mtx.Unlock()
384
385         if banEnd, ok := sw.bannedPeer[peer]; ok {
386                 if time.Now().Before(banEnd) {
387                         return ErrConnectBannedPeer
388                 }
389
390                 if err := sw.delBannedPeer(peer); err != nil {
391                         return err
392                 }
393         }
394         return nil
395 }
396
397 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
398         lanPeers, _, _, numDialing := sw.NumPeers()
399         numToDial := maxNumLANPeers - lanPeers
400         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
401         if numToDial <= 0 {
402                 return
403         }
404         addresses := make([]*NetAddress, 0)
405         for i := 0; i < len(lanPeer.IP); i++ {
406                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
407         }
408         sw.dialPeers(addresses)
409 }
410
411 func (sw *Switch) connectLANPeersRoutine() {
412         if !sw.Config.P2P.LANDiscover {
413                 return
414         }
415
416         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
417         if err != nil {
418                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
419                 return
420         }
421
422         for {
423                 select {
424                 case obj, ok := <-lanPeerEventSub.Chan():
425                         if !ok {
426                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
427                                 return
428                         }
429                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
430                         if !ok {
431                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
432                                 continue
433                         }
434                         sw.connectLANPeers(LANPeer)
435                 case <-sw.Quit:
436                         return
437                 }
438         }
439 }
440
441 func (sw *Switch) delBannedPeer(addr string) error {
442         sw.mtx.Lock()
443         defer sw.mtx.Unlock()
444
445         delete(sw.bannedPeer, addr)
446         datajson, err := json.Marshal(sw.bannedPeer)
447         if err != nil {
448                 return err
449         }
450
451         sw.db.Set([]byte(bannedPeerKey), datajson)
452         return nil
453 }
454
455 func (sw *Switch) filterConnByIP(ip string) error {
456         if ip == sw.nodeInfo.listenHost() {
457                 return ErrConnectSelf
458         }
459         return sw.checkBannedPeer(ip)
460 }
461
462 func (sw *Switch) filterConnByPeer(peer *Peer) error {
463         if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
464                 return err
465         }
466
467         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
468                 return ErrConnectSelf
469         }
470
471         if sw.peers.Has(peer.Key) {
472                 return ErrDuplicatePeer
473         }
474         return nil
475 }
476
477 func (sw *Switch) listenerRoutine(l Listener) {
478         for {
479                 inConn, ok := <-l.Connections()
480                 if !ok {
481                         break
482                 }
483
484                 // disconnect if we alrady have MaxNumPeers
485                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
486                         if err := inConn.Close(); err != nil {
487                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
488                         }
489                         log.Info("Ignoring inbound connection: already have enough peers.")
490                         continue
491                 }
492
493                 // New inbound connection!
494                 if err := sw.addPeerWithConnection(inConn); err != nil {
495                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
496                         continue
497                 }
498         }
499 }
500
501 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
502         if err := sw.DialPeerWithAddress(a); err != nil {
503                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
504         }
505         wg.Done()
506 }
507
508 func (sw *Switch) dialPeers(addresses []*NetAddress) {
509         connectedPeers := make(map[string]struct{})
510         for _, peer := range sw.Peers().List() {
511                 connectedPeers[peer.remoteAddrHost()] = struct{}{}
512         }
513
514         var wg sync.WaitGroup
515         for _, address := range addresses {
516                 if sw.nodeInfo.ListenAddr == address.String() {
517                         continue
518                 }
519                 if dialling := sw.IsDialing(address); dialling {
520                         continue
521                 }
522                 if _, ok := connectedPeers[address.IP.String()]; ok {
523                         continue
524                 }
525
526                 wg.Add(1)
527                 go sw.dialPeerWorker(address, &wg)
528         }
529         wg.Wait()
530 }
531
532 func (sw *Switch) ensureKeepConnectPeers() {
533         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
534         addresses := make([]*NetAddress, 0)
535         for _, keepDial := range keepDials {
536                 address, err := NewNetAddressString(keepDial)
537                 if err != nil {
538                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
539                         continue
540                 }
541                 addresses = append(addresses, address)
542         }
543
544         sw.dialPeers(addresses)
545 }
546
547 func (sw *Switch) ensureOutboundPeers() {
548         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
549         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
550         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
551         if numToDial <= 0 {
552                 return
553         }
554
555         nodes := make([]*dht.Node, numToDial)
556         n := sw.discv.ReadRandomNodes(nodes)
557         addresses := make([]*NetAddress, 0)
558         for i := 0; i < n; i++ {
559                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
560                 addresses = append(addresses, address)
561         }
562         sw.dialPeers(addresses)
563 }
564
565 func (sw *Switch) ensureOutboundPeersRoutine() {
566         sw.ensureKeepConnectPeers()
567         sw.ensureOutboundPeers()
568
569         ticker := time.NewTicker(10 * time.Second)
570         defer ticker.Stop()
571
572         for {
573                 select {
574                 case <-ticker.C:
575                         sw.ensureKeepConnectPeers()
576                         sw.ensureOutboundPeers()
577                 case <-sw.Quit:
578                         return
579                 }
580         }
581 }
582
583 func (sw *Switch) startInitPeer(peer *Peer) error {
584         // spawn send/recv routines
585         if _, err := peer.Start(); err != nil {
586                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
587         }
588
589         for _, reactor := range sw.reactors {
590                 if err := reactor.AddPeer(peer); err != nil {
591                         return err
592                 }
593         }
594         return nil
595 }
596
597 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
598         sw.peers.Remove(peer)
599         for _, reactor := range sw.reactors {
600                 reactor.RemovePeer(peer, reason)
601         }
602         peer.Stop()
603
604         sentStatus, receivedStatus := peer.TrafficStatus()
605         log.WithFields(log.Fields{
606                 "module":                logModule,
607                 "address":               peer.Addr().String(),
608                 "reason":                reason,
609                 "duration":              sentStatus.Duration.String(),
610                 "total_sent":            sentStatus.Bytes,
611                 "total_received":        receivedStatus.Bytes,
612                 "average_sent_rate":     sentStatus.AvgRate,
613                 "average_received_rate": receivedStatus.AvgRate,
614                 "peer num":              sw.peers.Size(),
615         }).Info("disconnect with peer")
616 }