9 "github.com/btcsuite/go-socks/socks"
10 "github.com/pkg/errors"
11 log "github.com/sirupsen/logrus"
12 "github.com/tendermint/go-wire"
13 cmn "github.com/tendermint/tmlibs/common"
14 "github.com/tendermint/tmlibs/flowrate"
16 cfg "github.com/vapor/config"
17 "github.com/vapor/consensus"
18 "github.com/vapor/p2p/connection"
19 "github.com/vapor/p2p/signlib"
22 // peerConn contains the raw connection and its config.
23 type peerConn struct {
26 conn net.Conn // source connection
29 // PeerConfig is a Peer configuration.
30 type PeerConfig struct {
31 HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` // times are in seconds
32 DialTimeout time.Duration `mapstructure:"dial_timeout"`
33 ProxyAddress string `mapstructure:"proxy_address"`
34 ProxyUsername string `mapstructure:"proxy_username"`
35 ProxyPassword string `mapstructure:"proxy_password"`
36 MConfig *connection.MConnConfig `mapstructure:"connection"`
39 // DefaultPeerConfig returns the default config.
40 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
42 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
43 DialTimeout: time.Duration(config.DialTimeout) * time.Second, // * time.Second,
44 ProxyAddress: config.ProxyAddress,
45 ProxyUsername: config.ProxyUsername,
46 ProxyPassword: config.ProxyPassword,
47 MConfig: connection.DefaultMConnConfig(config.Compression),
51 // Peer represent a bytom network node
56 mconn *connection.MConnection // multiplex connection
61 // OnStart implements BaseService.
62 func (p *Peer) OnStart() error {
63 p.BaseService.OnStart()
64 _, err := p.mconn.Start()
68 // OnStop implements BaseService.
69 func (p *Peer) OnStop() {
70 p.BaseService.OnStop()
74 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
75 // Key and NodeInfo are set after Handshake
82 p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
83 p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
87 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) {
88 conn, err := dial(addr, config)
90 return nil, errors.Wrap(err, "Error dial peer")
93 pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
101 func newInboundPeerConn(conn net.Conn, ourNodePrivKey signlib.PrivKey, config *cfg.P2PConfig) (*peerConn, error) {
102 return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
105 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) {
106 rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
107 conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
109 return nil, errors.Wrap(err, "Error creating peer")
113 if err := rawConn.SetDeadline(time.Time{}); err != nil {
124 // Addr returns peer's remote network address.
125 func (p *Peer) Addr() net.Addr {
126 return p.conn.RemoteAddr()
129 // CanSend returns true if the send queue is not full, false otherwise.
130 func (p *Peer) CanSend(chID byte) bool {
134 return p.mconn.CanSend(chID)
137 // CloseConn should be used when the peer was created, but never started.
138 func (pc *peerConn) CloseConn() {
142 // Equals reports whenever 2 peers are actually represent the same node.
143 func (p *Peer) Equals(other *Peer) bool {
144 return p.Key == other.Key
147 // HandshakeTimeout performs a handshake between a given node and the peer.
149 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
150 // Set deadline for handshake so we don't block forever on conn.ReadFull
151 if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
155 var peerNodeInfo = new(NodeInfo)
160 wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
164 wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
165 log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
168 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
171 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
175 if err := pc.conn.SetDeadline(time.Time{}); err != nil {
178 peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
179 return peerNodeInfo, nil
182 // ID return the uuid of the peer
183 func (p *Peer) ID() string {
187 // IsOutbound returns true if the connection is outbound, false otherwise.
188 func (p *Peer) IsOutbound() bool {
192 // IsLAN returns true if peer is LAN peer, false otherwise.
193 func (p *Peer) IsLAN() bool {
197 // PubKey returns peer's public key.
198 func (p *Peer) PubKey() string {
199 return p.conn.(*connection.SecretConnection).RemotePubKey().String()
202 // Send msg to the channel identified by chID byte. Returns false if the send
203 // queue is full after timeout, specified by MConnection.
204 func (p *Peer) Send(chID byte, msg interface{}) bool {
208 return p.mconn.Send(chID, msg)
211 // ServiceFlag return the ServiceFlag of this peer
212 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
213 // ServiceFlag return the ServiceFlag of this peer
214 return p.NodeInfo.ServiceFlag
217 // String representation.
218 func (p *Peer) String() string {
220 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
222 return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
225 // TrafficStatus return the in and out traffic status
226 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
227 return p.mconn.TrafficStatus()
230 // TrySend msg to the channel identified by chID byte. Immediately returns
231 // false if the send queue is full.
232 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
237 log.WithFields(log.Fields{
241 "type": reflect.TypeOf(msg),
242 }).Debug("send message to peer")
243 return p.mconn.TrySend(chID, msg)
246 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
247 onReceive := func(chID byte, msgBytes []byte) {
248 reactor := reactorsByCh[chID]
250 cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
252 reactor.Receive(chID, p, msgBytes)
255 onError := func(r interface{}) {
258 return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
261 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
264 if config.ProxyAddress == "" {
265 conn, err = addr.DialTimeout(config.DialTimeout)
267 proxy := &socks.Proxy{
268 Addr: config.ProxyAddress,
269 Username: config.ProxyUsername,
270 Password: config.ProxyPassword,
273 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)