OSDN Git Service

cdb609d4e3f41b702187d90c5fbb1d08b0521d69
[bytom/vapor.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "reflect"
7         "strconv"
8         "time"
9
10         "github.com/btcsuite/go-socks/socks"
11         "github.com/pkg/errors"
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         "github.com/tendermint/go-wire"
15         cmn "github.com/tendermint/tmlibs/common"
16         "github.com/tendermint/tmlibs/flowrate"
17
18         cfg "github.com/vapor/config"
19         "github.com/vapor/consensus"
20         "github.com/vapor/p2p/connection"
21 )
22
23 // peerConn contains the raw connection and its config.
24 type peerConn struct {
25         outbound bool
26         config   *PeerConfig
27         conn     net.Conn // source connection
28 }
29
30 // PeerConfig is a Peer configuration.
31 type PeerConfig struct {
32         HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
33         DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
34         ProxyAddress     string                  `mapstructure:"proxy_address"`
35         ProxyUsername    string                  `mapstructure:"proxy_username"`
36         ProxyPassword    string                  `mapstructure:"proxy_password"`
37         MConfig          *connection.MConnConfig `mapstructure:"connection"`
38 }
39
40 // DefaultPeerConfig returns the default config.
41 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
42         return &PeerConfig{
43                 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
44                 DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
45                 ProxyAddress:     config.ProxyAddress,
46                 ProxyUsername:    config.ProxyUsername,
47                 ProxyPassword:    config.ProxyPassword,
48                 MConfig:          connection.DefaultMConnConfig(),
49         }
50 }
51
52 // Peer represent a bytom network node
53 type Peer struct {
54         cmn.BaseService
55         *NodeInfo
56         *peerConn
57         mconn *connection.MConnection // multiplex connection
58         Key   string
59         isLAN bool
60 }
61
62 // OnStart implements BaseService.
63 func (p *Peer) OnStart() error {
64         p.BaseService.OnStart()
65         _, err := p.mconn.Start()
66         return err
67 }
68
69 // OnStop implements BaseService.
70 func (p *Peer) OnStop() {
71         p.BaseService.OnStop()
72         p.mconn.Stop()
73 }
74
75 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
76         // Key and NodeInfo are set after Handshake
77         p := &Peer{
78                 peerConn: pc,
79                 NodeInfo: nodeInfo,
80                 Key:      nodeInfo.PubKey.KeyString(),
81                 isLAN:    isLAN,
82         }
83         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
84         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
85         return p
86 }
87
88 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
89         conn, err := dial(addr, config)
90         if err != nil {
91                 return nil, errors.Wrap(err, "Error dial peer")
92         }
93
94         pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
95         if err != nil {
96                 conn.Close()
97                 return nil, err
98         }
99         return pc, nil
100 }
101
102 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
103         return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
104 }
105
106 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
107         rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
108         conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
109         if err != nil {
110                 return nil, errors.Wrap(err, "Error creating peer")
111         }
112
113         return &peerConn{
114                 config:   config,
115                 outbound: outbound,
116                 conn:     conn,
117         }, nil
118 }
119
120 // Addr returns peer's remote network address.
121 func (p *Peer) Addr() net.Addr {
122         return p.conn.RemoteAddr()
123 }
124
125 // CanSend returns true if the send queue is not full, false otherwise.
126 func (p *Peer) CanSend(chID byte) bool {
127         if !p.IsRunning() {
128                 return false
129         }
130         return p.mconn.CanSend(chID)
131 }
132
133 // CloseConn should be used when the peer was created, but never started.
134 func (pc *peerConn) CloseConn() {
135         pc.conn.Close()
136 }
137
138 // Equals reports whenever 2 peers are actually represent the same node.
139 func (p *Peer) Equals(other *Peer) bool {
140         return p.Key == other.Key
141 }
142
143 // HandshakeTimeout performs a handshake between a given node and the peer.
144 // NOTE: blocking
145 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
146         // Set deadline for handshake so we don't block forever on conn.ReadFull
147         if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
148                 return nil, err
149         }
150
151         var peerNodeInfo = new(NodeInfo)
152         var err1, err2 error
153         cmn.Parallel(
154                 func() {
155                         var n int
156                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
157                 },
158                 func() {
159                         var n int
160                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
161                         log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
162                 })
163         if err1 != nil {
164                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
165         }
166         if err2 != nil {
167                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
168         }
169
170         // Remove deadline
171         if err := pc.conn.SetDeadline(time.Time{}); err != nil {
172                 return nil, err
173         }
174         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
175         return peerNodeInfo, nil
176 }
177
178 // ID return the uuid of the peer
179 func (p *Peer) ID() string {
180         return p.Key
181 }
182
183 // IsOutbound returns true if the connection is outbound, false otherwise.
184 func (p *Peer) IsOutbound() bool {
185         return p.outbound
186 }
187
188 // IsLAN returns true if peer is LAN peer, false otherwise.
189 func (p *Peer) IsLAN() bool {
190         return p.isLAN
191 }
192
193 // PubKey returns peer's public key.
194 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
195         return p.conn.(*connection.SecretConnection).RemotePubKey()
196 }
197
198 // Send msg to the channel identified by chID byte. Returns false if the send
199 // queue is full after timeout, specified by MConnection.
200 func (p *Peer) Send(chID byte, msg interface{}) bool {
201         if !p.IsRunning() {
202                 return false
203         }
204         return p.mconn.Send(chID, msg)
205 }
206
207 // ServiceFlag return the ServiceFlag of this peer
208 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
209         services := consensus.SFFullNode
210         if len(p.Other) == 0 {
211                 return services
212         }
213
214         if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
215                 services = consensus.ServiceFlag(serviceFlag)
216         }
217         return services
218 }
219
220 // String representation.
221 func (p *Peer) String() string {
222         if p.outbound {
223                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
224         }
225         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
226 }
227
228 // TrafficStatus return the in and out traffic status
229 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
230         return p.mconn.TrafficStatus()
231 }
232
233 // TrySend msg to the channel identified by chID byte. Immediately returns
234 // false if the send queue is full.
235 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
236         if !p.IsRunning() {
237                 return false
238         }
239
240         log.WithFields(log.Fields{
241                 "module": logModule,
242                 "peer":   p.Addr(),
243                 "msg":    msg,
244                 "type":   reflect.TypeOf(msg),
245         }).Info("send message to peer")
246         return p.mconn.TrySend(chID, msg)
247 }
248
249 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
250         onReceive := func(chID byte, msgBytes []byte) {
251                 reactor := reactorsByCh[chID]
252                 if reactor == nil {
253                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
254                 }
255                 reactor.Receive(chID, p, msgBytes)
256         }
257
258         onError := func(r interface{}) {
259                 onPeerError(p, r)
260         }
261         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
262 }
263
264 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
265         var conn net.Conn
266         var err error
267         if config.ProxyAddress == "" {
268                 conn, err = addr.DialTimeout(config.DialTimeout)
269         } else {
270                 proxy := &socks.Proxy{
271                         Addr:         config.ProxyAddress,
272                         Username:     config.ProxyUsername,
273                         Password:     config.ProxyPassword,
274                         TorIsolation: false,
275                 }
276                 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)
277         }
278         if err != nil {
279                 return nil, err
280         }
281         return conn, nil
282 }