12 "github.com/btcsuite/go-socks/socks"
13 "github.com/pkg/errors"
14 log "github.com/sirupsen/logrus"
15 "github.com/tendermint/go-wire"
16 cmn "github.com/tendermint/tmlibs/common"
17 "github.com/tendermint/tmlibs/flowrate"
19 cfg "github.com/bytom/bytom/config"
20 "github.com/bytom/bytom/consensus"
21 "github.com/bytom/bytom/crypto/ed25519/chainkd"
22 "github.com/bytom/bytom/p2p/connection"
25 // peerConn contains the raw connection and its config.
26 type peerConn struct {
29 conn net.Conn // source connection
32 // PeerConfig is a Peer configuration.
33 type PeerConfig struct {
34 HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` // times are in seconds
35 DialTimeout time.Duration `mapstructure:"dial_timeout"`
36 ProxyAddress string `mapstructure:"proxy_address"`
37 ProxyUsername string `mapstructure:"proxy_username"`
38 ProxyPassword string `mapstructure:"proxy_password"`
39 MConfig *connection.MConnConfig `mapstructure:"connection"`
42 // DefaultPeerConfig returns the default config.
43 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
45 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
46 DialTimeout: time.Duration(config.DialTimeout) * time.Second, // * time.Second,
47 ProxyAddress: config.ProxyAddress,
48 ProxyUsername: config.ProxyUsername,
49 ProxyPassword: config.ProxyPassword,
50 MConfig: connection.DefaultMConnConfig(),
54 // Peer represents a bytom network node.
59 mconn *connection.MConnection // multiplex connection
64 func (p *Peer) Moniker() string {
65 return p.NodeInfo.Moniker
68 // OnStart implements BaseService.
69 func (p *Peer) OnStart() error {
70 p.BaseService.OnStart()
71 return p.mconn.Start()
74 // OnStop implements BaseService.
75 func (p *Peer) OnStop() {
76 p.BaseService.OnStop()
80 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
81 // Key and NodeInfo are set after Handshake
85 Key: hex.EncodeToString(nodeInfo.PubKey),
88 p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
89 p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
93 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey chainkd.XPrv, config *PeerConfig) (*peerConn, error) {
94 conn, err := dial(addr, config)
96 return nil, errors.Wrap(err, "Error dial peer")
99 pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
107 func newInboundPeerConn(conn net.Conn, ourNodePrivKey chainkd.XPrv, config *cfg.P2PConfig) (*peerConn, error) {
108 return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
111 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey chainkd.XPrv, config *PeerConfig) (*peerConn, error) {
112 rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
113 conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
115 return nil, errors.Wrap(err, "Error creating peer")
125 // Addr returns peer's remote network address.
126 func (p *Peer) Addr() net.Addr {
127 return p.conn.RemoteAddr()
130 // CanSend returns true if the send queue is not full, false otherwise.
131 func (p *Peer) CanSend(chID byte) bool {
135 return p.mconn.CanSend(chID)
138 // CloseConn should be used when the peer was created, but never started.
139 func (pc *peerConn) CloseConn() {
143 // Equals reports whether two peers actually represent the same node.
144 func (p *Peer) Equals(other *Peer) bool {
145 return p.Key == other.Key
148 // HandshakeTimeout performs a handshake between a given node and the peer.
150 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
151 // Set deadline for handshake so we don't block forever on conn.ReadFull
152 if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
156 var peerNodeInfo = new(NodeInfo)
157 writeTask := func(i int) (val interface{}, err error, about bool) {
159 wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err)
160 return nil, err, false
163 readTask := func(i int) (val interface{}, err error, about bool) {
165 wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err)
166 return nil, err, false
169 cmn.Parallel(writeTask, readTask)
171 // In parallel, handle reads and writes
172 trs, ok := cmn.Parallel(writeTask, readTask)
174 return nil, errors.New("Parallel task run failed")
176 for i := 0; i < 2; i++ {
177 res, ok := trs.LatestResult(i)
179 return nil, fmt.Errorf("Task %d did not complete", i)
182 if res.Error != nil {
183 return nil, errors.Wrap(res.Error, fmt.Sprintf("Task %d got error", i))
188 if err := pc.conn.SetDeadline(time.Time{}); err != nil {
191 peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
192 return peerNodeInfo, nil
195 // ID returns the uuid of the peer.
196 func (p *Peer) ID() string {
200 // IsOutbound returns true if the connection is outbound, false otherwise.
201 func (p *Peer) IsOutbound() bool {
205 // IsLAN returns true if peer is LAN peer, false otherwise.
206 func (p *Peer) IsLAN() bool {
210 // PubKey returns peer's public key.
211 func (p *Peer) PubKey() ed25519.PublicKey {
212 return p.conn.(*connection.SecretConnection).RemotePubKey()
215 // Send msg to the channel identified by chID byte. Returns false if the send
216 // queue is full after timeout, specified by MConnection.
217 func (p *Peer) Send(chID byte, msg interface{}) bool {
221 return p.mconn.Send(chID, msg)
224 // ServiceFlag returns the ServiceFlag of this peer.
225 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
226 services := consensus.SFFullNode
227 if len(p.Other) == 0 {
231 if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
232 services = consensus.ServiceFlag(serviceFlag)
237 // String representation.
238 func (p *Peer) String() string {
240 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
242 return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
245 // TrafficStatus returns the in and out traffic status.
246 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
247 return p.mconn.TrafficStatus()
250 // TrySend msg to the channel identified by chID byte. Immediately returns
251 // false if the send queue is full.
252 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
257 log.WithFields(log.Fields{
261 "type": reflect.TypeOf(msg),
262 }).Info("send message to peer")
263 return p.mconn.TrySend(chID, msg)
266 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
267 onReceive := func(chID byte, msgBytes []byte) {
268 reactor := reactorsByCh[chID]
270 cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
272 reactor.Receive(chID, p, msgBytes)
275 onError := func(r interface{}) {
278 return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
281 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
284 if config.ProxyAddress == "" {
285 conn, err = addr.DialTimeout(config.DialTimeout)
287 proxy := &socks.Proxy{
288 Addr: config.ProxyAddress,
289 Username: config.ProxyUsername,
290 Password: config.ProxyPassword,
293 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)