OSDN Git Service

delete some black utxo (#2129)
[bytom/bytom.git] / p2p / peer.go
1 package p2p
2
3 import (
4         "crypto/ed25519"
5         "encoding/hex"
6         "fmt"
7         "net"
8         "reflect"
9         "strconv"
10         "time"
11
12         "github.com/btcsuite/go-socks/socks"
13         "github.com/pkg/errors"
14         log "github.com/sirupsen/logrus"
15         "github.com/tendermint/go-wire"
16         cmn "github.com/tendermint/tmlibs/common"
17         "github.com/tendermint/tmlibs/flowrate"
18
19         cfg "github.com/bytom/bytom/config"
20         "github.com/bytom/bytom/consensus"
21         "github.com/bytom/bytom/crypto/ed25519/chainkd"
22         "github.com/bytom/bytom/p2p/connection"
23 )
24
25 // peerConn contains the raw connection and its config.
26 type peerConn struct {
27         outbound bool
28         config   *PeerConfig
29         conn     net.Conn // source connection
30 }
31
32 // PeerConfig is a Peer configuration.
33 type PeerConfig struct {
34         HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
35         DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
36         ProxyAddress     string                  `mapstructure:"proxy_address"`
37         ProxyUsername    string                  `mapstructure:"proxy_username"`
38         ProxyPassword    string                  `mapstructure:"proxy_password"`
39         MConfig          *connection.MConnConfig `mapstructure:"connection"`
40 }
41
42 // DefaultPeerConfig returns the default config.
43 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
44         return &PeerConfig{
45                 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
46                 DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
47                 ProxyAddress:     config.ProxyAddress,
48                 ProxyUsername:    config.ProxyUsername,
49                 ProxyPassword:    config.ProxyPassword,
50                 MConfig:          connection.DefaultMConnConfig(),
51         }
52 }
53
54 // Peer represent a bytom network node
55 type Peer struct {
56         cmn.BaseService
57         *NodeInfo
58         *peerConn
59         mconn *connection.MConnection // multiplex connection
60         Key   string
61         isLAN bool
62 }
63
64 func (p *Peer) Moniker() string {
65         return p.NodeInfo.Moniker
66 }
67
68 // OnStart implements BaseService.
69 func (p *Peer) OnStart() error {
70         p.BaseService.OnStart()
71         return p.mconn.Start()
72 }
73
74 // OnStop implements BaseService.
75 func (p *Peer) OnStop() {
76         p.BaseService.OnStop()
77         p.mconn.Stop()
78 }
79
80 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
81         // Key and NodeInfo are set after Handshake
82         p := &Peer{
83                 peerConn: pc,
84                 NodeInfo: nodeInfo,
85                 Key:      hex.EncodeToString(nodeInfo.PubKey),
86                 isLAN:    isLAN,
87         }
88         p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
89         p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
90         return p
91 }
92
93 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey chainkd.XPrv, config *PeerConfig) (*peerConn, error) {
94         conn, err := dial(addr, config)
95         if err != nil {
96                 return nil, errors.Wrap(err, "Error dial peer")
97         }
98
99         pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
100         if err != nil {
101                 conn.Close()
102                 return nil, err
103         }
104         return pc, nil
105 }
106
107 func newInboundPeerConn(conn net.Conn, ourNodePrivKey chainkd.XPrv, config *cfg.P2PConfig) (*peerConn, error) {
108         return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
109 }
110
111 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey chainkd.XPrv, config *PeerConfig) (*peerConn, error) {
112         rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
113         conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
114         if err != nil {
115                 return nil, errors.Wrap(err, "Error creating peer")
116         }
117
118         return &peerConn{
119                 config:   config,
120                 outbound: outbound,
121                 conn:     conn,
122         }, nil
123 }
124
125 // Addr returns peer's remote network address.
126 func (p *Peer) Addr() net.Addr {
127         return p.conn.RemoteAddr()
128 }
129
130 // CanSend returns true if the send queue is not full, false otherwise.
131 func (p *Peer) CanSend(chID byte) bool {
132         if !p.IsRunning() {
133                 return false
134         }
135         return p.mconn.CanSend(chID)
136 }
137
138 // CloseConn should be used when the peer was created, but never started.
139 func (pc *peerConn) CloseConn() {
140         pc.conn.Close()
141 }
142
143 // Equals reports whenever 2 peers are actually represent the same node.
144 func (p *Peer) Equals(other *Peer) bool {
145         return p.Key == other.Key
146 }
147
148 // HandshakeTimeout performs a handshake between a given node and the peer.
149 // NOTE: blocking
150 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
151         // Set deadline for handshake so we don't block forever on conn.ReadFull
152         if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
153                 return nil, err
154         }
155
156         var peerNodeInfo = new(NodeInfo)
157         writeTask := func(i int) (val interface{}, err error, about bool) {
158                 var n int
159                 wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err)
160                 return nil, err, false
161         }
162
163         readTask := func(i int) (val interface{}, err error, about bool) {
164                 var n int
165                 wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err)
166                 return nil, err, false
167
168         }
169         cmn.Parallel(writeTask, readTask)
170
171         // In parallel, handle reads and writes
172         trs, ok := cmn.Parallel(writeTask, readTask)
173         if !ok {
174                 return nil, errors.New("Parallel task run failed")
175         }
176         for i := 0; i < 2; i++ {
177                 res, ok := trs.LatestResult(i)
178                 if !ok {
179                         return nil, fmt.Errorf("Task %d did not complete", i)
180                 }
181
182                 if res.Error != nil {
183                         return nil, errors.Wrap(res.Error, fmt.Sprintf("Task %d got error", i))
184                 }
185         }
186
187         // Remove deadline
188         if err := pc.conn.SetDeadline(time.Time{}); err != nil {
189                 return nil, err
190         }
191         peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
192         return peerNodeInfo, nil
193 }
194
195 // ID return the uuid of the peer
196 func (p *Peer) ID() string {
197         return p.Key
198 }
199
200 // IsOutbound returns true if the connection is outbound, false otherwise.
201 func (p *Peer) IsOutbound() bool {
202         return p.outbound
203 }
204
205 // IsLAN returns true if peer is LAN peer, false otherwise.
206 func (p *Peer) IsLAN() bool {
207         return p.isLAN
208 }
209
210 // PubKey returns peer's public key.
211 func (p *Peer) PubKey() ed25519.PublicKey {
212         return p.conn.(*connection.SecretConnection).RemotePubKey()
213 }
214
215 // Send msg to the channel identified by chID byte. Returns false if the send
216 // queue is full after timeout, specified by MConnection.
217 func (p *Peer) Send(chID byte, msg interface{}) bool {
218         if !p.IsRunning() {
219                 return false
220         }
221         return p.mconn.Send(chID, msg)
222 }
223
224 // ServiceFlag return the ServiceFlag of this peer
225 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
226         services := consensus.SFFullNode
227         if len(p.Other) == 0 {
228                 return services
229         }
230
231         if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
232                 services = consensus.ServiceFlag(serviceFlag)
233         }
234         return services
235 }
236
237 // String representation.
238 func (p *Peer) String() string {
239         if p.outbound {
240                 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
241         }
242         return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
243 }
244
245 // TrafficStatus return the in and out traffic status
246 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
247         return p.mconn.TrafficStatus()
248 }
249
250 // TrySend msg to the channel identified by chID byte. Immediately returns
251 // false if the send queue is full.
252 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
253         if !p.IsRunning() {
254                 return false
255         }
256
257         log.WithFields(log.Fields{
258                 "module": logModule,
259                 "peer":   p.Addr(),
260                 "msg":    msg,
261                 "type":   reflect.TypeOf(msg),
262         }).Info("send message to peer")
263         return p.mconn.TrySend(chID, msg)
264 }
265
266 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
267         onReceive := func(chID byte, msgBytes []byte) {
268                 reactor := reactorsByCh[chID]
269                 if reactor == nil {
270                         cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
271                 }
272                 reactor.Receive(chID, p, msgBytes)
273         }
274
275         onError := func(r interface{}) {
276                 onPeerError(p, r)
277         }
278         return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
279 }
280
281 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
282         var conn net.Conn
283         var err error
284         if config.ProxyAddress == "" {
285                 conn, err = addr.DialTimeout(config.DialTimeout)
286         } else {
287                 proxy := &socks.Proxy{
288                         Addr:         config.ProxyAddress,
289                         Username:     config.ProxyUsername,
290                         Password:     config.ProxyPassword,
291                         TorIsolation: false,
292                 }
293                 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)
294         }
295         if err != nil {
296                 return nil, err
297         }
298         return conn, nil
299 }