OSDN Git Service

Merge pull request #1431 from Bytom/fix_bug
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "fmt"
5         "net"
6         "strconv"
7         "time"
8
9         "github.com/pkg/errors"
10         log "github.com/sirupsen/logrus"
11         crypto "github.com/tendermint/go-crypto"
12         wire "github.com/tendermint/go-wire"
13         cmn "github.com/tendermint/tmlibs/common"
14         "github.com/tendermint/tmlibs/flowrate"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/consensus"
18         "github.com/bytom/p2p/connection"
19 )
20
21 // peerConn contains the raw connection and its config.
22 type peerConn struct {
23         outbound bool
24         config   *PeerConfig
25         conn     net.Conn // source connection
26 }
27
28 // PeerConfig is a Peer configuration.
29 type PeerConfig struct {
30         HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
31         DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
32         MConfig          *connection.MConnConfig `mapstructure:"connection"`
33 }
34
35 // DefaultPeerConfig returns the default config.
36 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
37         return &PeerConfig{
38                 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
39                 DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
40                 MConfig:          connection.DefaultMConnConfig(),
41         }
42 }
43
44 // Peer represent a bytom network node
45 type Peer struct {
46         cmn.BaseService
47         *NodeInfo
48         *peerConn
49         mconn *connection.MConnection // multiplex connection
50         Key   string
51 }
52
53 // OnStart implements BaseService.
54 func (p *Peer) OnStart() error {
55         p.BaseService.OnStart()
56         _, err := p.mconn.Start()
57         return err
58 }
59
60 // OnStop implements BaseService.
61 func (p *Peer) OnStop() {
62         p.BaseService.OnStop()
63         p.mconn.Stop()
64 }
65
66 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
67         // Key and NodeInfo are set after Handshake
68         p := &Peer{
69                 peerConn: pc,
70                 NodeInfo: nodeInfo,
71                 Key:      nodeInfo.PubKey.KeyString(),
72         }
73         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
74         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
75         return p
76 }
77
78 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
79         conn, err := dial(addr, config)
80         if err != nil {
81                 return nil, errors.Wrap(err, "Error dial peer")
82         }
83
84         pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
85         if err != nil {
86                 conn.Close()
87                 return nil, err
88         }
89         return pc, nil
90 }
91
92 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
93         return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
94 }
95
96 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
97         rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
98         conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
99         if err != nil {
100                 return nil, errors.Wrap(err, "Error creating peer")
101         }
102
103         return &peerConn{
104                 config:   config,
105                 outbound: outbound,
106                 conn:     conn,
107         }, nil
108 }
109
110 // Addr returns peer's remote network address.
111 func (p *Peer) Addr() net.Addr {
112         return p.conn.RemoteAddr()
113 }
114
115 // CanSend returns true if the send queue is not full, false otherwise.
116 func (p *Peer) CanSend(chID byte) bool {
117         if !p.IsRunning() {
118                 return false
119         }
120         return p.mconn.CanSend(chID)
121 }
122
123 // CloseConn should be used when the peer was created, but never started.
124 func (pc *peerConn) CloseConn() {
125         pc.conn.Close()
126 }
127
128 // Equals reports whenever 2 peers are actually represent the same node.
129 func (p *Peer) Equals(other *Peer) bool {
130         return p.Key == other.Key
131 }
132
133 // HandshakeTimeout performs a handshake between a given node and the peer.
134 // NOTE: blocking
135 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
136         // Set deadline for handshake so we don't block forever on conn.ReadFull
137         if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
138                 return nil, err
139         }
140
141         var peerNodeInfo = new(NodeInfo)
142         var err1, err2 error
143         cmn.Parallel(
144                 func() {
145                         var n int
146                         wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
147                 },
148                 func() {
149                         var n int
150                         wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
151                         log.WithField("address", peerNodeInfo.ListenAddr).Info("Peer handshake")
152                 })
153         if err1 != nil {
154                 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
155         }
156         if err2 != nil {
157                 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
158         }
159
160         // Remove deadline
161         if err := pc.conn.SetDeadline(time.Time{}); err != nil {
162                 return nil, err
163         }
164         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
165         return peerNodeInfo, nil
166 }
167
168 // ID return the uuid of the peer
169 func (p *Peer) ID() string {
170         return p.Key
171 }
172
173 // IsOutbound returns true if the connection is outbound, false otherwise.
174 func (p *Peer) IsOutbound() bool {
175         return p.outbound
176 }
177
178 // PubKey returns peer's public key.
179 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
180         return p.conn.(*connection.SecretConnection).RemotePubKey()
181 }
182
183 // Send msg to the channel identified by chID byte. Returns false if the send
184 // queue is full after timeout, specified by MConnection.
185 func (p *Peer) Send(chID byte, msg interface{}) bool {
186         if !p.IsRunning() {
187                 return false
188         }
189         return p.mconn.Send(chID, msg)
190 }
191
192 // ServiceFlag return the ServiceFlag of this peer
193 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
194         services := consensus.SFFullNode
195         if len(p.Other) == 0 {
196                 return services
197         }
198
199         if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
200                 services = consensus.ServiceFlag(serviceFlag)
201         }
202         return services
203 }
204
205 // String representation.
206 func (p *Peer) String() string {
207         if p.outbound {
208                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
209         }
210         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
211 }
212
213 // TrafficStatus return the in and out traffic status
214 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
215         return p.mconn.TrafficStatus()
216 }
217
218 // TrySend msg to the channel identified by chID byte. Immediately returns
219 // false if the send queue is full.
220 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
221         if !p.IsRunning() {
222                 return false
223         }
224         return p.mconn.TrySend(chID, msg)
225 }
226
227 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
228         onReceive := func(chID byte, msgBytes []byte) {
229                 reactor := reactorsByCh[chID]
230                 if reactor == nil {
231                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
232                 }
233                 reactor.Receive(chID, p, msgBytes)
234         }
235
236         onError := func(r interface{}) {
237                 onPeerError(p, r)
238         }
239         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
240 }
241
242 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
243         conn, err := addr.DialTimeout(config.DialTimeout)
244         if err != nil {
245                 return nil, err
246         }
247         return conn, nil
248 }