OSDN Git Service

Peer add announces new block message num limit
[bytom/vapor.git] / p2p / peer.go
index a32afff..856d1e9 100644 (file)
@@ -3,20 +3,20 @@ package p2p
 import (
        "fmt"
        "net"
-       "strconv"
+       "reflect"
        "time"
 
        "github.com/btcsuite/go-socks/socks"
        "github.com/pkg/errors"
        log "github.com/sirupsen/logrus"
-       crypto "github.com/tendermint/go-crypto"
-       wire "github.com/tendermint/go-wire"
+       "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        "github.com/tendermint/tmlibs/flowrate"
 
        cfg "github.com/vapor/config"
        "github.com/vapor/consensus"
        "github.com/vapor/p2p/connection"
+       "github.com/vapor/p2p/signlib"
 )
 
 // peerConn contains the raw connection and its config.
@@ -44,7 +44,7 @@ func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
                ProxyAddress:     config.ProxyAddress,
                ProxyUsername:    config.ProxyUsername,
                ProxyPassword:    config.ProxyPassword,
-               MConfig:          connection.DefaultMConnConfig(),
+               MConfig:          connection.DefaultMConnConfig(config.Compression),
        }
 }
 
@@ -55,6 +55,7 @@ type Peer struct {
        *peerConn
        mconn *connection.MConnection // multiplex connection
        Key   string
+       isLAN bool
 }
 
 // OnStart implements BaseService.
@@ -70,19 +71,20 @@ func (p *Peer) OnStop() {
        p.mconn.Stop()
 }
 
-func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
        // Key and NodeInfo are set after Handshake
        p := &Peer{
                peerConn: pc,
                NodeInfo: nodeInfo,
-               Key:      nodeInfo.PubKey.KeyString(),
+               Key:      nodeInfo.PubKey,
+               isLAN:    isLAN,
        }
        p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
        p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
        return p
 }
 
-func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
+func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) {
        conn, err := dial(addr, config)
        if err != nil {
                return nil, errors.Wrap(err, "Error dial peer")
@@ -96,17 +98,22 @@ func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519,
        return pc, nil
 }
 
-func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+func newInboundPeerConn(conn net.Conn, ourNodePrivKey signlib.PrivKey, config *cfg.P2PConfig) (*peerConn, error) {
        return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
 }
 
-func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
+func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) {
        rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
        conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
        if err != nil {
                return nil, errors.Wrap(err, "Error creating peer")
        }
 
+       // Remove deadline
+       if err := rawConn.SetDeadline(time.Time{}); err != nil {
+               return nil, err
+       }
+
        return &peerConn{
                config:   config,
                outbound: outbound,
@@ -155,7 +162,7 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duratio
                func() {
                        var n int
                        wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
-                       log.WithField("address", peerNodeInfo.ListenAddr).Info("Peer handshake")
+                       log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
                })
        if err1 != nil {
                return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
@@ -182,9 +189,14 @@ func (p *Peer) IsOutbound() bool {
        return p.outbound
 }
 
+// IsLAN returns true if peer is LAN peer, false otherwise.
+func (p *Peer) IsLAN() bool {
+       return p.isLAN
+}
+
 // PubKey returns peer's public key.
-func (p *Peer) PubKey() crypto.PubKeyEd25519 {
-       return p.conn.(*connection.SecretConnection).RemotePubKey()
+func (p *Peer) PubKey() string {
+       return p.conn.(*connection.SecretConnection).RemotePubKey().String()
 }
 
 // Send msg to the channel identified by chID byte. Returns false if the send
@@ -198,15 +210,8 @@ func (p *Peer) Send(chID byte, msg interface{}) bool {
 
 // ServiceFlag return the ServiceFlag of this peer
 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
-       services := consensus.SFFullNode
-       if len(p.Other) == 0 {
-               return services
-       }
-
-       if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
-               services = consensus.ServiceFlag(serviceFlag)
-       }
-       return services
+       // ServiceFlag return the ServiceFlag of this peer
+       return p.NodeInfo.ServiceFlag
 }
 
 // String representation.
@@ -228,6 +233,13 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool {
        if !p.IsRunning() {
                return false
        }
+
+       log.WithFields(log.Fields{
+               "module": logModule,
+               "peer":   p.Addr(),
+               "msg":    msg,
+               "type":   reflect.TypeOf(msg),
+       }).Debug("send message to peer")
        return p.mconn.TrySend(chID, msg)
 }