OSDN Git Service

new repo
[bytom/vapor.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13         dbm "github.com/tendermint/tmlibs/db"
14
15         cfg "github.com/vapor/config"
16         "github.com/vapor/consensus"
17         "github.com/vapor/errors"
18         "github.com/vapor/p2p/connection"
19         "github.com/vapor/p2p/discover"
20         "github.com/vapor/p2p/trust"
21         "github.com/vapor/version"
22 )
23
24 const (
25         bannedPeerKey       = "BannedPeer"
26         defaultBanDuration  = time.Hour * 1
27         minNumOutboundPeers = 3
28 )
29
30 //pre-define errors for connecting fail
31 var (
32         ErrDuplicatePeer     = errors.New("Duplicate peer")
33         ErrConnectSelf       = errors.New("Connect self")
34         ErrConnectBannedPeer = errors.New("Connect banned peer")
35         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
36 )
37
38 // Switch handles peer connections and exposes an API to receive incoming messages
39 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
40 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
41 // incoming messages are received on the reactor.
42 type Switch struct {
43         cmn.BaseService
44
45         Config       *cfg.Config
46         peerConfig   *PeerConfig
47         listeners    []Listener
48         reactors     map[string]Reactor
49         chDescs      []*connection.ChannelDescriptor
50         reactorsByCh map[byte]Reactor
51         peers        *PeerSet
52         dialing      *cmn.CMap
53         nodeInfo     *NodeInfo             // our node info
54         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
55         discv        *discover.Network
56         bannedPeer   map[string]time.Time
57         db           dbm.DB
58         mtx          sync.Mutex
59 }
60
61 // NewSwitch creates a new Switch with the given config.
62 func NewSwitch(config *cfg.Config) *Switch {
63         sw := &Switch{
64                 Config:       config,
65                 peerConfig:   DefaultPeerConfig(config.P2P),
66                 reactors:     make(map[string]Reactor),
67                 chDescs:      make([]*connection.ChannelDescriptor, 0),
68                 reactorsByCh: make(map[byte]Reactor),
69                 peers:        NewPeerSet(),
70                 dialing:      cmn.NewCMap(),
71                 nodeInfo:     nil,
72                 db:           dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()),
73         }
74         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
75         sw.bannedPeer = make(map[string]time.Time)
76         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
77                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
78                         return nil
79                 }
80         }
81         trust.Init()
82         return sw
83 }
84
85 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
86 func (sw *Switch) OnStart() error {
87         for _, reactor := range sw.reactors {
88                 if _, err := reactor.Start(); err != nil {
89                         return err
90                 }
91         }
92         for _, listener := range sw.listeners {
93                 go sw.listenerRoutine(listener)
94         }
95         go sw.ensureOutboundPeersRoutine()
96         return nil
97 }
98
99 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
100 func (sw *Switch) OnStop() {
101         for _, listener := range sw.listeners {
102                 listener.Stop()
103         }
104         sw.listeners = nil
105
106         for _, peer := range sw.peers.List() {
107                 peer.Stop()
108                 sw.peers.Remove(peer)
109         }
110
111         for _, reactor := range sw.reactors {
112                 reactor.Stop()
113         }
114 }
115
116 //AddBannedPeer add peer to blacklist
117 func (sw *Switch) AddBannedPeer(ip string) error {
118         sw.mtx.Lock()
119         defer sw.mtx.Unlock()
120
121         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
122         datajson, err := json.Marshal(sw.bannedPeer)
123         if err != nil {
124                 return err
125         }
126
127         sw.db.Set([]byte(bannedPeerKey), datajson)
128         return nil
129 }
130
131 // AddPeer performs the P2P handshake with a peer
132 // that already has a SecretConnection. If all goes well,
133 // it starts the peer and adds it to the switch.
134 // NOTE: This performs a blocking handshake before the peer is added.
135 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
136 func (sw *Switch) AddPeer(pc *peerConn) error {
137         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout))
138         if err != nil {
139                 return err
140         }
141
142         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
143                 return err
144         }
145         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
146                 return err
147         }
148
149         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
150         if err := sw.filterConnByPeer(peer); err != nil {
151                 return err
152         }
153
154         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
155                 return ErrConnectSpvPeer
156         }
157
158         // Start peer
159         if sw.IsRunning() {
160                 if err := sw.startInitPeer(peer); err != nil {
161                         return err
162                 }
163         }
164         return sw.peers.Add(peer)
165 }
166
167 // AddReactor adds the given reactor to the switch.
168 // NOTE: Not goroutine safe.
169 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
170         // Validate the reactor.
171         // No two reactors can share the same channel.
172         for _, chDesc := range reactor.GetChannels() {
173                 chID := chDesc.ID
174                 if sw.reactorsByCh[chID] != nil {
175                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
176                 }
177                 sw.chDescs = append(sw.chDescs, chDesc)
178                 sw.reactorsByCh[chID] = reactor
179         }
180         sw.reactors[name] = reactor
181         reactor.SetSwitch(sw)
182         return reactor
183 }
184
185 // AddListener adds the given listener to the switch for listening to incoming peer connections.
186 // NOTE: Not goroutine safe.
187 func (sw *Switch) AddListener(l Listener) {
188         sw.listeners = append(sw.listeners, l)
189 }
190
191 //DialPeerWithAddress dial node from net address
192 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
193         log.Debug("Dialing peer address:", addr)
194         sw.dialing.Set(addr.IP.String(), addr)
195         defer sw.dialing.Delete(addr.IP.String())
196         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
197                 return err
198         }
199
200         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
201         if err != nil {
202                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on newOutboundPeerConn")
203                 return err
204         }
205
206         if err = sw.AddPeer(pc); err != nil {
207                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on switch AddPeer")
208                 pc.CloseConn()
209                 return err
210         }
211         log.Debug("DialPeer added peer:", addr)
212         return nil
213 }
214
215 //IsDialing prevent duplicate dialing
216 func (sw *Switch) IsDialing(addr *NetAddress) bool {
217         return sw.dialing.Has(addr.IP.String())
218 }
219
220 // IsListening returns true if the switch has at least one listener.
221 // NOTE: Not goroutine safe.
222 func (sw *Switch) IsListening() bool {
223         return len(sw.listeners) > 0
224 }
225
226 // Listeners returns the list of listeners the switch listens on.
227 // NOTE: Not goroutine safe.
228 func (sw *Switch) Listeners() []Listener {
229         return sw.listeners
230 }
231
232 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
233 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
234         peers := sw.peers.List()
235         for _, peer := range peers {
236                 if peer.outbound {
237                         outbound++
238                 } else {
239                         inbound++
240                 }
241         }
242         dialing = sw.dialing.Size()
243         return
244 }
245
246 // NodeInfo returns the switch's NodeInfo.
247 // NOTE: Not goroutine safe.
248 func (sw *Switch) NodeInfo() *NodeInfo {
249         return sw.nodeInfo
250 }
251
252 //Peers return switch peerset
253 func (sw *Switch) Peers() *PeerSet {
254         return sw.peers
255 }
256
257 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
258 // NOTE: Not goroutine safe.
259 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
260         sw.nodeInfo = nodeInfo
261 }
262
263 // SetNodePrivKey sets the switch's private key for authenticated encryption.
264 // NOTE: Not goroutine safe.
265 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
266         sw.nodePrivKey = nodePrivKey
267         if sw.nodeInfo != nil {
268                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
269         }
270 }
271
272 // StopPeerForError disconnects from a peer due to external error.
273 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
274         log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
275         sw.stopAndRemovePeer(peer, reason)
276 }
277
278 // StopPeerGracefully disconnect from a peer gracefully.
279 func (sw *Switch) StopPeerGracefully(peerID string) {
280         if peer := sw.peers.Get(peerID); peer != nil {
281                 sw.stopAndRemovePeer(peer, nil)
282         }
283 }
284
285 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
286         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
287         if err != nil {
288                 conn.Close()
289                 return err
290         }
291
292         if err = sw.AddPeer(peerConn); err != nil {
293                 conn.Close()
294                 return err
295         }
296         return nil
297 }
298
299 func (sw *Switch) checkBannedPeer(peer string) error {
300         sw.mtx.Lock()
301         defer sw.mtx.Unlock()
302
303         if banEnd, ok := sw.bannedPeer[peer]; ok {
304                 if time.Now().Before(banEnd) {
305                         return ErrConnectBannedPeer
306                 }
307                 sw.delBannedPeer(peer)
308         }
309         return nil
310 }
311
312 func (sw *Switch) delBannedPeer(addr string) error {
313         sw.mtx.Lock()
314         defer sw.mtx.Unlock()
315
316         delete(sw.bannedPeer, addr)
317         datajson, err := json.Marshal(sw.bannedPeer)
318         if err != nil {
319                 return err
320         }
321
322         sw.db.Set([]byte(bannedPeerKey), datajson)
323         return nil
324 }
325
326 func (sw *Switch) filterConnByIP(ip string) error {
327         if ip == sw.nodeInfo.ListenHost() {
328                 return ErrConnectSelf
329         }
330         return sw.checkBannedPeer(ip)
331 }
332
333 func (sw *Switch) filterConnByPeer(peer *Peer) error {
334         if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
335                 return err
336         }
337
338         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
339                 return ErrConnectSelf
340         }
341
342         if sw.peers.Has(peer.Key) {
343                 return ErrDuplicatePeer
344         }
345         return nil
346 }
347
348 func (sw *Switch) listenerRoutine(l Listener) {
349         for {
350                 inConn, ok := <-l.Connections()
351                 if !ok {
352                         break
353                 }
354
355                 // disconnect if we alrady have MaxNumPeers
356                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
357                         inConn.Close()
358                         log.Info("Ignoring inbound connection: already have enough peers.")
359                         continue
360                 }
361
362                 // New inbound connection!
363                 if err := sw.addPeerWithConnection(inConn); err != nil {
364                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
365                         continue
366                 }
367         }
368 }
369
370 // SetDiscv connect the discv model to the switch
371 func (sw *Switch) SetDiscv(discv *discover.Network) {
372         sw.discv = discv
373 }
374
375 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
376         if err := sw.DialPeerWithAddress(a); err != nil {
377                 log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
378         }
379         wg.Done()
380 }
381
382 func (sw *Switch) ensureOutboundPeers() {
383         numOutPeers, _, numDialing := sw.NumPeers()
384         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
385         log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
386         if numToDial <= 0 {
387                 return
388         }
389
390         connectedPeers := make(map[string]struct{})
391         for _, peer := range sw.Peers().List() {
392                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
393         }
394
395         var wg sync.WaitGroup
396         nodes := make([]*discover.Node, numToDial)
397         n := sw.discv.ReadRandomNodes(nodes)
398         for i := 0; i < n; i++ {
399                 try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
400                 if sw.NodeInfo().ListenAddr == try.String() {
401                         continue
402                 }
403                 if dialling := sw.IsDialing(try); dialling {
404                         continue
405                 }
406                 if _, ok := connectedPeers[try.IP.String()]; ok {
407                         continue
408                 }
409
410                 wg.Add(1)
411                 go sw.dialPeerWorker(try, &wg)
412         }
413         wg.Wait()
414 }
415
416 func (sw *Switch) ensureOutboundPeersRoutine() {
417         sw.ensureOutboundPeers()
418
419         ticker := time.NewTicker(10 * time.Second)
420         defer ticker.Stop()
421
422         for {
423                 select {
424                 case <-ticker.C:
425                         sw.ensureOutboundPeers()
426                 case <-sw.Quit:
427                         return
428                 }
429         }
430 }
431
432 func (sw *Switch) startInitPeer(peer *Peer) error {
433         peer.Start() // spawn send/recv routines
434         for _, reactor := range sw.reactors {
435                 if err := reactor.AddPeer(peer); err != nil {
436                         return err
437                 }
438         }
439         return nil
440 }
441
442 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
443         sw.peers.Remove(peer)
444         for _, reactor := range sw.reactors {
445                 reactor.RemovePeer(peer, reason)
446         }
447         peer.Stop()
448
449         sentStatus, receivedStatus := peer.TrafficStatus()
450         log.WithFields(log.Fields{
451                 "address":               peer.Addr().String(),
452                 "reason":                reason,
453                 "duration":              sentStatus.Duration.String(),
454                 "total_sent":            sentStatus.Bytes,
455                 "total_received":        receivedStatus.Bytes,
456                 "average_sent_rate":     sentStatus.AvgRate,
457                 "average_received_rate": receivedStatus.AvgRate,
458         }).Info("disconnect with peer")
459 }