OSDN Git Service

add vaildate-block
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/hex"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13
14         cfg "github.com/bytom/bytom/config"
15         "github.com/bytom/bytom/consensus"
16         "github.com/bytom/bytom/crypto/ed25519"
17         "github.com/bytom/bytom/errors"
18         "github.com/bytom/bytom/event"
19         "github.com/bytom/bytom/p2p/connection"
20         "github.com/bytom/bytom/p2p/discover/dht"
21         "github.com/bytom/bytom/p2p/discover/mdns"
22         "github.com/bytom/bytom/p2p/netutil"
23         "github.com/bytom/bytom/p2p/security"
24         "github.com/bytom/bytom/version"
25 )
26
27 const (
28         logModule = "p2p"
29
30         minNumOutboundPeers = 4
31         maxNumLANPeers      = 5
32 )
33
34 //pre-define errors for connecting fail
35 var (
36         ErrDuplicatePeer  = errors.New("Duplicate peer")
37         ErrConnectSelf    = errors.New("Connect self")
38         ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
39 )
40
41 type discv interface {
42         ReadRandomNodes(buf []*dht.Node) (n int)
43 }
44
45 type lanDiscv interface {
46         Subscribe() (*event.Subscription, error)
47         Stop()
48 }
49
50 type Security interface {
51         DoFilter(ip string, pubKey string) error
52         IsBanned(ip string, level byte, reason string) bool
53         RegisterFilter(filter security.Filter)
54         Start() error
55 }
56
57 // Switch handles peer connections and exposes an API to receive incoming messages
58 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
59 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
60 // incoming messages are received on the reactor.
61 type Switch struct {
62         cmn.BaseService
63
64         Config       *cfg.Config
65         peerConfig   *PeerConfig
66         listeners    []Listener
67         reactors     map[string]Reactor
68         chDescs      []*connection.ChannelDescriptor
69         reactorsByCh map[byte]Reactor
70         peers        *PeerSet
71         dialing      *cmn.CMap
72         nodeInfo     *NodeInfo             // our node info
73         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
74         discv        discv
75         lanDiscv     lanDiscv
76         security     Security
77 }
78
79 // NewSwitch create a new Switch and set discover.
80 func NewSwitch(config *cfg.Config) (*Switch, error) {
81         var err error
82         var l Listener
83         var listenAddr string
84         var discv *dht.Network
85         var lanDiscv *mdns.LANDiscover
86
87         config.P2P.PrivateKey, err = config.NodeKey()
88         if err != nil {
89                 return nil, err
90         }
91
92         bytes, err := hex.DecodeString(config.P2P.PrivateKey)
93         if err != nil {
94                 return nil, err
95         }
96
97         var newKey [64]byte
98         copy(newKey[:], bytes)
99         privKey := crypto.PrivKeyEd25519(newKey)
100         if !config.VaultMode {
101                 // Create listener
102                 l, listenAddr = GetListener(config.P2P)
103                 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
104                 if err != nil {
105                         return nil, err
106                 }
107                 if config.P2P.LANDiscover {
108                         lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port))
109                 }
110         }
111
112         return newSwitch(config, discv, lanDiscv, l, privKey, listenAddr)
113 }
114
115 // newSwitch creates a new Switch with the given config.
116 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
117         sw := &Switch{
118                 Config:       config,
119                 peerConfig:   DefaultPeerConfig(config.P2P),
120                 reactors:     make(map[string]Reactor),
121                 chDescs:      make([]*connection.ChannelDescriptor, 0),
122                 reactorsByCh: make(map[byte]Reactor),
123                 peers:        NewPeerSet(),
124                 dialing:      cmn.NewCMap(),
125                 nodePrivKey:  priv,
126                 discv:        discv,
127                 lanDiscv:     lanDiscv,
128                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
129                 security:     security.NewSecurity(config),
130         }
131
132         sw.AddListener(l)
133         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
134         return sw, nil
135 }
136
137 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
138 func (sw *Switch) OnStart() error {
139         for _, reactor := range sw.reactors {
140                 if _, err := reactor.Start(); err != nil {
141                         return err
142                 }
143         }
144
145         sw.security.RegisterFilter(sw.nodeInfo)
146         sw.security.RegisterFilter(sw.peers)
147         if err := sw.security.Start(); err != nil {
148                 return err
149         }
150
151         for _, listener := range sw.listeners {
152                 go sw.listenerRoutine(listener)
153         }
154         go sw.ensureOutboundPeersRoutine()
155         go sw.connectLANPeersRoutine()
156
157         return nil
158 }
159
160 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
161 func (sw *Switch) OnStop() {
162         if sw.Config.P2P.LANDiscover {
163                 sw.lanDiscv.Stop()
164         }
165
166         for _, listener := range sw.listeners {
167                 listener.Stop()
168         }
169         sw.listeners = nil
170
171         for _, peer := range sw.peers.List() {
172                 peer.Stop()
173                 sw.peers.Remove(peer)
174         }
175
176         for _, reactor := range sw.reactors {
177                 reactor.Stop()
178         }
179 }
180
181 // AddPeer performs the P2P handshake with a peer
182 // that already has a SecretConnection. If all goes well,
183 // it starts the peer and adds it to the switch.
184 // NOTE: This performs a blocking handshake before the peer is added.
185 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
186 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
187         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
188         if err != nil {
189                 return err
190         }
191
192         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
193                 return err
194         }
195         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
196                 return err
197         }
198
199         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
200         if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey().String()); err != nil {
201                 return err
202         }
203
204         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
205                 return ErrConnectSpvPeer
206         }
207
208         // Start peer
209         if sw.IsRunning() {
210                 if err := sw.startInitPeer(peer); err != nil {
211                         return err
212                 }
213         }
214
215         return sw.peers.Add(peer)
216 }
217
218 // AddReactor adds the given reactor to the switch.
219 // NOTE: Not goroutine safe.
220 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
221         // Validate the reactor.
222         // No two reactors can share the same channel.
223         for _, chDesc := range reactor.GetChannels() {
224                 chID := chDesc.ID
225                 if sw.reactorsByCh[chID] != nil {
226                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
227                 }
228                 sw.chDescs = append(sw.chDescs, chDesc)
229                 sw.reactorsByCh[chID] = reactor
230         }
231         sw.reactors[name] = reactor
232         reactor.SetSwitch(sw)
233         return reactor
234 }
235
236 // AddListener adds the given listener to the switch for listening to incoming peer connections.
237 // NOTE: Not goroutine safe.
238 func (sw *Switch) AddListener(l Listener) {
239         sw.listeners = append(sw.listeners, l)
240 }
241
242 //DialPeerWithAddress dial node from net address
243 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
244         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
245         sw.dialing.Set(addr.IP.String(), addr)
246         defer sw.dialing.Delete(addr.IP.String())
247         if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
248                 return err
249         }
250
251         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
252         if err != nil {
253                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on newOutboundPeerConn")
254                 return err
255         }
256
257         if err = sw.AddPeer(pc, addr.isLAN); err != nil {
258                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on switch AddPeer")
259                 pc.CloseConn()
260                 return err
261         }
262         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
263         return nil
264 }
265
266 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
267         return sw.security.IsBanned(ip, level, reason)
268 }
269
270 //IsDialing prevent duplicate dialing
271 func (sw *Switch) IsDialing(addr *NetAddress) bool {
272         return sw.dialing.Has(addr.IP.String())
273 }
274
275 // IsListening returns true if the switch has at least one listener.
276 // NOTE: Not goroutine safe.
277 func (sw *Switch) IsListening() bool {
278         return len(sw.listeners) > 0
279 }
280
281 // Listeners returns the list of listeners the switch listens on.
282 // NOTE: Not goroutine safe.
283 func (sw *Switch) Listeners() []Listener {
284         return sw.listeners
285 }
286
287 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
288 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
289         peers := sw.peers.List()
290         for _, peer := range peers {
291                 if peer.outbound && !peer.isLAN {
292                         outbound++
293                 } else {
294                         inbound++
295                 }
296                 if peer.isLAN {
297                         lan++
298                 }
299         }
300         dialing = sw.dialing.Size()
301         return
302 }
303
304 // NodeInfo returns the switch's NodeInfo.
305 // NOTE: Not goroutine safe.
306 func (sw *Switch) NodeInfo() *NodeInfo {
307         return sw.nodeInfo
308 }
309
310 //Peers return switch peerset
311 func (sw *Switch) Peers() *PeerSet {
312         return sw.peers
313 }
314
315 // StopPeerForError disconnects from a peer due to external error.
316 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
317         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
318         sw.stopAndRemovePeer(peer, reason)
319 }
320
321 // StopPeerGracefully disconnect from a peer gracefully.
322 func (sw *Switch) StopPeerGracefully(peerID string) {
323         if peer := sw.peers.Get(peerID); peer != nil {
324                 sw.stopAndRemovePeer(peer, nil)
325         }
326 }
327
328 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
329         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
330         if err != nil {
331                 if err := conn.Close(); err != nil {
332                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
333                 }
334                 return err
335         }
336
337         if err = sw.AddPeer(peerConn, false); err != nil {
338                 if err := conn.Close(); err != nil {
339                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
340                 }
341                 return err
342         }
343
344         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
345         return nil
346 }
347
348 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
349         lanPeers, _, _, numDialing := sw.NumPeers()
350         numToDial := maxNumLANPeers - lanPeers
351         log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
352         if numToDial <= 0 {
353                 return
354         }
355         addresses := make([]*NetAddress, 0)
356         for i := 0; i < len(lanPeer.IP); i++ {
357                 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
358         }
359         sw.dialPeers(addresses)
360 }
361
362 func (sw *Switch) connectLANPeersRoutine() {
363         if !sw.Config.P2P.LANDiscover {
364                 return
365         }
366
367         lanPeerEventSub, err := sw.lanDiscv.Subscribe()
368         if err != nil {
369                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
370                 return
371         }
372
373         for {
374                 select {
375                 case obj, ok := <-lanPeerEventSub.Chan():
376                         if !ok {
377                                 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
378                                 return
379                         }
380                         LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
381                         if !ok {
382                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
383                                 continue
384                         }
385                         sw.connectLANPeers(LANPeer)
386                 case <-sw.Quit:
387                         return
388                 }
389         }
390 }
391
392 func (sw *Switch) listenerRoutine(l Listener) {
393         for {
394                 inConn, ok := <-l.Connections()
395                 if !ok {
396                         break
397                 }
398
399                 // disconnect if we alrady have MaxNumPeers
400                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
401                         if err := inConn.Close(); err != nil {
402                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
403                         }
404                         log.Info("Ignoring inbound connection: already have enough peers.")
405                         continue
406                 }
407
408                 // New inbound connection!
409                 if err := sw.addPeerWithConnection(inConn); err != nil {
410                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
411                         continue
412                 }
413         }
414 }
415
416 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
417         if err := sw.DialPeerWithAddress(a); err != nil {
418                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Warn("dialPeerWorker fail on dial peer")
419         }
420         wg.Done()
421 }
422
423 func (sw *Switch) dialPeers(addresses []*NetAddress) {
424         connectedPeers := make(map[string]struct{})
425         for _, peer := range sw.Peers().List() {
426                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
427         }
428
429         var wg sync.WaitGroup
430         for _, address := range addresses {
431                 if sw.NodeInfo().ListenAddr == address.String() {
432                         continue
433                 }
434                 if dialling := sw.IsDialing(address); dialling {
435                         continue
436                 }
437                 if _, ok := connectedPeers[address.IP.String()]; ok {
438                         continue
439                 }
440
441                 wg.Add(1)
442                 go sw.dialPeerWorker(address, &wg)
443         }
444         wg.Wait()
445 }
446
447 func (sw *Switch) ensureKeepConnectPeers() {
448         keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
449         addresses := make([]*NetAddress, 0)
450         for _, keepDial := range keepDials {
451                 address, err := NewNetAddressString(keepDial)
452                 if err != nil {
453                         log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
454                         continue
455                 }
456                 addresses = append(addresses, address)
457         }
458
459         sw.dialPeers(addresses)
460 }
461
462 func (sw *Switch) ensureOutboundPeers() {
463         lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
464         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
465         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
466         if numToDial <= 0 {
467                 return
468         }
469
470         nodes := make([]*dht.Node, numToDial)
471         n := sw.discv.ReadRandomNodes(nodes)
472         addresses := make([]*NetAddress, 0)
473         for i := 0; i < n; i++ {
474                 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
475                 addresses = append(addresses, address)
476         }
477         sw.dialPeers(addresses)
478 }
479
480 func (sw *Switch) ensureOutboundPeersRoutine() {
481         sw.ensureKeepConnectPeers()
482         sw.ensureOutboundPeers()
483
484         ticker := time.NewTicker(10 * time.Second)
485         defer ticker.Stop()
486
487         for {
488                 select {
489                 case <-ticker.C:
490                         sw.ensureKeepConnectPeers()
491                         sw.ensureOutboundPeers()
492                 case <-sw.Quit:
493                         return
494                 }
495         }
496 }
497
498 func (sw *Switch) startInitPeer(peer *Peer) error {
499         // spawn send/recv routines
500         if _, err := peer.Start(); err != nil {
501                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
502         }
503
504         for _, reactor := range sw.reactors {
505                 if err := reactor.AddPeer(peer); err != nil {
506                         return err
507                 }
508         }
509         return nil
510 }
511
512 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
513         sw.peers.Remove(peer)
514         for _, reactor := range sw.reactors {
515                 reactor.RemovePeer(peer, reason)
516         }
517         peer.Stop()
518
519         sentStatus, receivedStatus := peer.TrafficStatus()
520         log.WithFields(log.Fields{
521                 "module":                logModule,
522                 "address":               peer.Addr().String(),
523                 "reason":                reason,
524                 "duration":              sentStatus.Duration.String(),
525                 "total_sent":            sentStatus.Bytes,
526                 "total_received":        receivedStatus.Bytes,
527                 "average_sent_rate":     sentStatus.AvgRate,
528                 "average_received_rate": receivedStatus.AvgRate,
529                 "peer num":              sw.peers.Size(),
530         }).Info("disconnect with peer")
531 }