OSDN Git Service

Merge pull request #57 from liuchengxu/log
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "io"
6         "net"
7         "time"
8
9         "github.com/pkg/errors"
10         crypto "github.com/tendermint/go-crypto"
11         wire "github.com/tendermint/go-wire"
12         cmn "github.com/tendermint/tmlibs/common"
13 )
14
15 // Peer could be marked as persistent, in which case you can use
16 // Redial function to reconnect. Note that inbound peers can't be
17 // made persistent. They should be made persistent on the other end.
18 //
19 // Before using a peer, you will need to perform a handshake on connection.
20 type Peer struct {
21         cmn.BaseService
22
23         outbound bool
24
25         conn  net.Conn     // source connection
26         mconn *MConnection // multiplex connection
27
28         persistent bool
29         config     *PeerConfig
30
31         *NodeInfo
32         Key  string
33         Data *cmn.CMap // User data.
34 }
35
36 // PeerConfig is a Peer configuration.
37 type PeerConfig struct {
38         AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption
39
40         // times are in seconds
41         HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
42         DialTimeout      time.Duration `mapstructure:"dial_timeout"`
43
44         MConfig *MConnConfig `mapstructure:"connection"`
45
46         Fuzz       bool            `mapstructure:"fuzz"` // fuzz connection (for testing)
47         FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
48 }
49
50 // DefaultPeerConfig returns the default config.
51 func DefaultPeerConfig() *PeerConfig {
52         return &PeerConfig{
53                 AuthEnc:          true,
54                 HandshakeTimeout: 20, // * time.Second,
55                 DialTimeout:      3,  // * time.Second,
56                 MConfig:          DefaultMConnConfig(),
57                 Fuzz:             false,
58                 FuzzConfig:       DefaultFuzzConnConfig(),
59         }
60 }
61
62 func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
63         return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
64 }
65
66 func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
67         conn, err := dial(addr, config)
68         if err != nil {
69                 return nil, errors.Wrap(err, "Error creating peer")
70         }
71
72         peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
73         if err != nil {
74                 conn.Close()
75                 return nil, err
76         }
77         return peer, nil
78 }
79
80 func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
81         return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
82 }
83
84 func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
85         return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
86 }
87
88 func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
89         conn := rawConn
90
91         // Fuzz connection
92         if config.Fuzz {
93                 // so we have time to do peer handshakes and get set up
94                 conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
95         }
96
97         // Encrypt connection
98         if config.AuthEnc {
99                 conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
100
101                 var err error
102                 conn, err = MakeSecretConnection(conn, ourNodePrivKey)
103                 if err != nil {
104                         return nil, errors.Wrap(err, "Error creating peer")
105                 }
106         }
107
108         // Key and NodeInfo are set after Handshake
109         p := &Peer{
110                 outbound: outbound,
111                 conn:     conn,
112                 config:   config,
113                 Data:     cmn.NewCMap(),
114         }
115
116         p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
117
118         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
119
120         return p, nil
121 }
122
123 // CloseConn should be used when the peer was created, but never started.
124 func (p *Peer) CloseConn() {
125         p.conn.Close()
126 }
127
128 // makePersistent marks the peer as persistent.
129 func (p *Peer) makePersistent() {
130         if !p.outbound {
131                 panic("inbound peers can't be made persistent")
132         }
133
134         p.persistent = true
135 }
136
137 // IsPersistent returns true if the peer is persitent, false otherwise.
138 func (p *Peer) IsPersistent() bool {
139         return p.persistent
140 }
141
142 // HandshakeTimeout performs a handshake between a given node and the peer.
143 // NOTE: blocking
144 func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
145         // Set deadline for handshake so we don't block forever on conn.ReadFull
146         p.conn.SetDeadline(time.Now().Add(timeout))
147
148         var peerNodeInfo = new(NodeInfo)
149         var err1 error
150         var err2 error
151         cmn.Parallel(
152                 func() {
153                         var n int
154                         wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
155                 },
156                 func() {
157                         var n int
158                         wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
159                         p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
160                 })
161         if err1 != nil {
162                 return errors.Wrap(err1, "Error during handshake/write")
163         }
164         if err2 != nil {
165                 return errors.Wrap(err2, "Error during handshake/read")
166         }
167
168         if p.config.AuthEnc {
169                 // Check that the professed PubKey matches the sconn's.
170                 if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) {
171                         return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
172                                 peerNodeInfo.PubKey, p.PubKey())
173                 }
174         }
175
176         // Remove deadline
177         p.conn.SetDeadline(time.Time{})
178
179         peerNodeInfo.RemoteAddr = p.Addr().String()
180
181         p.NodeInfo = peerNodeInfo
182         p.Key = peerNodeInfo.PubKey.KeyString()
183
184         return nil
185 }
186
187 // Addr returns peer's remote network address.
188 func (p *Peer) Addr() net.Addr {
189         return p.conn.RemoteAddr()
190 }
191
192 // PubKey returns peer's public key.
193 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
194         if p.config.AuthEnc {
195                 return p.conn.(*SecretConnection).RemotePubKey()
196         }
197         if p.NodeInfo == nil {
198                 panic("Attempt to get peer's PubKey before calling Handshake")
199         }
200         return p.PubKey()
201 }
202
203 // OnStart implements BaseService.
204 func (p *Peer) OnStart() error {
205         p.BaseService.OnStart()
206         _, err := p.mconn.Start()
207         return err
208 }
209
210 // OnStop implements BaseService.
211 func (p *Peer) OnStop() {
212         p.BaseService.OnStop()
213         p.mconn.Stop()
214 }
215
216 // Connection returns underlying MConnection.
217 func (p *Peer) Connection() *MConnection {
218         return p.mconn
219 }
220
221 // IsOutbound returns true if the connection is outbound, false otherwise.
222 func (p *Peer) IsOutbound() bool {
223         return p.outbound
224 }
225
226 // Send msg to the channel identified by chID byte. Returns false if the send
227 // queue is full after timeout, specified by MConnection.
228 func (p *Peer) Send(chID byte, msg interface{}) bool {
229         if !p.IsRunning() {
230                 // see Switch#Broadcast, where we fetch the list of peers and loop over
231                 // them - while we're looping, one peer may be removed and stopped.
232                 return false
233         }
234         return p.mconn.Send(chID, msg)
235 }
236
237 // TrySend msg to the channel identified by chID byte. Immediately returns
238 // false if the send queue is full.
239 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
240         if !p.IsRunning() {
241                 return false
242         }
243         return p.mconn.TrySend(chID, msg)
244 }
245
246 // CanSend returns true if the send queue is not full, false otherwise.
247 func (p *Peer) CanSend(chID byte) bool {
248         if !p.IsRunning() {
249                 return false
250         }
251         return p.mconn.CanSend(chID)
252 }
253
254 // WriteTo writes the peer's public key to w.
255 func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
256         var n_ int
257         wire.WriteString(p.Key, w, &n_, &err)
258         n += int64(n_)
259         return
260 }
261
262 // String representation.
263 func (p *Peer) String() string {
264         if p.outbound {
265                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
266         }
267
268         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
269 }
270
271 // Equals reports whenever 2 peers are actually represent the same node.
272 func (p *Peer) Equals(other *Peer) bool {
273         return p.Key == other.Key
274 }
275
276 // Get the data for a given key.
277 func (p *Peer) Get(key string) interface{} {
278         return p.Data.Get(key)
279 }
280
281 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
282         conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
283         if err != nil {
284                 return nil, err
285         }
286         return conn, nil
287 }
288
289 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
290         onReceive := func(chID byte, msgBytes []byte) {
291                 reactor := reactorsByCh[chID]
292                 if reactor == nil {
293                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
294                 }
295                 reactor.Receive(chID, p, msgBytes)
296         }
297
298         onError := func(r interface{}) {
299                 onPeerError(p, r)
300         }
301
302         return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
303 }