OSDN Git Service

Remove p2p peer exchange module (#1196)
[bytom/bytom-spv.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13         dbm "github.com/tendermint/tmlibs/db"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/errors"
17         "github.com/bytom/p2p/connection"
18         "github.com/bytom/p2p/discover"
19         "github.com/bytom/p2p/trust"
20 )
21
22 const (
23         bannedPeerKey       = "BannedPeer"
24         defaultBanDuration  = time.Hour * 1
25         minNumOutboundPeers = 5
26 )
27
28 //pre-define errors for connecting fail
29 var (
30         ErrDuplicatePeer     = errors.New("Duplicate peer")
31         ErrConnectSelf       = errors.New("Connect self")
32         ErrConnectBannedPeer = errors.New("Connect banned peer")
33 )
34
35 // Switch handles peer connections and exposes an API to receive incoming messages
36 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
37 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
38 // incoming messages are received on the reactor.
39 type Switch struct {
40         cmn.BaseService
41
42         Config       *cfg.Config
43         peerConfig   *PeerConfig
44         listeners    []Listener
45         reactors     map[string]Reactor
46         chDescs      []*connection.ChannelDescriptor
47         reactorsByCh map[byte]Reactor
48         peers        *PeerSet
49         dialing      *cmn.CMap
50         nodeInfo     *NodeInfo             // our node info
51         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
52         discv        *discover.Network
53         bannedPeer   map[string]time.Time
54         db           dbm.DB
55         mtx          sync.Mutex
56 }
57
58 // NewSwitch creates a new Switch with the given config.
59 func NewSwitch(config *cfg.Config) *Switch {
60         sw := &Switch{
61                 Config:       config,
62                 peerConfig:   DefaultPeerConfig(config.P2P),
63                 reactors:     make(map[string]Reactor),
64                 chDescs:      make([]*connection.ChannelDescriptor, 0),
65                 reactorsByCh: make(map[byte]Reactor),
66                 peers:        NewPeerSet(),
67                 dialing:      cmn.NewCMap(),
68                 nodeInfo:     nil,
69                 db:           dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()),
70         }
71         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
72         sw.bannedPeer = make(map[string]time.Time)
73         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
74                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
75                         return nil
76                 }
77         }
78         trust.Init()
79         return sw
80 }
81
82 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
83 func (sw *Switch) OnStart() error {
84         for _, reactor := range sw.reactors {
85                 if _, err := reactor.Start(); err != nil {
86                         return err
87                 }
88         }
89         for _, listener := range sw.listeners {
90                 go sw.listenerRoutine(listener)
91         }
92         go sw.ensureOutboundPeersRoutine()
93         return nil
94 }
95
96 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
97 func (sw *Switch) OnStop() {
98         for _, listener := range sw.listeners {
99                 listener.Stop()
100         }
101         sw.listeners = nil
102
103         for _, peer := range sw.peers.List() {
104                 peer.Stop()
105                 sw.peers.Remove(peer)
106         }
107
108         for _, reactor := range sw.reactors {
109                 reactor.Stop()
110         }
111 }
112
113 //AddBannedPeer add peer to blacklist
114 func (sw *Switch) AddBannedPeer(ip string) error {
115         sw.mtx.Lock()
116         defer sw.mtx.Unlock()
117
118         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
119         datajson, err := json.Marshal(sw.bannedPeer)
120         if err != nil {
121                 return err
122         }
123
124         sw.db.Set([]byte(bannedPeerKey), datajson)
125         return nil
126 }
127
128 // AddPeer performs the P2P handshake with a peer
129 // that already has a SecretConnection. If all goes well,
130 // it starts the peer and adds it to the switch.
131 // NOTE: This performs a blocking handshake before the peer is added.
132 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
133 func (sw *Switch) AddPeer(pc *peerConn) error {
134         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
135         if err != nil {
136                 return err
137         }
138
139         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
140                 return err
141         }
142
143         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
144         if err := sw.filterConnByPeer(peer); err != nil {
145                 return err
146         }
147
148         // Start peer
149         if sw.IsRunning() {
150                 if err := sw.startInitPeer(peer); err != nil {
151                         return err
152                 }
153         }
154         return sw.peers.Add(peer)
155 }
156
157 // AddReactor adds the given reactor to the switch.
158 // NOTE: Not goroutine safe.
159 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
160         // Validate the reactor.
161         // No two reactors can share the same channel.
162         for _, chDesc := range reactor.GetChannels() {
163                 chID := chDesc.ID
164                 if sw.reactorsByCh[chID] != nil {
165                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
166                 }
167                 sw.chDescs = append(sw.chDescs, chDesc)
168                 sw.reactorsByCh[chID] = reactor
169         }
170         sw.reactors[name] = reactor
171         reactor.SetSwitch(sw)
172         return reactor
173 }
174
175 // AddListener adds the given listener to the switch for listening to incoming peer connections.
176 // NOTE: Not goroutine safe.
177 func (sw *Switch) AddListener(l Listener) {
178         sw.listeners = append(sw.listeners, l)
179 }
180
181 //DialPeerWithAddress dial node from net address
182 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
183         log.Debug("Dialing peer address:", addr)
184         sw.dialing.Set(addr.IP.String(), addr)
185         defer sw.dialing.Delete(addr.IP.String())
186         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
187                 return err
188         }
189
190         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
191         if err != nil {
192                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on newOutboundPeerConn")
193                 return err
194         }
195
196         if err = sw.AddPeer(pc); err != nil {
197                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on switch AddPeer")
198                 pc.CloseConn()
199                 return err
200         }
201         log.Debug("DialPeer added peer:", addr)
202         return nil
203 }
204
205 //IsDialing prevent duplicate dialing
206 func (sw *Switch) IsDialing(addr *NetAddress) bool {
207         return sw.dialing.Has(addr.IP.String())
208 }
209
210 // IsListening returns true if the switch has at least one listener.
211 // NOTE: Not goroutine safe.
212 func (sw *Switch) IsListening() bool {
213         return len(sw.listeners) > 0
214 }
215
216 // Listeners returns the list of listeners the switch listens on.
217 // NOTE: Not goroutine safe.
218 func (sw *Switch) Listeners() []Listener {
219         return sw.listeners
220 }
221
222 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
223 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
224         peers := sw.peers.List()
225         for _, peer := range peers {
226                 if peer.outbound {
227                         outbound++
228                 } else {
229                         inbound++
230                 }
231         }
232         dialing = sw.dialing.Size()
233         return
234 }
235
236 // NodeInfo returns the switch's NodeInfo.
237 // NOTE: Not goroutine safe.
238 func (sw *Switch) NodeInfo() *NodeInfo {
239         return sw.nodeInfo
240 }
241
242 //Peers return switch peerset
243 func (sw *Switch) Peers() *PeerSet {
244         return sw.peers
245 }
246
247 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
248 // NOTE: Not goroutine safe.
249 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
250         sw.nodeInfo = nodeInfo
251 }
252
253 // SetNodePrivKey sets the switch's private key for authenticated encryption.
254 // NOTE: Not goroutine safe.
255 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
256         sw.nodePrivKey = nodePrivKey
257         if sw.nodeInfo != nil {
258                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
259         }
260 }
261
262 // StopPeerForError disconnects from a peer due to external error.
263 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
264         log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
265         sw.stopAndRemovePeer(peer, reason)
266 }
267
268 // StopPeerGracefully disconnect from a peer gracefully.
269 func (sw *Switch) StopPeerGracefully(peerID string) {
270         if peer := sw.peers.Get(peerID); peer != nil {
271                 sw.stopAndRemovePeer(peer, nil)
272         }
273 }
274
275 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
276         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
277         if err != nil {
278                 conn.Close()
279                 return err
280         }
281
282         if err = sw.AddPeer(peerConn); err != nil {
283                 conn.Close()
284                 return err
285         }
286         return nil
287 }
288
289 func (sw *Switch) checkBannedPeer(peer string) error {
290         sw.mtx.Lock()
291         defer sw.mtx.Unlock()
292
293         if banEnd, ok := sw.bannedPeer[peer]; ok {
294                 if time.Now().Before(banEnd) {
295                         return ErrConnectBannedPeer
296                 }
297                 sw.delBannedPeer(peer)
298         }
299         return nil
300 }
301
302 func (sw *Switch) delBannedPeer(addr string) error {
303         sw.mtx.Lock()
304         defer sw.mtx.Unlock()
305
306         delete(sw.bannedPeer, addr)
307         datajson, err := json.Marshal(sw.bannedPeer)
308         if err != nil {
309                 return err
310         }
311
312         sw.db.Set([]byte(bannedPeerKey), datajson)
313         return nil
314 }
315
316 func (sw *Switch) filterConnByIP(ip string) error {
317         if ip == sw.nodeInfo.ListenHost() {
318                 return ErrConnectSelf
319         }
320         return sw.checkBannedPeer(ip)
321 }
322
323 func (sw *Switch) filterConnByPeer(peer *Peer) error {
324         if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
325                 return err
326         }
327
328         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
329                 return ErrConnectSelf
330         }
331
332         if sw.peers.Has(peer.Key) {
333                 return ErrDuplicatePeer
334         }
335         return nil
336 }
337
338 func (sw *Switch) listenerRoutine(l Listener) {
339         for {
340                 inConn, ok := <-l.Connections()
341                 if !ok {
342                         break
343                 }
344
345                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
346                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
347                 // be double of MaxNumPeers
348                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
349                         inConn.Close()
350                         log.Info("Ignoring inbound connection: already have enough peers.")
351                         continue
352                 }
353
354                 // New inbound connection!
355                 if err := sw.addPeerWithConnection(inConn); err != nil {
356                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
357                         continue
358                 }
359         }
360 }
361
362 func (sw *Switch) SetDiscv(discv *discover.Network) {
363         sw.discv = discv
364 }
365
366 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
367         if err := sw.DialPeerWithAddress(a); err != nil {
368                 log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
369         }
370         wg.Done()
371 }
372
373 func (sw *Switch) ensureOutboundPeers() {
374         numOutPeers, _, numDialing := sw.NumPeers()
375         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
376         log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
377         if numToDial <= 0 {
378                 return
379         }
380
381         connectedPeers := make(map[string]struct{})
382         for _, peer := range sw.Peers().List() {
383                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
384         }
385
386         var wg sync.WaitGroup
387         nodes := make([]*discover.Node, numToDial)
388         n := sw.discv.ReadRandomNodes(nodes)
389         for i := 0; i < n; i++ {
390                 try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
391                 if sw.NodeInfo().ListenAddr == try.String() {
392                         continue
393                 }
394                 if dialling := sw.IsDialing(try); dialling {
395                         continue
396                 }
397                 if _, ok := connectedPeers[try.IP.String()]; ok {
398                         continue
399                 }
400
401                 wg.Add(1)
402                 go sw.dialPeerWorker(try, &wg)
403         }
404         wg.Wait()
405 }
406
407 func (sw *Switch) ensureOutboundPeersRoutine() {
408         sw.ensureOutboundPeers()
409
410         ticker := time.NewTicker(10 * time.Second)
411         defer ticker.Stop()
412
413         for {
414                 select {
415                 case <-ticker.C:
416                         sw.ensureOutboundPeers()
417                 case <-sw.Quit:
418                         return
419                 }
420         }
421 }
422
423 func (sw *Switch) startInitPeer(peer *Peer) error {
424         peer.Start() // spawn send/recv routines
425         for _, reactor := range sw.reactors {
426                 if err := reactor.AddPeer(peer); err != nil {
427                         return err
428                 }
429         }
430         return nil
431 }
432
433 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
434         sw.peers.Remove(peer)
435         for _, reactor := range sw.reactors {
436                 reactor.RemovePeer(peer, reason)
437         }
438         peer.Stop()
439 }