OSDN Git Service

Merge pull request #1333 from Bytom/dev
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "strconv"
7         "time"
8
9         "github.com/pkg/errors"
10         log "github.com/sirupsen/logrus"
11         crypto "github.com/tendermint/go-crypto"
12         wire "github.com/tendermint/go-wire"
13         cmn "github.com/tendermint/tmlibs/common"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/consensus"
17         "github.com/bytom/p2p/connection"
18 )
19
20 // peerConn contains the raw connection and its config.
21 type peerConn struct {
22         outbound bool
23         config   *PeerConfig
24         conn     net.Conn // source connection
25 }
26
27 // PeerConfig is a Peer configuration.
28 type PeerConfig struct {
29         HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
30         DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
31         MConfig          *connection.MConnConfig `mapstructure:"connection"`
32 }
33
34 // DefaultPeerConfig returns the default config.
35 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
36         return &PeerConfig{
37                 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
38                 DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
39                 MConfig:          connection.DefaultMConnConfig(),
40         }
41 }
42
43 // Peer represent a bytom network node
44 type Peer struct {
45         cmn.BaseService
46         *NodeInfo
47         *peerConn
48         mconn *connection.MConnection // multiplex connection
49         Key   string
50 }
51
52 // OnStart implements BaseService.
53 func (p *Peer) OnStart() error {
54         p.BaseService.OnStart()
55         _, err := p.mconn.Start()
56         return err
57 }
58
59 // OnStop implements BaseService.
60 func (p *Peer) OnStop() {
61         p.BaseService.OnStop()
62         p.mconn.Stop()
63 }
64
65 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
66         // Key and NodeInfo are set after Handshake
67         p := &Peer{
68                 peerConn: pc,
69                 NodeInfo: nodeInfo,
70                 Key:      nodeInfo.PubKey.KeyString(),
71         }
72         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
73         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
74         return p
75 }
76
77 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
78         conn, err := dial(addr, config)
79         if err != nil {
80                 return nil, errors.Wrap(err, "Error dial peer")
81         }
82
83         pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
84         if err != nil {
85                 conn.Close()
86                 return nil, err
87         }
88         return pc, nil
89 }
90
91 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
92         return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
93 }
94
95 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
96         rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
97         conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
98         if err != nil {
99                 return nil, errors.Wrap(err, "Error creating peer")
100         }
101
102         return &peerConn{
103                 config:   config,
104                 outbound: outbound,
105                 conn:     conn,
106         }, nil
107 }
108
109 // Addr returns peer's remote network address.
110 func (p *Peer) Addr() net.Addr {
111         return p.conn.RemoteAddr()
112 }
113
114 // CanSend returns true if the send queue is not full, false otherwise.
115 func (p *Peer) CanSend(chID byte) bool {
116         if !p.IsRunning() {
117                 return false
118         }
119         return p.mconn.CanSend(chID)
120 }
121
122 // CloseConn should be used when the peer was created, but never started.
123 func (pc *peerConn) CloseConn() {
124         pc.conn.Close()
125 }
126
127 // Equals reports whenever 2 peers are actually represent the same node.
128 func (p *Peer) Equals(other *Peer) bool {
129         return p.Key == other.Key
130 }
131
132 // HandshakeTimeout performs a handshake between a given node and the peer.
133 // NOTE: blocking
134 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
135         // Set deadline for handshake so we don't block forever on conn.ReadFull
136         if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
137                 return nil, err
138         }
139
140         var peerNodeInfo = new(NodeInfo)
141         var err1, err2 error
142         cmn.Parallel(
143                 func() {
144                         var n int
145                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
146                 },
147                 func() {
148                         var n int
149                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
150                         log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
151                 })
152         if err1 != nil {
153                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
154         }
155         if err2 != nil {
156                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
157         }
158
159         // Remove deadline
160         if err := pc.conn.SetDeadline(time.Time{}); err != nil {
161                 return nil, err
162         }
163         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
164         return peerNodeInfo, nil
165 }
166
167 func (p *Peer) ID() string {
168         return p.Key
169 }
170
171 // IsOutbound returns true if the connection is outbound, false otherwise.
172 func (p *Peer) IsOutbound() bool {
173         return p.outbound
174 }
175
176 // PubKey returns peer's public key.
177 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
178         return p.conn.(*connection.SecretConnection).RemotePubKey()
179 }
180
181 // Send msg to the channel identified by chID byte. Returns false if the send
182 // queue is full after timeout, specified by MConnection.
183 func (p *Peer) Send(chID byte, msg interface{}) bool {
184         if !p.IsRunning() {
185                 return false
186         }
187         return p.mconn.Send(chID, msg)
188 }
189
190 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
191         services := consensus.SFFullNode
192         if len(p.Other) == 0 {
193                 return services
194         }
195
196         if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
197                 services = consensus.ServiceFlag(serviceFlag)
198         }
199         return services
200 }
201
202 // String representation.
203 func (p *Peer) String() string {
204         if p.outbound {
205                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
206         }
207         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
208 }
209
210 // TrySend msg to the channel identified by chID byte. Immediately returns
211 // false if the send queue is full.
212 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
213         if !p.IsRunning() {
214                 return false
215         }
216         return p.mconn.TrySend(chID, msg)
217 }
218
219 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
220         onReceive := func(chID byte, msgBytes []byte) {
221                 reactor := reactorsByCh[chID]
222                 if reactor == nil {
223                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
224                 }
225                 reactor.Receive(chID, p, msgBytes)
226         }
227
228         onError := func(r interface{}) {
229                 onPeerError(p, r)
230         }
231         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
232 }
233
234 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
235         conn, err := addr.DialTimeout(config.DialTimeout)
236         if err != nil {
237                 return nil, err
238         }
239         return conn, nil
240 }