OSDN Git Service

Change get external ip method (#1649)
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "reflect"
7         "strconv"
8         "time"
9
10         "github.com/btcsuite/go-socks/socks"
11         "github.com/pkg/errors"
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         "github.com/tendermint/go-wire"
15         cmn "github.com/tendermint/tmlibs/common"
16         "github.com/tendermint/tmlibs/flowrate"
17
18         cfg "github.com/bytom/config"
19         "github.com/bytom/consensus"
20         "github.com/bytom/p2p/connection"
21 )
22
23 // peerConn contains the raw connection and its config.
24 type peerConn struct {
25         outbound bool
26         config   *PeerConfig
27         conn     net.Conn // source connection
28 }
29
30 // PeerConfig is a Peer configuration.
31 type PeerConfig struct {
32         HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
33         DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
34         ProxyAddress     string                  `mapstructure:"proxy_address"`
35         ProxyUsername    string                  `mapstructure:"proxy_username"`
36         ProxyPassword    string                  `mapstructure:"proxy_password"`
37         MConfig          *connection.MConnConfig `mapstructure:"connection"`
38 }
39
40 // DefaultPeerConfig returns the default config.
41 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
42         return &PeerConfig{
43                 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
44                 DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
45                 ProxyAddress:     config.ProxyAddress,
46                 ProxyUsername:    config.ProxyUsername,
47                 ProxyPassword:    config.ProxyPassword,
48                 MConfig:          connection.DefaultMConnConfig(),
49         }
50 }
51
52 // Peer represent a bytom network node
53 type Peer struct {
54         cmn.BaseService
55         *NodeInfo
56         *peerConn
57         mconn *connection.MConnection // multiplex connection
58         Key   string
59 }
60
61 // OnStart implements BaseService.
62 func (p *Peer) OnStart() error {
63         p.BaseService.OnStart()
64         _, err := p.mconn.Start()
65         return err
66 }
67
68 // OnStop implements BaseService.
69 func (p *Peer) OnStop() {
70         p.BaseService.OnStop()
71         p.mconn.Stop()
72 }
73
74 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
75         // Key and NodeInfo are set after Handshake
76         p := &Peer{
77                 peerConn: pc,
78                 NodeInfo: nodeInfo,
79                 Key:      nodeInfo.PubKey.KeyString(),
80         }
81         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
82         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
83         return p
84 }
85
86 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
87         conn, err := dial(addr, config)
88         if err != nil {
89                 return nil, errors.Wrap(err, "Error dial peer")
90         }
91
92         pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
93         if err != nil {
94                 conn.Close()
95                 return nil, err
96         }
97         return pc, nil
98 }
99
100 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
101         return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
102 }
103
104 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
105         rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
106         conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
107         if err != nil {
108                 return nil, errors.Wrap(err, "Error creating peer")
109         }
110
111         return &peerConn{
112                 config:   config,
113                 outbound: outbound,
114                 conn:     conn,
115         }, nil
116 }
117
118 // Addr returns peer's remote network address.
119 func (p *Peer) Addr() net.Addr {
120         return p.conn.RemoteAddr()
121 }
122
123 // CanSend returns true if the send queue is not full, false otherwise.
124 func (p *Peer) CanSend(chID byte) bool {
125         if !p.IsRunning() {
126                 return false
127         }
128         return p.mconn.CanSend(chID)
129 }
130
131 // CloseConn should be used when the peer was created, but never started.
132 func (pc *peerConn) CloseConn() {
133         pc.conn.Close()
134 }
135
136 // Equals reports whenever 2 peers are actually represent the same node.
137 func (p *Peer) Equals(other *Peer) bool {
138         return p.Key == other.Key
139 }
140
141 // HandshakeTimeout performs a handshake between a given node and the peer.
142 // NOTE: blocking
143 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
144         // Set deadline for handshake so we don't block forever on conn.ReadFull
145         if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
146                 return nil, err
147         }
148
149         var peerNodeInfo = new(NodeInfo)
150         var err1, err2 error
151         cmn.Parallel(
152                 func() {
153                         var n int
154                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
155                 },
156                 func() {
157                         var n int
158                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
159                         log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
160                 })
161         if err1 != nil {
162                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
163         }
164         if err2 != nil {
165                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
166         }
167
168         // Remove deadline
169         if err := pc.conn.SetDeadline(time.Time{}); err != nil {
170                 return nil, err
171         }
172         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
173         return peerNodeInfo, nil
174 }
175
176 // ID return the uuid of the peer
177 func (p *Peer) ID() string {
178         return p.Key
179 }
180
181 // IsOutbound returns true if the connection is outbound, false otherwise.
182 func (p *Peer) IsOutbound() bool {
183         return p.outbound
184 }
185
186 // PubKey returns peer's public key.
187 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
188         return p.conn.(*connection.SecretConnection).RemotePubKey()
189 }
190
191 // Send msg to the channel identified by chID byte. Returns false if the send
192 // queue is full after timeout, specified by MConnection.
193 func (p *Peer) Send(chID byte, msg interface{}) bool {
194         if !p.IsRunning() {
195                 return false
196         }
197         return p.mconn.Send(chID, msg)
198 }
199
200 // ServiceFlag return the ServiceFlag of this peer
201 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
202         services := consensus.SFFullNode
203         if len(p.Other) == 0 {
204                 return services
205         }
206
207         if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
208                 services = consensus.ServiceFlag(serviceFlag)
209         }
210         return services
211 }
212
213 // String representation.
214 func (p *Peer) String() string {
215         if p.outbound {
216                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
217         }
218         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
219 }
220
221 // TrafficStatus return the in and out traffic status
222 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
223         return p.mconn.TrafficStatus()
224 }
225
226 // TrySend msg to the channel identified by chID byte. Immediately returns
227 // false if the send queue is full.
228 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
229         if !p.IsRunning() {
230                 return false
231         }
232
233         log.WithFields(log.Fields{
234                 "module": logModule,
235                 "peer":   p.Addr(),
236                 "msg":    msg,
237                 "type":   reflect.TypeOf(msg),
238         }).Info("send message to peer")
239         return p.mconn.TrySend(chID, msg)
240 }
241
242 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
243         onReceive := func(chID byte, msgBytes []byte) {
244                 reactor := reactorsByCh[chID]
245                 if reactor == nil {
246                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
247                 }
248                 reactor.Receive(chID, p, msgBytes)
249         }
250
251         onError := func(r interface{}) {
252                 onPeerError(p, r)
253         }
254         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
255 }
256
257 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
258         var conn net.Conn
259         var err error
260         if config.ProxyAddress == "" {
261                 conn, err = addr.DialTimeout(config.DialTimeout)
262         } else {
263                 proxy := &socks.Proxy{
264                         Addr:         config.ProxyAddress,
265                         Username:     config.ProxyUsername,
266                         Password:     config.ProxyPassword,
267                         TorIsolation: false,
268                 }
269                 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)
270         }
271         if err != nil {
272                 return nil, err
273         }
274         return conn, nil
275 }