OSDN Git Service

Validate tx unit test (#1587)
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/hex"
5         "encoding/json"
6         "fmt"
7         "net"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14         dbm "github.com/tendermint/tmlibs/db"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/consensus"
18         "github.com/bytom/crypto/ed25519"
19         "github.com/bytom/errors"
20         "github.com/bytom/p2p/connection"
21         "github.com/bytom/p2p/discover"
22         "github.com/bytom/p2p/trust"
23         "github.com/bytom/version"
24 )
25
26 const (
27         bannedPeerKey       = "BannedPeer"
28         defaultBanDuration  = time.Hour * 1
29         minNumOutboundPeers = 3
30 )
31
32 //pre-define errors for connecting fail
33 var (
34         ErrDuplicatePeer     = errors.New("Duplicate peer")
35         ErrConnectSelf       = errors.New("Connect self")
36         ErrConnectBannedPeer = errors.New("Connect banned peer")
37         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
38 )
39
40 type discv interface {
41         ReadRandomNodes(buf []*discover.Node) (n int)
42 }
43
44 // Switch handles peer connections and exposes an API to receive incoming messages
45 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
46 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
47 // incoming messages are received on the reactor.
48 type Switch struct {
49         cmn.BaseService
50
51         Config       *cfg.Config
52         peerConfig   *PeerConfig
53         listeners    []Listener
54         reactors     map[string]Reactor
55         chDescs      []*connection.ChannelDescriptor
56         reactorsByCh map[byte]Reactor
57         peers        *PeerSet
58         dialing      *cmn.CMap
59         nodeInfo     *NodeInfo             // our node info
60         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
61         discv        discv
62         bannedPeer   map[string]time.Time
63         db           dbm.DB
64         mtx          sync.Mutex
65 }
66
67 // NewSwitch create a new Switch and set discover.
68 func NewSwitch(config *cfg.Config) (*Switch, error) {
69         var err error
70         var l Listener
71         var listenAddr string
72         var discv *discover.Network
73
74         blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
75         config.P2P.PrivateKey, err = config.NodeKey()
76         if err != nil {
77                 return nil, err
78         }
79
80         bytes, err := hex.DecodeString(config.P2P.PrivateKey)
81         if err != nil {
82                 return nil, err
83         }
84
85         var newKey [64]byte
86         copy(newKey[:], bytes)
87         privKey := crypto.PrivKeyEd25519(newKey)
88         if !config.VaultMode {
89                 // Create listener
90                 l, listenAddr = GetListener(config.P2P)
91                 discv, err = discover.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
92                 if err != nil {
93                         return nil, err
94                 }
95         }
96
97         return newSwitch(config, discv, blacklistDB, l, privKey, listenAddr)
98 }
99
100 // newSwitch creates a new Switch with the given config.
101 func newSwitch(config *cfg.Config, discv discv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
102         sw := &Switch{
103                 Config:       config,
104                 peerConfig:   DefaultPeerConfig(config.P2P),
105                 reactors:     make(map[string]Reactor),
106                 chDescs:      make([]*connection.ChannelDescriptor, 0),
107                 reactorsByCh: make(map[byte]Reactor),
108                 peers:        NewPeerSet(),
109                 dialing:      cmn.NewCMap(),
110                 nodePrivKey:  priv,
111                 discv:        discv,
112                 db:           blacklistDB,
113                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
114                 bannedPeer:   make(map[string]time.Time),
115         }
116         if err := sw.loadBannedPeers(); err != nil {
117                 return nil, err
118         }
119
120         sw.AddListener(l)
121         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
122         trust.Init()
123         return sw, nil
124 }
125
126 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
127 func (sw *Switch) OnStart() error {
128         for _, reactor := range sw.reactors {
129                 if _, err := reactor.Start(); err != nil {
130                         return err
131                 }
132         }
133         for _, listener := range sw.listeners {
134                 go sw.listenerRoutine(listener)
135         }
136         go sw.ensureOutboundPeersRoutine()
137         return nil
138 }
139
140 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
141 func (sw *Switch) OnStop() {
142         for _, listener := range sw.listeners {
143                 listener.Stop()
144         }
145         sw.listeners = nil
146
147         for _, peer := range sw.peers.List() {
148                 peer.Stop()
149                 sw.peers.Remove(peer)
150         }
151
152         for _, reactor := range sw.reactors {
153                 reactor.Stop()
154         }
155 }
156
157 //AddBannedPeer add peer to blacklist
158 func (sw *Switch) AddBannedPeer(ip string) error {
159         sw.mtx.Lock()
160         defer sw.mtx.Unlock()
161
162         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
163         dataJSON, err := json.Marshal(sw.bannedPeer)
164         if err != nil {
165                 return err
166         }
167
168         sw.db.Set([]byte(bannedPeerKey), dataJSON)
169         return nil
170 }
171
172 // AddPeer performs the P2P handshake with a peer
173 // that already has a SecretConnection. If all goes well,
174 // it starts the peer and adds it to the switch.
175 // NOTE: This performs a blocking handshake before the peer is added.
176 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
177 func (sw *Switch) AddPeer(pc *peerConn) error {
178         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
179         if err != nil {
180                 return err
181         }
182
183         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
184                 return err
185         }
186         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
187                 return err
188         }
189
190         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
191         if err := sw.filterConnByPeer(peer); err != nil {
192                 return err
193         }
194
195         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
196                 return ErrConnectSpvPeer
197         }
198
199         // Start peer
200         if sw.IsRunning() {
201                 if err := sw.startInitPeer(peer); err != nil {
202                         return err
203                 }
204         }
205
206         return sw.peers.Add(peer)
207 }
208
209 // AddReactor adds the given reactor to the switch.
210 // NOTE: Not goroutine safe.
211 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
212         // Validate the reactor.
213         // No two reactors can share the same channel.
214         for _, chDesc := range reactor.GetChannels() {
215                 chID := chDesc.ID
216                 if sw.reactorsByCh[chID] != nil {
217                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
218                 }
219                 sw.chDescs = append(sw.chDescs, chDesc)
220                 sw.reactorsByCh[chID] = reactor
221         }
222         sw.reactors[name] = reactor
223         reactor.SetSwitch(sw)
224         return reactor
225 }
226
227 // AddListener adds the given listener to the switch for listening to incoming peer connections.
228 // NOTE: Not goroutine safe.
229 func (sw *Switch) AddListener(l Listener) {
230         sw.listeners = append(sw.listeners, l)
231 }
232
233 //DialPeerWithAddress dial node from net address
234 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
235         log.Debug("Dialing peer address:", addr)
236         sw.dialing.Set(addr.IP.String(), addr)
237         defer sw.dialing.Delete(addr.IP.String())
238         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
239                 return err
240         }
241
242         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
243         if err != nil {
244                 log.WithFields(log.Fields{"address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
245                 return err
246         }
247
248         if err = sw.AddPeer(pc); err != nil {
249                 log.WithFields(log.Fields{"address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
250                 pc.CloseConn()
251                 return err
252         }
253         log.Debug("DialPeer added peer:", addr)
254         return nil
255 }
256
257 //IsDialing prevent duplicate dialing
258 func (sw *Switch) IsDialing(addr *NetAddress) bool {
259         return sw.dialing.Has(addr.IP.String())
260 }
261
262 // IsListening returns true if the switch has at least one listener.
263 // NOTE: Not goroutine safe.
264 func (sw *Switch) IsListening() bool {
265         return len(sw.listeners) > 0
266 }
267
268 // loadBannedPeers load banned peers from db
269 func (sw *Switch) loadBannedPeers() error {
270         if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
271                 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
272                         return err
273                 }
274         }
275
276         return nil
277 }
278
279 // Listeners returns the list of listeners the switch listens on.
280 // NOTE: Not goroutine safe.
281 func (sw *Switch) Listeners() []Listener {
282         return sw.listeners
283 }
284
285 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
286 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
287         peers := sw.peers.List()
288         for _, peer := range peers {
289                 if peer.outbound {
290                         outbound++
291                 } else {
292                         inbound++
293                 }
294         }
295         dialing = sw.dialing.Size()
296         return
297 }
298
299 // NodeInfo returns the switch's NodeInfo.
300 // NOTE: Not goroutine safe.
301 func (sw *Switch) NodeInfo() *NodeInfo {
302         return sw.nodeInfo
303 }
304
305 //Peers return switch peerset
306 func (sw *Switch) Peers() *PeerSet {
307         return sw.peers
308 }
309
310 // StopPeerForError disconnects from a peer due to external error.
311 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
312         log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
313         sw.stopAndRemovePeer(peer, reason)
314 }
315
316 // StopPeerGracefully disconnect from a peer gracefully.
317 func (sw *Switch) StopPeerGracefully(peerID string) {
318         if peer := sw.peers.Get(peerID); peer != nil {
319                 sw.stopAndRemovePeer(peer, nil)
320         }
321 }
322
323 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
324         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
325         if err != nil {
326                 if err := conn.Close(); err != nil {
327                         log.WithFields(log.Fields{"remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
328                 }
329                 return err
330         }
331
332         if err = sw.AddPeer(peerConn); err != nil {
333                 if err := conn.Close(); err != nil {
334                         log.WithFields(log.Fields{"remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
335                 }
336                 return err
337         }
338
339         return nil
340 }
341
342 func (sw *Switch) checkBannedPeer(peer string) error {
343         sw.mtx.Lock()
344         defer sw.mtx.Unlock()
345
346         if banEnd, ok := sw.bannedPeer[peer]; ok {
347                 if time.Now().Before(banEnd) {
348                         return ErrConnectBannedPeer
349                 }
350
351                 if err := sw.delBannedPeer(peer); err != nil {
352                         return err
353                 }
354         }
355         return nil
356 }
357
358 func (sw *Switch) delBannedPeer(addr string) error {
359         sw.mtx.Lock()
360         defer sw.mtx.Unlock()
361
362         delete(sw.bannedPeer, addr)
363         datajson, err := json.Marshal(sw.bannedPeer)
364         if err != nil {
365                 return err
366         }
367
368         sw.db.Set([]byte(bannedPeerKey), datajson)
369         return nil
370 }
371
372 func (sw *Switch) filterConnByIP(ip string) error {
373         if ip == sw.nodeInfo.listenHost() {
374                 return ErrConnectSelf
375         }
376         return sw.checkBannedPeer(ip)
377 }
378
379 func (sw *Switch) filterConnByPeer(peer *Peer) error {
380         if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
381                 return err
382         }
383
384         if sw.nodeInfo.getPubkey().Equals(peer.PubKey().Wrap()) {
385                 return ErrConnectSelf
386         }
387
388         if sw.peers.Has(peer.Key) {
389                 return ErrDuplicatePeer
390         }
391         return nil
392 }
393
394 func (sw *Switch) listenerRoutine(l Listener) {
395         for {
396                 inConn, ok := <-l.Connections()
397                 if !ok {
398                         break
399                 }
400
401                 // disconnect if we alrady have MaxNumPeers
402                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
403                         if err := inConn.Close(); err != nil {
404                                 log.WithFields(log.Fields{"remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
405                         }
406                         log.Info("Ignoring inbound connection: already have enough peers.")
407                         continue
408                 }
409
410                 // New inbound connection!
411                 if err := sw.addPeerWithConnection(inConn); err != nil {
412                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
413                         continue
414                 }
415         }
416 }
417
418 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
419         if err := sw.DialPeerWithAddress(a); err != nil {
420                 log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
421         }
422         wg.Done()
423 }
424
425 func (sw *Switch) ensureOutboundPeers() {
426         numOutPeers, _, numDialing := sw.NumPeers()
427         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
428         log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
429         if numToDial <= 0 {
430                 return
431         }
432
433         connectedPeers := make(map[string]struct{})
434         for _, peer := range sw.Peers().List() {
435                 connectedPeers[peer.remoteAddrHost()] = struct{}{}
436         }
437
438         var wg sync.WaitGroup
439         nodes := make([]*discover.Node, numToDial)
440         n := sw.discv.ReadRandomNodes(nodes)
441         for i := 0; i < n; i++ {
442                 try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
443                 if sw.NodeInfo().ListenAddr == try.String() {
444                         continue
445                 }
446                 if dialling := sw.IsDialing(try); dialling {
447                         continue
448                 }
449                 if _, ok := connectedPeers[try.IP.String()]; ok {
450                         continue
451                 }
452
453                 wg.Add(1)
454                 go sw.dialPeerWorker(try, &wg)
455         }
456         wg.Wait()
457 }
458
459 func (sw *Switch) ensureOutboundPeersRoutine() {
460         sw.ensureOutboundPeers()
461
462         ticker := time.NewTicker(10 * time.Second)
463         defer ticker.Stop()
464
465         for {
466                 select {
467                 case <-ticker.C:
468                         sw.ensureOutboundPeers()
469                 case <-sw.Quit:
470                         return
471                 }
472         }
473 }
474
475 func (sw *Switch) startInitPeer(peer *Peer) error {
476         // spawn send/recv routines
477         if _, err := peer.Start(); err != nil {
478                 log.WithFields(log.Fields{"remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
479         }
480
481         for _, reactor := range sw.reactors {
482                 if err := reactor.AddPeer(peer); err != nil {
483                         return err
484                 }
485         }
486         return nil
487 }
488
489 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
490         sw.peers.Remove(peer)
491         for _, reactor := range sw.reactors {
492                 reactor.RemovePeer(peer, reason)
493         }
494         peer.Stop()
495
496         sentStatus, receivedStatus := peer.TrafficStatus()
497         log.WithFields(log.Fields{
498                 "address":               peer.Addr().String(),
499                 "reason":                reason,
500                 "duration":              sentStatus.Duration.String(),
501                 "total_sent":            sentStatus.Bytes,
502                 "total_received":        receivedStatus.Bytes,
503                 "average_sent_rate":     sentStatus.AvgRate,
504                 "average_received_rate": receivedStatus.AvgRate,
505         }).Info("disconnect with peer")
506 }