OSDN Git Service

merge with dev
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "io"
6         "net"
7         "time"
8
9         "github.com/pkg/errors"
10         log "github.com/sirupsen/logrus"
11         crypto "github.com/tendermint/go-crypto"
12         wire "github.com/tendermint/go-wire"
13         cmn "github.com/tendermint/tmlibs/common"
14
15         cfg "github.com/bytom/config"
16 )
17
18 // Peer could be marked as persistent, in which case you can use
19 // Redial function to reconnect. Note that inbound peers can't be
20 // made persistent. They should be made persistent on the other end.
21 //
22
23 // peerConn contains the raw connection and its config.
24 type peerConn struct {
25         outbound bool
26         config   *PeerConfig
27         conn     net.Conn // source connection
28 }
29
30 // Before using a peer, you will need to perform a handshake on connection.
31 type Peer struct {
32         cmn.BaseService
33
34         // raw peerConn and the multiplex connection
35         *peerConn
36         mconn *MConnection // multiplex connection
37
38         *NodeInfo
39         Key  string
40         Data *cmn.CMap // User data.
41 }
42
43 // PeerConfig is a Peer configuration.
44 type PeerConfig struct {
45         AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption
46
47         // times are in seconds
48         HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
49         DialTimeout      time.Duration `mapstructure:"dial_timeout"`
50
51         MConfig *MConnConfig `mapstructure:"connection"`
52
53         Fuzz       bool            `mapstructure:"fuzz"` // fuzz connection (for testing)
54         FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
55 }
56
57 // DefaultPeerConfig returns the default config.
58 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
59         return &PeerConfig{
60                 AuthEnc:          true,
61                 HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
62                 DialTimeout:      time.Duration(config.DialTimeout),      // * time.Second,
63                 MConfig:          DefaultMConnConfig(),
64                 Fuzz:             false,
65                 FuzzConfig:       DefaultFuzzConnConfig(),
66         }
67 }
68
69 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
70         // Key and NodeInfo are set after Handshake
71         p := &Peer{
72                 peerConn: pc,
73                 NodeInfo: nodeInfo,
74
75                 Data: cmn.NewCMap(),
76         }
77         p.Key = nodeInfo.PubKey.KeyString()
78         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
79
80         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
81         return p
82 }
83
84 func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
85         return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
86 }
87
88 func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
89         conn, err := dial(addr, config)
90         if err != nil {
91                 return nil, errors.Wrap(err, "Error dial peer")
92         }
93
94         pc, err := newPeerConn(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
95         if err != nil {
96                 conn.Close()
97                 return nil, err
98         }
99
100         return pc, nil
101 }
102
103 func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
104         return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
105 }
106
107 func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
108         conn := rawConn
109
110         // Fuzz connection
111         if config.Fuzz {
112                 // so we have time to do peer handshakes and get set up
113                 conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
114         }
115
116         // Encrypt connection
117         if config.AuthEnc {
118                 conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
119
120                 var err error
121                 conn, err = MakeSecretConnection(conn, ourNodePrivKey)
122                 if err != nil {
123                         return nil, errors.Wrap(err, "Error creating peer")
124                 }
125         }
126
127         // Only the information we already have
128         return &peerConn{
129                 config:   config,
130                 outbound: outbound,
131                 conn:     conn,
132         }, nil
133 }
134
135 // CloseConn should be used when the peer was created, but never started.
136 func (pc *peerConn) CloseConn() {
137         pc.conn.Close()
138 }
139
140 // HandshakeTimeout performs a handshake between a given node and the peer.
141 // NOTE: blocking
142 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
143         // Set deadline for handshake so we don't block forever on conn.ReadFull
144         pc.conn.SetDeadline(time.Now().Add(timeout))
145
146         var peerNodeInfo = new(NodeInfo)
147         var err1 error
148         var err2 error
149         cmn.Parallel(
150                 func() {
151                         var n int
152                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
153                 },
154                 func() {
155                         var n int
156                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
157                         log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
158                 })
159         if err1 != nil {
160                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
161         }
162         if err2 != nil {
163                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
164         }
165
166         // Remove deadline
167         pc.conn.SetDeadline(time.Time{})
168
169         return peerNodeInfo, nil
170 }
171
172 // Addr returns peer's remote network address.
173 func (p *Peer) Addr() net.Addr {
174         return p.conn.RemoteAddr()
175 }
176
177 // PubKey returns peer's public key.
178 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
179         if p.config.AuthEnc {
180                 return p.conn.(*SecretConnection).RemotePubKey()
181         }
182         if p.NodeInfo == nil {
183                 panic("Attempt to get peer's PubKey before calling Handshake")
184         }
185         return p.PubKey()
186 }
187
188 // OnStart implements BaseService.
189 func (p *Peer) OnStart() error {
190         p.BaseService.OnStart()
191         _, err := p.mconn.Start()
192         return err
193 }
194
195 // OnStop implements BaseService.
196 func (p *Peer) OnStop() {
197         p.BaseService.OnStop()
198         p.mconn.Stop()
199 }
200
201 // Connection returns underlying MConnection.
202 func (p *Peer) Connection() *MConnection {
203         return p.mconn
204 }
205
206 // IsOutbound returns true if the connection is outbound, false otherwise.
207 func (p *Peer) IsOutbound() bool {
208         return p.outbound
209 }
210
211 // Send msg to the channel identified by chID byte. Returns false if the send
212 // queue is full after timeout, specified by MConnection.
213 func (p *Peer) Send(chID byte, msg interface{}) bool {
214         if !p.IsRunning() {
215                 // see Switch#Broadcast, where we fetch the list of peers and loop over
216                 // them - while we're looping, one peer may be removed and stopped.
217                 return false
218         }
219         return p.mconn.Send(chID, msg)
220 }
221
222 // TrySend msg to the channel identified by chID byte. Immediately returns
223 // false if the send queue is full.
224 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
225         if !p.IsRunning() {
226                 return false
227         }
228         return p.mconn.TrySend(chID, msg)
229 }
230
231 // CanSend returns true if the send queue is not full, false otherwise.
232 func (p *Peer) CanSend(chID byte) bool {
233         if !p.IsRunning() {
234                 return false
235         }
236         return p.mconn.CanSend(chID)
237 }
238
239 // WriteTo writes the peer's public key to w.
240 func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
241         var n_ int
242         wire.WriteString(p.Key, w, &n_, &err)
243         n += int64(n_)
244         return
245 }
246
247 // String representation.
248 func (p *Peer) String() string {
249         if p.outbound {
250                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
251         }
252
253         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
254 }
255
256 // Equals reports whenever 2 peers are actually represent the same node.
257 func (p *Peer) Equals(other *Peer) bool {
258         return p.Key == other.Key
259 }
260
261 // Get the data for a given key.
262 func (p *Peer) Get(key string) interface{} {
263         return p.Data.Get(key)
264 }
265
266 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
267         conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
268         if err != nil {
269                 return nil, err
270         }
271         return conn, nil
272 }
273
274 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
275         onReceive := func(chID byte, msgBytes []byte) {
276                 reactor := reactorsByCh[chID]
277                 if reactor == nil {
278                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
279                 }
280                 reactor.Receive(chID, p, msgBytes)
281         }
282
283         onError := func(r interface{}) {
284                 onPeerError(p, r)
285         }
286
287         return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
288 }