OSDN Git Service

fix the bug (#372)
[bytom/vapor.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "reflect"
7         "time"
8
9         "github.com/btcsuite/go-socks/socks"
10         "github.com/pkg/errors"
11         log "github.com/sirupsen/logrus"
12         "github.com/tendermint/go-wire"
13         cmn "github.com/tendermint/tmlibs/common"
14         "github.com/tendermint/tmlibs/flowrate"
15
16         cfg "github.com/vapor/config"
17         "github.com/vapor/consensus"
18         "github.com/vapor/p2p/connection"
19         "github.com/vapor/p2p/signlib"
20 )
21
22 // peerConn contains the raw connection and its config.
23 type peerConn struct {
24         outbound bool
25         config   *PeerConfig
26         conn     net.Conn // source connection
27 }
28
29 // PeerConfig is a Peer configuration.
30 type PeerConfig struct {
31         HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
32         DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
33         ProxyAddress     string                  `mapstructure:"proxy_address"`
34         ProxyUsername    string                  `mapstructure:"proxy_username"`
35         ProxyPassword    string                  `mapstructure:"proxy_password"`
36         MConfig          *connection.MConnConfig `mapstructure:"connection"`
37 }
38
39 // DefaultPeerConfig returns the default config.
40 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
41         return &PeerConfig{
42                 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
43                 DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
44                 ProxyAddress:     config.ProxyAddress,
45                 ProxyUsername:    config.ProxyUsername,
46                 ProxyPassword:    config.ProxyPassword,
47                 MConfig:          connection.DefaultMConnConfig(config.Compression),
48         }
49 }
50
51 // Peer represent a bytom network node
52 type Peer struct {
53         cmn.BaseService
54         *NodeInfo
55         *peerConn
56         mconn *connection.MConnection // multiplex connection
57         Key   string
58         isLAN bool
59 }
60
61 // OnStart implements BaseService.
62 func (p *Peer) OnStart() error {
63         p.BaseService.OnStart()
64         _, err := p.mconn.Start()
65         return err
66 }
67
68 // OnStop implements BaseService.
69 func (p *Peer) OnStop() {
70         p.BaseService.OnStop()
71         p.mconn.Stop()
72 }
73
74 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
75         // Key and NodeInfo are set after Handshake
76         p := &Peer{
77                 peerConn: pc,
78                 NodeInfo: nodeInfo,
79                 Key:      nodeInfo.PubKey,
80                 isLAN:    isLAN,
81         }
82         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
83         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
84         return p
85 }
86
87 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) {
88         conn, err := dial(addr, config)
89         if err != nil {
90                 return nil, errors.Wrap(err, "Error dial peer")
91         }
92
93         pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
94         if err != nil {
95                 conn.Close()
96                 return nil, err
97         }
98         return pc, nil
99 }
100
101 func newInboundPeerConn(conn net.Conn, ourNodePrivKey signlib.PrivKey, config *cfg.P2PConfig) (*peerConn, error) {
102         return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
103 }
104
105 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) {
106         rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
107         conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
108         if err != nil {
109                 return nil, errors.Wrap(err, "Error creating peer")
110         }
111
112         // Remove deadline
113         if err := rawConn.SetDeadline(time.Time{}); err != nil {
114                 return nil, err
115         }
116
117         return &peerConn{
118                 config:   config,
119                 outbound: outbound,
120                 conn:     conn,
121         }, nil
122 }
123
124 // Addr returns peer's remote network address.
125 func (p *Peer) Addr() net.Addr {
126         return p.conn.RemoteAddr()
127 }
128
129 // CanSend returns true if the send queue is not full, false otherwise.
130 func (p *Peer) CanSend(chID byte) bool {
131         if !p.IsRunning() {
132                 return false
133         }
134         return p.mconn.CanSend(chID)
135 }
136
137 // CloseConn should be used when the peer was created, but never started.
138 func (pc *peerConn) CloseConn() {
139         pc.conn.Close()
140 }
141
142 // Equals reports whenever 2 peers are actually represent the same node.
143 func (p *Peer) Equals(other *Peer) bool {
144         return p.Key == other.Key
145 }
146
147 // HandshakeTimeout performs a handshake between a given node and the peer.
148 // NOTE: blocking
149 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
150         // Set deadline for handshake so we don't block forever on conn.ReadFull
151         if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
152                 return nil, err
153         }
154
155         var peerNodeInfo = new(NodeInfo)
156         var err1, err2 error
157         cmn.Parallel(
158                 func() {
159                         var n int
160                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
161                 },
162                 func() {
163                         var n int
164                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
165                         log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
166                 })
167         if err1 != nil {
168                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
169         }
170         if err2 != nil {
171                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
172         }
173
174         // Remove deadline
175         if err := pc.conn.SetDeadline(time.Time{}); err != nil {
176                 return nil, err
177         }
178         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
179         return peerNodeInfo, nil
180 }
181
182 // ID return the uuid of the peer
183 func (p *Peer) ID() string {
184         return p.Key
185 }
186
187 // IsOutbound returns true if the connection is outbound, false otherwise.
188 func (p *Peer) IsOutbound() bool {
189         return p.outbound
190 }
191
192 // IsLAN returns true if peer is LAN peer, false otherwise.
193 func (p *Peer) IsLAN() bool {
194         return p.isLAN
195 }
196
197 // PubKey returns peer's public key.
198 func (p *Peer) PubKey() string {
199         return p.conn.(*connection.SecretConnection).RemotePubKey().String()
200 }
201
202 // Send msg to the channel identified by chID byte. Returns false if the send
203 // queue is full after timeout, specified by MConnection.
204 func (p *Peer) Send(chID byte, msg interface{}) bool {
205         if !p.IsRunning() {
206                 return false
207         }
208         return p.mconn.Send(chID, msg)
209 }
210
211 // ServiceFlag return the ServiceFlag of this peer
212 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
213         // ServiceFlag return the ServiceFlag of this peer
214         return p.NodeInfo.ServiceFlag
215 }
216
217 // String representation.
218 func (p *Peer) String() string {
219         if p.outbound {
220                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
221         }
222         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
223 }
224
225 // TrafficStatus return the in and out traffic status
226 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
227         return p.mconn.TrafficStatus()
228 }
229
230 // TrySend msg to the channel identified by chID byte. Immediately returns
231 // false if the send queue is full.
232 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
233         if !p.IsRunning() {
234                 return false
235         }
236
237         log.WithFields(log.Fields{
238                 "module": logModule,
239                 "peer":   p.Addr(),
240                 "msg":    msg,
241                 "type":   reflect.TypeOf(msg),
242         }).Debug("send message to peer")
243         return p.mconn.TrySend(chID, msg)
244 }
245
246 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
247         onReceive := func(chID byte, msgBytes []byte) {
248                 reactor := reactorsByCh[chID]
249                 if reactor == nil {
250                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
251                 }
252                 reactor.Receive(chID, p, msgBytes)
253         }
254
255         onError := func(r interface{}) {
256                 onPeerError(p, r)
257         }
258         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
259 }
260
261 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
262         var conn net.Conn
263         var err error
264         if config.ProxyAddress == "" {
265                 conn, err = addr.DialTimeout(config.DialTimeout)
266         } else {
267                 proxy := &socks.Proxy{
268                         Addr:         config.ProxyAddress,
269                         Username:     config.ProxyUsername,
270                         Password:     config.ProxyPassword,
271                         TorIsolation: false,
272                 }
273                 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)
274         }
275         if err != nil {
276                 return nil, err
277         }
278         return conn, nil
279 }