OSDN Git Service

docs(release note): update bytom version 1.1.0 release note
[bytom/bytom.git] / p2p / peer.go
index e35b633..5c0a07b 100644 (file)
@@ -3,18 +3,21 @@ package p2p
 import (
        "fmt"
        "net"
+       "reflect"
        "strconv"
        "time"
 
+       "github.com/btcsuite/go-socks/socks"
        "github.com/pkg/errors"
        log "github.com/sirupsen/logrus"
-       crypto "github.com/tendermint/go-crypto"
-       wire "github.com/tendermint/go-wire"
+       "github.com/tendermint/go-crypto"
+       "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
+       "github.com/tendermint/tmlibs/flowrate"
 
-       cfg "github.com/bytom/config"
-       "github.com/bytom/consensus"
-       "github.com/bytom/p2p/connection"
+       cfg "github.com/bytom/bytom/config"
+       "github.com/bytom/bytom/consensus"
+       "github.com/bytom/bytom/p2p/connection"
 )
 
 // peerConn contains the raw connection and its config.
@@ -28,6 +31,9 @@ type peerConn struct {
 type PeerConfig struct {
        HandshakeTimeout time.Duration           `mapstructure:"handshake_timeout"` // times are in seconds
        DialTimeout      time.Duration           `mapstructure:"dial_timeout"`
+       ProxyAddress     string                  `mapstructure:"proxy_address"`
+       ProxyUsername    string                  `mapstructure:"proxy_username"`
+       ProxyPassword    string                  `mapstructure:"proxy_password"`
        MConfig          *connection.MConnConfig `mapstructure:"connection"`
 }
 
@@ -36,6 +42,9 @@ func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
        return &PeerConfig{
                HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
                DialTimeout:      time.Duration(config.DialTimeout) * time.Second,      // * time.Second,
+               ProxyAddress:     config.ProxyAddress,
+               ProxyUsername:    config.ProxyUsername,
+               ProxyPassword:    config.ProxyPassword,
                MConfig:          connection.DefaultMConnConfig(),
        }
 }
@@ -47,6 +56,7 @@ type Peer struct {
        *peerConn
        mconn *connection.MConnection // multiplex connection
        Key   string
+       isLAN bool
 }
 
 // OnStart implements BaseService.
@@ -62,12 +72,13 @@ func (p *Peer) OnStop() {
        p.mconn.Stop()
 }
 
-func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
        // Key and NodeInfo are set after Handshake
        p := &Peer{
                peerConn: pc,
                NodeInfo: nodeInfo,
                Key:      nodeInfo.PubKey.KeyString(),
+               isLAN:    isLAN,
        }
        p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
        p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
@@ -133,7 +144,9 @@ func (p *Peer) Equals(other *Peer) bool {
 // NOTE: blocking
 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
        // Set deadline for handshake so we don't block forever on conn.ReadFull
-       pc.conn.SetDeadline(time.Now().Add(timeout))
+       if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
+               return nil, err
+       }
 
        var peerNodeInfo = new(NodeInfo)
        var err1, err2 error
@@ -145,7 +158,7 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duratio
                func() {
                        var n int
                        wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
-                       log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
+                       log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
                })
        if err1 != nil {
                return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
@@ -155,11 +168,14 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duratio
        }
 
        // Remove deadline
-       pc.conn.SetDeadline(time.Time{})
+       if err := pc.conn.SetDeadline(time.Time{}); err != nil {
+               return nil, err
+       }
        peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
        return peerNodeInfo, nil
 }
 
+// ID return the uuid of the peer
 func (p *Peer) ID() string {
        return p.Key
 }
@@ -169,6 +185,11 @@ func (p *Peer) IsOutbound() bool {
        return p.outbound
 }
 
+// IsLAN returns true if peer is LAN peer, false otherwise.
+func (p *Peer) IsLAN() bool {
+       return p.isLAN
+}
+
 // PubKey returns peer's public key.
 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
        return p.conn.(*connection.SecretConnection).RemotePubKey()
@@ -183,6 +204,7 @@ func (p *Peer) Send(chID byte, msg interface{}) bool {
        return p.mconn.Send(chID, msg)
 }
 
+// ServiceFlag return the ServiceFlag of this peer
 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
        services := consensus.SFFullNode
        if len(p.Other) == 0 {
@@ -203,12 +225,24 @@ func (p *Peer) String() string {
        return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
 }
 
+// TrafficStatus return the in and out traffic status
+func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+       return p.mconn.TrafficStatus()
+}
+
 // TrySend msg to the channel identified by chID byte. Immediately returns
 // false if the send queue is full.
 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
        if !p.IsRunning() {
                return false
        }
+
+       log.WithFields(log.Fields{
+               "module": logModule,
+               "peer":   p.Addr(),
+               "msg":    msg,
+               "type":   reflect.TypeOf(msg),
+       }).Info("send message to peer")
        return p.mconn.TrySend(chID, msg)
 }
 
@@ -228,7 +262,19 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch
 }
 
 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
-       conn, err := addr.DialTimeout(config.DialTimeout)
+       var conn net.Conn
+       var err error
+       if config.ProxyAddress == "" {
+               conn, err = addr.DialTimeout(config.DialTimeout)
+       } else {
+               proxy := &socks.Proxy{
+                       Addr:         config.ProxyAddress,
+                       Username:     config.ProxyUsername,
+                       Password:     config.ProxyPassword,
+                       TorIsolation: false,
+               }
+               conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)
+       }
        if err != nil {
                return nil, err
        }