import (
"fmt"
"net"
+ "reflect"
"strconv"
"time"
+ "github.com/btcsuite/go-socks/socks"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
- crypto "github.com/tendermint/go-crypto"
- wire "github.com/tendermint/go-wire"
+ "github.com/tendermint/go-crypto"
+ "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
+ "github.com/tendermint/tmlibs/flowrate"
- cfg "github.com/bytom/config"
- "github.com/bytom/consensus"
- "github.com/bytom/p2p/connection"
+ cfg "github.com/bytom/bytom/config"
+ "github.com/bytom/bytom/consensus"
+ "github.com/bytom/bytom/p2p/connection"
)
// peerConn contains the raw connection and its config.
type PeerConfig struct {
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` // times are in seconds
DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ ProxyAddress string `mapstructure:"proxy_address"`
+ ProxyUsername string `mapstructure:"proxy_username"`
+ ProxyPassword string `mapstructure:"proxy_password"`
MConfig *connection.MConnConfig `mapstructure:"connection"`
}
return &PeerConfig{
HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
DialTimeout: time.Duration(config.DialTimeout) * time.Second, // * time.Second,
+ ProxyAddress: config.ProxyAddress,
+ ProxyUsername: config.ProxyUsername,
+ ProxyPassword: config.ProxyPassword,
MConfig: connection.DefaultMConnConfig(),
}
}
*peerConn
mconn *connection.MConnection // multiplex connection
Key string
+ isLAN bool
}
// OnStart implements BaseService.
p.mconn.Stop()
}
-func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
// Key and NodeInfo are set after Handshake
p := &Peer{
peerConn: pc,
NodeInfo: nodeInfo,
Key: nodeInfo.PubKey.KeyString(),
+ isLAN: isLAN,
}
p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
// NOTE: blocking
func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
// Set deadline for handshake so we don't block forever on conn.ReadFull
- pc.conn.SetDeadline(time.Now().Add(timeout))
+ if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
+ return nil, err
+ }
var peerNodeInfo = new(NodeInfo)
var err1, err2 error
func() {
var n int
wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
- log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
+ log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
})
if err1 != nil {
return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
}
// Remove deadline
- pc.conn.SetDeadline(time.Time{})
+ if err := pc.conn.SetDeadline(time.Time{}); err != nil {
+ return nil, err
+ }
peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
return peerNodeInfo, nil
}
+// ID return the uuid of the peer
func (p *Peer) ID() string {
return p.Key
}
return p.outbound
}
+// IsLAN returns true if peer is LAN peer, false otherwise.
+func (p *Peer) IsLAN() bool {
+ return p.isLAN
+}
+
// PubKey returns peer's public key.
func (p *Peer) PubKey() crypto.PubKeyEd25519 {
return p.conn.(*connection.SecretConnection).RemotePubKey()
return p.mconn.Send(chID, msg)
}
+// ServiceFlag return the ServiceFlag of this peer
func (p *Peer) ServiceFlag() consensus.ServiceFlag {
services := consensus.SFFullNode
if len(p.Other) == 0 {
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
}
+// TrafficStatus return the in and out traffic status
+func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+ return p.mconn.TrafficStatus()
+}
+
// TrySend msg to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *Peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() {
return false
}
+
+ log.WithFields(log.Fields{
+ "module": logModule,
+ "peer": p.Addr(),
+ "msg": msg,
+ "type": reflect.TypeOf(msg),
+ }).Info("send message to peer")
return p.mconn.TrySend(chID, msg)
}
}
func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
- conn, err := addr.DialTimeout(config.DialTimeout)
+ var conn net.Conn
+ var err error
+ if config.ProxyAddress == "" {
+ conn, err = addr.DialTimeout(config.DialTimeout)
+ } else {
+ proxy := &socks.Proxy{
+ Addr: config.ProxyAddress,
+ Username: config.ProxyUsername,
+ Password: config.ProxyPassword,
+ TorIsolation: false,
+ }
+ conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)
+ }
if err != nil {
return nil, err
}