OSDN Git Service

fix: use mainchain address for cross_chain_out (#131)
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/binary"
5         "encoding/json"
6         "fmt"
7         "net"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         cmn "github.com/tendermint/tmlibs/common"
13
14         cfg "github.com/vapor/config"
15         "github.com/vapor/consensus"
16         "github.com/vapor/crypto/sha3pool"
17         dbm "github.com/vapor/database/leveldb"
18         "github.com/vapor/errors"
19         "github.com/vapor/event"
20         "github.com/vapor/p2p/connection"
21         "github.com/vapor/p2p/discover/dht"
22         "github.com/vapor/p2p/discover/mdns"
23         "github.com/vapor/p2p/netutil"
24         "github.com/vapor/p2p/signlib"
25         "github.com/vapor/p2p/trust"
26         "github.com/vapor/version"
27 )
28
29 const (
30         bannedPeerKey      = "BannedPeer"
31         defaultBanDuration = time.Hour * 1
32         logModule          = "p2p"
33
34         minNumOutboundPeers = 4
35         maxNumLANPeers      = 5
36         //magicNumber used to generate unique netID
37         magicNumber = uint64(0x054c5638)
38 )
39
40 //pre-define errors for connecting fail
41 var (
42         ErrDuplicatePeer     = errors.New("Duplicate peer")
43         ErrConnectSelf       = errors.New("Connect self")
44         ErrConnectBannedPeer = errors.New("Connect banned peer")
45         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
46 )
47
48 type discv interface {
49         ReadRandomNodes(buf []*dht.Node) (n int)
50 }
51
52 type lanDiscv interface {
53         Subscribe() (*event.Subscription, error)
54         Stop()
55 }
56
57 // Switch handles peer connections and exposes an API to receive incoming messages
58 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
59 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
60 // incoming messages are received on the reactor.
61 type Switch struct {
62         cmn.BaseService
63
64         Config       *cfg.Config
65         peerConfig   *PeerConfig
66         listeners    []Listener
67         reactors     map[string]Reactor
68         chDescs      []*connection.ChannelDescriptor
69         reactorsByCh map[byte]Reactor
70         peers        *PeerSet
71         dialing      *cmn.CMap
72         nodeInfo     *NodeInfo       // our node info
73         nodePrivKey  signlib.PrivKey // our node privkey
74         discv        discv
75         lanDiscv     lanDiscv
76         bannedPeer   map[string]time.Time
77         db           dbm.DB
78         mtx          sync.Mutex
79 }
80
81 // NewSwitch create a new Switch and set discover.
82 func NewSwitch(config *cfg.Config) (*Switch, error) {
83         var err error
84         var l Listener
85         var listenAddr string
86         var discv *dht.Network
87         var lanDiscv *mdns.LANDiscover
88
89         //generate unique netID
90         var data []byte
91         var h [32]byte
92         data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
93         magic := make([]byte, 8)
94         binary.BigEndian.PutUint64(magic, magicNumber)
95         data = append(data, magic[:]...)
96         sha3pool.Sum256(h[:], data)
97         netID := binary.BigEndian.Uint64(h[:8])
98
99         blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
100         privateKey := config.PrivateKey()
101         if !config.VaultMode {
102                 // Create listener
103                 l, listenAddr = GetListener(config.P2P)
104                 discv, err = dht.NewDiscover(config, *privateKey, l.ExternalAddress().Port, netID)
105                 if err != nil {
106                         return nil, err
107                 }
108                 if config.P2P.LANDiscover {
109                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
110                 }
111         }
112
113         return newSwitch(config, discv, lanDiscv, blacklistDB, l, *privateKey, listenAddr, netID)
114 }
115
116 // newSwitch creates a new Switch with the given config.
117 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
118         sw := &Switch{
119                 Config:       config,
120                 peerConfig:   DefaultPeerConfig(config.P2P),
121                 reactors:     make(map[string]Reactor),
122                 chDescs:      make([]*connection.ChannelDescriptor, 0),
123                 reactorsByCh: make(map[byte]Reactor),
124                 peers:        NewPeerSet(),
125                 dialing:      cmn.NewCMap(),
126                 nodePrivKey:  privKey,
127                 discv:        discv,
128                 lanDiscv:     lanDiscv,
129                 db:           blacklistDB,
130                 nodeInfo:     NewNodeInfo(config, privKey.XPub(), listenAddr, netID),
131                 bannedPeer:   make(map[string]time.Time),
132         }
133         if err := sw.loadBannedPeers(); err != nil {
134                 return nil, err
135         }
136
137         sw.AddListener(l)
138         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
139         trust.Init()
140         log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
141         return sw, nil
142 }
143
144 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
145 func (sw *Switch) OnStart() error {
146         for _, reactor := range sw.reactors {
147                 if _, err := reactor.Start(); err != nil {
148                         return err
149                 }
150         }
151         for _, listener := range sw.listeners {
152                 go sw.listenerRoutine(listener)
153         }
154         go sw.ensureOutboundPeersRoutine()
155         go sw.connectLANPeersRoutine()
156
157         return nil
158 }
159
160 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
161 func (sw *Switch) OnStop() {
162         if sw.Config.P2P.LANDiscover {
163                 sw.lanDiscv.Stop()
164         }
165
166         for _, listener := range sw.listeners {
167                 listener.Stop()
168         }
169         sw.listeners = nil
170
171         for _, peer := range sw.peers.List() {
172                 peer.Stop()
173                 sw.peers.Remove(peer)
174         }
175
176         for _, reactor := range sw.reactors {
177                 reactor.Stop()
178         }
179 }
180
181 //AddBannedPeer add peer to blacklist
182 func (sw *Switch) AddBannedPeer(ip string) error {
183         sw.mtx.Lock()
184         defer sw.mtx.Unlock()
185
186         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
187         dataJSON, err := json.Marshal(sw.bannedPeer)
188         if err != nil {
189                 return err
190         }
191
192         sw.db.Set([]byte(bannedPeerKey), dataJSON)
193         return nil
194 }
195
196 // AddPeer performs the P2P handshake with a peer
197 // that already has a SecretConnection. If all goes well,
198 // it starts the peer and adds it to the switch.
199 // NOTE: This performs a blocking handshake before the peer is added.
200 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
201 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
202         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
203         if err != nil {
204                 return err
205         }
206
207         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
208                 return err
209         }
210
211         if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
212                 return err
213         }
214
215         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
216         if err := sw.filterConnByPeer(peer); err != nil {
217                 return err
218         }
219
220         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
221                 return ErrConnectSpvPeer
222         }
223
224         // Start peer
225         if sw.IsRunning() {
226                 if err := sw.startInitPeer(peer); err != nil {
227                         return err
228                 }
229         }
230
231         return sw.peers.Add(peer)
232 }
233
234 // AddReactor adds the given reactor to the switch.
235 // NOTE: Not goroutine safe.
236 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
237         // Validate the reactor.
238         // No two reactors can share the same channel.
239         for _, chDesc := range reactor.GetChannels() {
240                 chID := chDesc.ID
241                 if sw.reactorsByCh[chID] != nil {
242                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
243                 }
244                 sw.chDescs = append(sw.chDescs, chDesc)
245                 sw.reactorsByCh[chID] = reactor
246         }
247         sw.reactors[name] = reactor
248         reactor.SetSwitch(sw)
249         return reactor
250 }
251
252 // AddListener adds the given listener to the switch for listening to incoming peer connections.
253 // NOTE: Not goroutine safe.
254 func (sw *Switch) AddListener(l Listener) {
255         sw.listeners = append(sw.listeners, l)
256 }
257
258 //DialPeerWithAddress dial node from net address
259 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
260         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
261         sw.dialing.Set(addr.IP.String(), addr)
262         defer sw.dialing.Delete(addr.IP.String())
263         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
264                 return err
265         }
266
267         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
268         if err != nil {
269                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
270                 return err
271         }
272
273         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
274                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
275                 pc.CloseConn()
276                 return err
277         }
278         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
279         return nil
280 }
281
282 //IsDialing prevent duplicate dialing
283 func (sw *Switch) IsDialing(addr *NetAddress) bool {
284         return sw.dialing.Has(addr.IP.String())
285 }
286
287 // IsListening returns true if the switch has at least one listener.
288 // NOTE: Not goroutine safe.
289 func (sw *Switch) IsListening() bool {
290         return len(sw.listeners) > 0
291 }
292
293 // loadBannedPeers load banned peers from db
294 func (sw *Switch) loadBannedPeers() error {
295         if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
296                 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
297                         return err
298                 }
299         }
300
301         return nil
302 }
303
304 // Listeners returns the list of listeners the switch listens on.
305 // NOTE: Not goroutine safe.
306 func (sw *Switch) Listeners() []Listener {
307         return sw.listeners
308 }
309
310 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
311 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
312         peers := sw.peers.List()
313         for _, peer := range peers {
314                 if peer.outbound && !peer.isLAN {
315                         outbound++
316                 } else {
317                         inbound++
318                 }
319                 if peer.isLAN {
320                         lan++
321                 }
322         }
323         dialing = sw.dialing.Size()
324         return
325 }
326
327 //Peers return switch peerset
328 func (sw *Switch) Peers() *PeerSet {
329         return sw.peers
330 }
331
332 // StopPeerForError disconnects from a peer due to external error.
333 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
334         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
335         sw.stopAndRemovePeer(peer, reason)
336 }
337
338 // StopPeerGracefully disconnect from a peer gracefully.
339 func (sw *Switch) StopPeerGracefully(peerID string) {
340         if peer := sw.peers.Get(peerID); peer != nil {
341                 sw.stopAndRemovePeer(peer, nil)
342         }
343 }
344
345 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
346         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
347         if err != nil {
348                 if err := conn.Close(); err != nil {
349                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
350                 }
351                 return err
352         }
353
354         if err = sw.AddPeer(peerConn, false); err != nil {
355                 if err := conn.Close(); err != nil {
356                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
357                 }
358                 return err
359         }
360
361         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
362         return nil
363 }
364
365 func (sw *Switch) checkBannedPeer(peer string) error {
366         sw.mtx.Lock()
367         defer sw.mtx.Unlock()
368
369         if banEnd, ok := sw.bannedPeer[peer]; ok {
370                 if time.Now().Before(banEnd) {
371                         return ErrConnectBannedPeer
372                 }
373
374                 if err := sw.delBannedPeer(peer); err != nil {
375                         return err
376                 }
377         }
378         return nil
379 }
380
381 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
382         lanPeers, _, _, numDialing := sw.NumPeers()
383         numToDial := maxNumLANPeers - lanPeers
384         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
385         if numToDial <= 0 {
386                 return
387         }
388         addresses := make([]*NetAddress, 0)
389         for i := 0; i < len(lanPeer.IP); i++ {
390                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
391         }
392         sw.dialPeers(addresses)
393 }
394
395 func (sw *Switch) connectLANPeersRoutine() {
396         if !sw.Config.P2P.LANDiscover {
397                 return
398         }
399
400         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
401         if err != nil {
402                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
403                 return
404         }
405
406         for {
407                 select {
408                 case obj, ok := <-lanPeerEventSub.Chan():
409                         if !ok {
410                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
411                                 return
412                         }
413                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
414                         if !ok {
415                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
416                                 continue
417                         }
418                         sw.connectLANPeers(LANPeer)
419                 case <-sw.Quit:
420                         return
421                 }
422         }
423 }
424
425 func (sw *Switch) delBannedPeer(addr string) error {
426         sw.mtx.Lock()
427         defer sw.mtx.Unlock()
428
429         delete(sw.bannedPeer, addr)
430         datajson, err := json.Marshal(sw.bannedPeer)
431         if err != nil {
432                 return err
433         }
434
435         sw.db.Set([]byte(bannedPeerKey), datajson)
436         return nil
437 }
438
439 func (sw *Switch) filterConnByIP(ip string) error {
440         if ip == sw.nodeInfo.listenHost() {
441                 return ErrConnectSelf
442         }
443         return sw.checkBannedPeer(ip)
444 }
445
446 func (sw *Switch) filterConnByPeer(peer *Peer) error {
447         if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
448                 return err
449         }
450
451         if sw.nodeInfo.PubKey == peer.PubKey() {
452                 return ErrConnectSelf
453         }
454
455         if sw.peers.Has(peer.Key) {
456                 return ErrDuplicatePeer
457         }
458         return nil
459 }
460
461 func (sw *Switch) listenerRoutine(l Listener) {
462         for {
463                 inConn, ok := <-l.Connections()
464                 if !ok {
465                         break
466                 }
467
468                 // disconnect if we alrady have MaxNumPeers
469                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
470                         if err := inConn.Close(); err != nil {
471                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
472                         }
473                         log.Info("Ignoring inbound connection: already have enough peers.")
474                         continue
475                 }
476
477                 // New inbound connection!
478                 if err := sw.addPeerWithConnection(inConn); err != nil {
479                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
480                         continue
481                 }
482         }
483 }
484
485 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
486         if err := sw.DialPeerWithAddress(a); err != nil {
487                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
488         }
489         wg.Done()
490 }
491
492 func (sw *Switch) dialPeers(addresses []*NetAddress) {
493         connectedPeers := make(map[string]struct{})
494         for _, peer := range sw.Peers().List() {
495                 connectedPeers[peer.remoteAddrHost()] = struct{}{}
496         }
497
498         var wg sync.WaitGroup
499         for _, address := range addresses {
500                 if sw.nodeInfo.ListenAddr == address.String() {
501                         continue
502                 }
503                 if dialling := sw.IsDialing(address); dialling {
504                         continue
505                 }
506                 if _, ok := connectedPeers[address.IP.String()]; ok {
507                         continue
508                 }
509
510                 wg.Add(1)
511                 go sw.dialPeerWorker(address, &wg)
512         }
513         wg.Wait()
514 }
515
516 func (sw *Switch) ensureKeepConnectPeers() {
517         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
518         addresses := make([]*NetAddress, 0)
519         for _, keepDial := range keepDials {
520                 address, err := NewNetAddressString(keepDial)
521                 if err != nil {
522                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
523                         continue
524                 }
525                 addresses = append(addresses, address)
526         }
527
528         sw.dialPeers(addresses)
529 }
530
531 func (sw *Switch) ensureOutboundPeers() {
532         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
533         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
534         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
535         if numToDial <= 0 {
536                 return
537         }
538
539         nodes := make([]*dht.Node, numToDial)
540         n := sw.discv.ReadRandomNodes(nodes)
541         addresses := make([]*NetAddress, 0)
542         for i := 0; i < n; i++ {
543                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
544                 addresses = append(addresses, address)
545         }
546         sw.dialPeers(addresses)
547 }
548
549 func (sw *Switch) ensureOutboundPeersRoutine() {
550         sw.ensureKeepConnectPeers()
551         sw.ensureOutboundPeers()
552
553         ticker := time.NewTicker(10 * time.Second)
554         defer ticker.Stop()
555
556         for {
557                 select {
558                 case <-ticker.C:
559                         sw.ensureKeepConnectPeers()
560                         sw.ensureOutboundPeers()
561                 case <-sw.Quit:
562                         return
563                 }
564         }
565 }
566
567 func (sw *Switch) startInitPeer(peer *Peer) error {
568         // spawn send/recv routines
569         if _, err := peer.Start(); err != nil {
570                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
571         }
572
573         for _, reactor := range sw.reactors {
574                 if err := reactor.AddPeer(peer); err != nil {
575                         return err
576                 }
577         }
578         return nil
579 }
580
581 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
582         sw.peers.Remove(peer)
583         for _, reactor := range sw.reactors {
584                 reactor.RemovePeer(peer, reason)
585         }
586         peer.Stop()
587
588         sentStatus, receivedStatus := peer.TrafficStatus()
589         log.WithFields(log.Fields{
590                 "module":                logModule,
591                 "address":               peer.Addr().String(),
592                 "reason":                reason,
593                 "duration":              sentStatus.Duration.String(),
594                 "total_sent":            sentStatus.Bytes,
595                 "total_received":        receivedStatus.Bytes,
596                 "average_sent_rate":     sentStatus.AvgRate,
597                 "average_received_rate": receivedStatus.AvgRate,
598                 "peer num":              sw.peers.Size(),
599         }).Info("disconnect with peer")
600 }