OSDN Git Service

Merge pull request #83 from Bytom/wallet
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "io"
6         "net"
7         "time"
8
9         "github.com/pkg/errors"
10         log "github.com/sirupsen/logrus"
11         crypto "github.com/tendermint/go-crypto"
12         wire "github.com/tendermint/go-wire"
13         cmn "github.com/tendermint/tmlibs/common"
14 )
15
16 // Peer could be marked as persistent, in which case you can use
17 // Redial function to reconnect. Note that inbound peers can't be
18 // made persistent. They should be made persistent on the other end.
19 //
20 // Before using a peer, you will need to perform a handshake on connection.
21 type Peer struct {
22         cmn.BaseService
23
24         outbound bool
25
26         conn  net.Conn     // source connection
27         mconn *MConnection // multiplex connection
28
29         persistent bool
30         config     *PeerConfig
31
32         *NodeInfo
33         Key  string
34         Data *cmn.CMap // User data.
35 }
36
37 // PeerConfig is a Peer configuration.
38 type PeerConfig struct {
39         AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption
40
41         // times are in seconds
42         HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
43         DialTimeout      time.Duration `mapstructure:"dial_timeout"`
44
45         MConfig *MConnConfig `mapstructure:"connection"`
46
47         Fuzz       bool            `mapstructure:"fuzz"` // fuzz connection (for testing)
48         FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
49 }
50
51 // DefaultPeerConfig returns the default config.
52 func DefaultPeerConfig() *PeerConfig {
53         return &PeerConfig{
54                 AuthEnc:          true,
55                 HandshakeTimeout: 20, // * time.Second,
56                 DialTimeout:      3,  // * time.Second,
57                 MConfig:          DefaultMConnConfig(),
58                 Fuzz:             false,
59                 FuzzConfig:       DefaultFuzzConnConfig(),
60         }
61 }
62
63 func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
64         return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
65 }
66
67 func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
68         conn, err := dial(addr, config)
69         if err != nil {
70                 return nil, errors.Wrap(err, "Error creating peer")
71         }
72
73         peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
74         if err != nil {
75                 conn.Close()
76                 return nil, err
77         }
78         return peer, nil
79 }
80
81 func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
82         return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
83 }
84
85 func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
86         return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
87 }
88
89 func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
90         conn := rawConn
91
92         // Fuzz connection
93         if config.Fuzz {
94                 // so we have time to do peer handshakes and get set up
95                 conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
96         }
97
98         // Encrypt connection
99         if config.AuthEnc {
100                 conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
101
102                 var err error
103                 conn, err = MakeSecretConnection(conn, ourNodePrivKey)
104                 if err != nil {
105                         return nil, errors.Wrap(err, "Error creating peer")
106                 }
107         }
108
109         // Key and NodeInfo are set after Handshake
110         p := &Peer{
111                 outbound: outbound,
112                 conn:     conn,
113                 config:   config,
114                 Data:     cmn.NewCMap(),
115         }
116
117         p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
118
119         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
120
121         return p, nil
122 }
123
124 // CloseConn should be used when the peer was created, but never started.
125 func (p *Peer) CloseConn() {
126         p.conn.Close()
127 }
128
129 // makePersistent marks the peer as persistent.
130 func (p *Peer) makePersistent() {
131         if !p.outbound {
132                 panic("inbound peers can't be made persistent")
133         }
134
135         p.persistent = true
136 }
137
138 // IsPersistent returns true if the peer is persitent, false otherwise.
139 func (p *Peer) IsPersistent() bool {
140         return p.persistent
141 }
142
143 // HandshakeTimeout performs a handshake between a given node and the peer.
144 // NOTE: blocking
145 func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
146         // Set deadline for handshake so we don't block forever on conn.ReadFull
147         p.conn.SetDeadline(time.Now().Add(timeout))
148
149         var peerNodeInfo = new(NodeInfo)
150         var err1 error
151         var err2 error
152         cmn.Parallel(
153                 func() {
154                         var n int
155                         wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
156                 },
157                 func() {
158                         var n int
159                         wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
160                         log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
161                 })
162         if err1 != nil {
163                 return errors.Wrap(err1, "Error during handshake/write")
164         }
165         if err2 != nil {
166                 return errors.Wrap(err2, "Error during handshake/read")
167         }
168
169         if p.config.AuthEnc {
170                 // Check that the professed PubKey matches the sconn's.
171                 if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) {
172                         return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
173                                 peerNodeInfo.PubKey, p.PubKey())
174                 }
175         }
176
177         // Remove deadline
178         p.conn.SetDeadline(time.Time{})
179
180         peerNodeInfo.RemoteAddr = p.Addr().String()
181
182         p.NodeInfo = peerNodeInfo
183         p.Key = peerNodeInfo.PubKey.KeyString()
184
185         return nil
186 }
187
188 // Addr returns peer's remote network address.
189 func (p *Peer) Addr() net.Addr {
190         return p.conn.RemoteAddr()
191 }
192
193 // PubKey returns peer's public key.
194 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
195         if p.config.AuthEnc {
196                 return p.conn.(*SecretConnection).RemotePubKey()
197         }
198         if p.NodeInfo == nil {
199                 panic("Attempt to get peer's PubKey before calling Handshake")
200         }
201         return p.PubKey()
202 }
203
204 // OnStart implements BaseService.
205 func (p *Peer) OnStart() error {
206         p.BaseService.OnStart()
207         _, err := p.mconn.Start()
208         return err
209 }
210
211 // OnStop implements BaseService.
212 func (p *Peer) OnStop() {
213         p.BaseService.OnStop()
214         p.mconn.Stop()
215 }
216
217 // Connection returns underlying MConnection.
218 func (p *Peer) Connection() *MConnection {
219         return p.mconn
220 }
221
222 // IsOutbound returns true if the connection is outbound, false otherwise.
223 func (p *Peer) IsOutbound() bool {
224         return p.outbound
225 }
226
227 // Send msg to the channel identified by chID byte. Returns false if the send
228 // queue is full after timeout, specified by MConnection.
229 func (p *Peer) Send(chID byte, msg interface{}) bool {
230         if !p.IsRunning() {
231                 // see Switch#Broadcast, where we fetch the list of peers and loop over
232                 // them - while we're looping, one peer may be removed and stopped.
233                 return false
234         }
235         return p.mconn.Send(chID, msg)
236 }
237
238 // TrySend msg to the channel identified by chID byte. Immediately returns
239 // false if the send queue is full.
240 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
241         if !p.IsRunning() {
242                 return false
243         }
244         return p.mconn.TrySend(chID, msg)
245 }
246
247 // CanSend returns true if the send queue is not full, false otherwise.
248 func (p *Peer) CanSend(chID byte) bool {
249         if !p.IsRunning() {
250                 return false
251         }
252         return p.mconn.CanSend(chID)
253 }
254
255 // WriteTo writes the peer's public key to w.
256 func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
257         var n_ int
258         wire.WriteString(p.Key, w, &n_, &err)
259         n += int64(n_)
260         return
261 }
262
263 // String representation.
264 func (p *Peer) String() string {
265         if p.outbound {
266                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
267         }
268
269         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
270 }
271
272 // Equals reports whenever 2 peers are actually represent the same node.
273 func (p *Peer) Equals(other *Peer) bool {
274         return p.Key == other.Key
275 }
276
277 // Get the data for a given key.
278 func (p *Peer) Get(key string) interface{} {
279         return p.Data.Get(key)
280 }
281
282 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
283         conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
284         if err != nil {
285                 return nil, err
286         }
287         return conn, nil
288 }
289
290 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
291         onReceive := func(chID byte, msgBytes []byte) {
292                 reactor := reactorsByCh[chID]
293                 if reactor == nil {
294                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
295                 }
296                 reactor.Receive(chID, p, msgBytes)
297         }
298
299         onError := func(r interface{}) {
300                 onPeerError(p, r)
301         }
302
303         return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
304 }