OSDN Git Service

Merge pull request #41 from Bytom/dev
[bytom/vapor.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "strconv"
7         "time"
8
9         "github.com/btcsuite/go-socks/socks"
10         "github.com/pkg/errors"
11         log "github.com/sirupsen/logrus"
12         crypto "github.com/tendermint/go-crypto"
13         wire "github.com/tendermint/go-wire"
14         cmn "github.com/tendermint/tmlibs/common"
15         "github.com/tendermint/tmlibs/flowrate"
16
17         cfg "github.com/vapor/config"
18         "github.com/vapor/consensus"
19         "github.com/vapor/p2p/connection"
20 )
21
22 // peerConn contains the raw connection and its config.
23 type peerConn struct {
24         outbound bool
25         config   *PeerConfig
26         conn     net.Conn // source connection
27 }
28
29 // PeerConfig is a Peer configuration.
30 type PeerConfig struct {
31         HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
32         DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
33         ProxyAddress     string                  `mapstructure:"proxy_address"`
34         ProxyUsername    string                  `mapstructure:"proxy_username"`
35         ProxyPassword    string                  `mapstructure:"proxy_password"`
36         MConfig          *connection.MConnConfig `mapstructure:"connection"`
37 }
38
39 // DefaultPeerConfig returns the default config.
40 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
41         return &PeerConfig{
42                 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
43                 DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
44                 ProxyAddress:     config.ProxyAddress,
45                 ProxyUsername:    config.ProxyUsername,
46                 ProxyPassword:    config.ProxyPassword,
47                 MConfig:          connection.DefaultMConnConfig(),
48         }
49 }
50
51 // Peer represent a bytom network node
52 type Peer struct {
53         cmn.BaseService
54         *NodeInfo
55         *peerConn
56         mconn *connection.MConnection // multiplex connection
57         Key   string
58 }
59
60 // OnStart implements BaseService.
61 func (p *Peer) OnStart() error {
62         p.BaseService.OnStart()
63         _, err := p.mconn.Start()
64         return err
65 }
66
67 // OnStop implements BaseService.
68 func (p *Peer) OnStop() {
69         p.BaseService.OnStop()
70         p.mconn.Stop()
71 }
72
73 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
74         // Key and NodeInfo are set after Handshake
75         p := &Peer{
76                 peerConn: pc,
77                 NodeInfo: nodeInfo,
78                 Key:      nodeInfo.PubKey.KeyString(),
79         }
80         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
81         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
82         return p
83 }
84
85 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
86         conn, err := dial(addr, config)
87         if err != nil {
88                 return nil, errors.Wrap(err, "Error dial peer")
89         }
90
91         pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
92         if err != nil {
93                 conn.Close()
94                 return nil, err
95         }
96         return pc, nil
97 }
98
99 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
100         return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
101 }
102
103 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
104         rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
105         conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
106         if err != nil {
107                 return nil, errors.Wrap(err, "Error creating peer")
108         }
109
110         return &peerConn{
111                 config:   config,
112                 outbound: outbound,
113                 conn:     conn,
114         }, nil
115 }
116
117 // Addr returns peer's remote network address.
118 func (p *Peer) Addr() net.Addr {
119         return p.conn.RemoteAddr()
120 }
121
122 // CanSend returns true if the send queue is not full, false otherwise.
123 func (p *Peer) CanSend(chID byte) bool {
124         if !p.IsRunning() {
125                 return false
126         }
127         return p.mconn.CanSend(chID)
128 }
129
130 // CloseConn should be used when the peer was created, but never started.
131 func (pc *peerConn) CloseConn() {
132         pc.conn.Close()
133 }
134
135 // Equals reports whenever 2 peers are actually represent the same node.
136 func (p *Peer) Equals(other *Peer) bool {
137         return p.Key == other.Key
138 }
139
140 // HandshakeTimeout performs a handshake between a given node and the peer.
141 // NOTE: blocking
142 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
143         // Set deadline for handshake so we don't block forever on conn.ReadFull
144         if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
145                 return nil, err
146         }
147
148         var peerNodeInfo = new(NodeInfo)
149         var err1, err2 error
150         cmn.Parallel(
151                 func() {
152                         var n int
153                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
154                 },
155                 func() {
156                         var n int
157                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
158                         log.WithField("address", peerNodeInfo.ListenAddr).Info("Peer handshake")
159                 })
160         if err1 != nil {
161                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
162         }
163         if err2 != nil {
164                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
165         }
166
167         // Remove deadline
168         if err := pc.conn.SetDeadline(time.Time{}); err != nil {
169                 return nil, err
170         }
171         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
172         return peerNodeInfo, nil
173 }
174
175 // ID return the uuid of the peer
176 func (p *Peer) ID() string {
177         return p.Key
178 }
179
180 // IsOutbound returns true if the connection is outbound, false otherwise.
181 func (p *Peer) IsOutbound() bool {
182         return p.outbound
183 }
184
185 // PubKey returns peer's public key.
186 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
187         return p.conn.(*connection.SecretConnection).RemotePubKey()
188 }
189
190 // Send msg to the channel identified by chID byte. Returns false if the send
191 // queue is full after timeout, specified by MConnection.
192 func (p *Peer) Send(chID byte, msg interface{}) bool {
193         if !p.IsRunning() {
194                 return false
195         }
196         return p.mconn.Send(chID, msg)
197 }
198
199 // ServiceFlag return the ServiceFlag of this peer
200 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
201         services := consensus.SFFullNode
202         if len(p.Other) == 0 {
203                 return services
204         }
205
206         if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
207                 services = consensus.ServiceFlag(serviceFlag)
208         }
209         return services
210 }
211
212 // String representation.
213 func (p *Peer) String() string {
214         if p.outbound {
215                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
216         }
217         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
218 }
219
220 // TrafficStatus return the in and out traffic status
221 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
222         return p.mconn.TrafficStatus()
223 }
224
225 // TrySend msg to the channel identified by chID byte. Immediately returns
226 // false if the send queue is full.
227 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
228         if !p.IsRunning() {
229                 return false
230         }
231         return p.mconn.TrySend(chID, msg)
232 }
233
234 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
235         onReceive := func(chID byte, msgBytes []byte) {
236                 reactor := reactorsByCh[chID]
237                 if reactor == nil {
238                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
239                 }
240                 reactor.Receive(chID, p, msgBytes)
241         }
242
243         onError := func(r interface{}) {
244                 onPeerError(p, r)
245         }
246         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
247 }
248
249 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
250         var conn net.Conn
251         var err error
252         if config.ProxyAddress == "" {
253                 conn, err = addr.DialTimeout(config.DialTimeout)
254         } else {
255                 proxy := &socks.Proxy{
256                         Addr:         config.ProxyAddress,
257                         Username:     config.ProxyUsername,
258                         Password:     config.ProxyPassword,
259                         TorIsolation: false,
260                 }
261                 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)
262         }
263         if err != nil {
264                 return nil, err
265         }
266         return conn, nil
267 }