OSDN Git Service

Merge pull request #1025 from Bytom/dev
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "time"
7
8         "github.com/pkg/errors"
9         log "github.com/sirupsen/logrus"
10         crypto "github.com/tendermint/go-crypto"
11         wire "github.com/tendermint/go-wire"
12         cmn "github.com/tendermint/tmlibs/common"
13
14         cfg "github.com/bytom/config"
15         "github.com/bytom/p2p/connection"
16 )
17
18 // peerConn contains the raw connection and its config.
19 type peerConn struct {
20         outbound bool
21         config   *PeerConfig
22         conn     net.Conn // source connection
23 }
24
25 // Peer represent a bytom network node
26 type Peer struct {
27         cmn.BaseService
28
29         // raw peerConn and the multiplex connection
30         *peerConn
31         mconn *connection.MConnection // multiplex connection
32
33         *NodeInfo
34         Key  string
35         Data *cmn.CMap // User data.
36 }
37
38 // PeerConfig is a Peer configuration.
39 type PeerConfig struct {
40         AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption
41
42         // times are in seconds
43         HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
44         DialTimeout      time.Duration `mapstructure:"dial_timeout"`
45
46         MConfig *connection.MConnConfig `mapstructure:"connection"`
47
48         Fuzz       bool            `mapstructure:"fuzz"` // fuzz connection (for testing)
49         FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
50 }
51
52 // DefaultPeerConfig returns the default config.
53 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
54         return &PeerConfig{
55                 AuthEnc:          true,
56                 HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
57                 DialTimeout:      time.Duration(config.DialTimeout),      // * time.Second,
58                 MConfig:          connection.DefaultMConnConfig(),
59                 Fuzz:             false,
60                 FuzzConfig:       DefaultFuzzConnConfig(),
61         }
62 }
63
64 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
65         // Key and NodeInfo are set after Handshake
66         p := &Peer{
67                 peerConn: pc,
68                 NodeInfo: nodeInfo,
69
70                 Data: cmn.NewCMap(),
71         }
72         p.Key = nodeInfo.PubKey.KeyString()
73         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
74
75         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
76         return p
77 }
78
79 func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
80         return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
81 }
82
83 func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
84         conn, err := dial(addr, config)
85         if err != nil {
86                 return nil, errors.Wrap(err, "Error dial peer")
87         }
88
89         pc, err := newPeerConn(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
90         if err != nil {
91                 conn.Close()
92                 return nil, err
93         }
94
95         return pc, nil
96 }
97
98 func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
99         return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
100 }
101
102 func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
103         conn := rawConn
104
105         // Fuzz connection
106         if config.Fuzz {
107                 // so we have time to do peer handshakes and get set up
108                 conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
109         }
110
111         // Encrypt connection
112         if config.AuthEnc {
113                 conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
114
115                 var err error
116                 conn, err = connection.MakeSecretConnection(conn, ourNodePrivKey)
117                 if err != nil {
118                         return nil, errors.Wrap(err, "Error creating peer")
119                 }
120         }
121
122         // Only the information we already have
123         return &peerConn{
124                 config:   config,
125                 outbound: outbound,
126                 conn:     conn,
127         }, nil
128 }
129
130 // CloseConn should be used when the peer was created, but never started.
131 func (pc *peerConn) CloseConn() {
132         pc.conn.Close()
133 }
134
135 // HandshakeTimeout performs a handshake between a given node and the peer.
136 // NOTE: blocking
137 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
138         // Set deadline for handshake so we don't block forever on conn.ReadFull
139         pc.conn.SetDeadline(time.Now().Add(timeout))
140
141         var peerNodeInfo = new(NodeInfo)
142         var err1 error
143         var err2 error
144         cmn.Parallel(
145                 func() {
146                         var n int
147                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
148                 },
149                 func() {
150                         var n int
151                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
152                         log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
153                 })
154         if err1 != nil {
155                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
156         }
157         if err2 != nil {
158                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
159         }
160
161         // Remove deadline
162         pc.conn.SetDeadline(time.Time{})
163         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
164         return peerNodeInfo, nil
165 }
166
167 // Addr returns peer's remote network address.
168 func (p *Peer) Addr() net.Addr {
169         return p.conn.RemoteAddr()
170 }
171
172 // PubKey returns peer's public key.
173 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
174         if p.config.AuthEnc {
175                 return p.conn.(*connection.SecretConnection).RemotePubKey()
176         }
177         if p.NodeInfo == nil {
178                 panic("Attempt to get peer's PubKey before calling Handshake")
179         }
180         return p.PubKey()
181 }
182
183 // OnStart implements BaseService.
184 func (p *Peer) OnStart() error {
185         p.BaseService.OnStart()
186         _, err := p.mconn.Start()
187         return err
188 }
189
190 // OnStop implements BaseService.
191 func (p *Peer) OnStop() {
192         p.BaseService.OnStop()
193         p.mconn.Stop()
194 }
195
196 // Connection returns underlying MConnection.
197 func (p *Peer) Connection() *connection.MConnection {
198         return p.mconn
199 }
200
201 // IsOutbound returns true if the connection is outbound, false otherwise.
202 func (p *Peer) IsOutbound() bool {
203         return p.outbound
204 }
205
206 // Send msg to the channel identified by chID byte. Returns false if the send
207 // queue is full after timeout, specified by MConnection.
208 func (p *Peer) Send(chID byte, msg interface{}) bool {
209         if !p.IsRunning() {
210                 // see Switch#Broadcast, where we fetch the list of peers and loop over
211                 // them - while we're looping, one peer may be removed and stopped.
212                 return false
213         }
214         return p.mconn.Send(chID, msg)
215 }
216
217 // TrySend msg to the channel identified by chID byte. Immediately returns
218 // false if the send queue is full.
219 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
220         if !p.IsRunning() {
221                 return false
222         }
223         return p.mconn.TrySend(chID, msg)
224 }
225
226 // CanSend returns true if the send queue is not full, false otherwise.
227 func (p *Peer) CanSend(chID byte) bool {
228         if !p.IsRunning() {
229                 return false
230         }
231         return p.mconn.CanSend(chID)
232 }
233
234 // String representation.
235 func (p *Peer) String() string {
236         if p.outbound {
237                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
238         }
239
240         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
241 }
242
243 // Equals reports whenever 2 peers are actually represent the same node.
244 func (p *Peer) Equals(other *Peer) bool {
245         return p.Key == other.Key
246 }
247
248 // Get the data for a given key.
249 func (p *Peer) Get(key string) interface{} {
250         return p.Data.Get(key)
251 }
252
253 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
254         conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
255         if err != nil {
256                 return nil, err
257         }
258         return conn, nil
259 }
260
261 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
262         onReceive := func(chID byte, msgBytes []byte) {
263                 reactor := reactorsByCh[chID]
264                 if reactor == nil {
265                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
266                 }
267                 reactor.Receive(chID, p, msgBytes)
268         }
269
270         onError := func(r interface{}) {
271                 onPeerError(p, r)
272         }
273
274         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
275 }