X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=p2p%2Fpeer.go;h=e239e372a95bddab4d7c934d3348082c643d3a7b;hb=a177b8b4f2828248c5bf34561b877c2578b77dd1;hp=a32afff2773a712107fd5f40133effde1ed3fd06;hpb=08281341c2cb02ba11d4218576256688854790fc;p=bytom%2Fvapor.git diff --git a/p2p/peer.go b/p2p/peer.go index a32afff2..e239e372 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -3,20 +3,20 @@ package p2p import ( "fmt" "net" - "strconv" + "reflect" "time" "github.com/btcsuite/go-socks/socks" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - crypto "github.com/tendermint/go-crypto" - wire "github.com/tendermint/go-wire" + "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/flowrate" cfg "github.com/vapor/config" "github.com/vapor/consensus" "github.com/vapor/p2p/connection" + "github.com/vapor/p2p/signlib" ) // peerConn contains the raw connection and its config. @@ -55,6 +55,7 @@ type Peer struct { *peerConn mconn *connection.MConnection // multiplex connection Key string + isLAN bool } // OnStart implements BaseService. @@ -70,19 +71,20 @@ func (p *Peer) OnStop() { p.mconn.Stop() } -func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { +func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer { // Key and NodeInfo are set after Handshake p := &Peer{ peerConn: pc, NodeInfo: nodeInfo, - Key: nodeInfo.PubKey.KeyString(), + Key: nodeInfo.PubKey, + isLAN: isLAN, } p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig) p.BaseService = *cmn.NewBaseService(nil, "Peer", p) return p } -func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) { +func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) { conn, err := dial(addr, config) if err != nil { return nil, errors.Wrap(err, "Error dial peer") @@ -96,17 +98,22 @@ func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, return pc, nil } -func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) { +func newInboundPeerConn(conn net.Conn, ourNodePrivKey signlib.PrivKey, config *cfg.P2PConfig) (*peerConn, error) { return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config)) } -func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) { +func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey signlib.PrivKey, config *PeerConfig) (*peerConn, error) { rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout)) conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey) if err != nil { return nil, errors.Wrap(err, "Error creating peer") } + // Remove deadline + if err := rawConn.SetDeadline(time.Time{}); err != nil { + return nil, err + } + return &peerConn{ config: config, outbound: outbound, @@ -155,7 +162,7 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duratio func() { var n int wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2) - log.WithField("address", peerNodeInfo.ListenAddr).Info("Peer handshake") + log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake") }) if err1 != nil { return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write") @@ -182,9 +189,14 @@ func (p *Peer) IsOutbound() bool { return p.outbound } +// IsLAN returns true if peer is LAN peer, false otherwise. +func (p *Peer) IsLAN() bool { + return p.isLAN +} + // PubKey returns peer's public key. -func (p *Peer) PubKey() crypto.PubKeyEd25519 { - return p.conn.(*connection.SecretConnection).RemotePubKey() +func (p *Peer) PubKey() string { + return p.conn.(*connection.SecretConnection).RemotePubKey().String() } // Send msg to the channel identified by chID byte. Returns false if the send @@ -198,15 +210,8 @@ func (p *Peer) Send(chID byte, msg interface{}) bool { // ServiceFlag return the ServiceFlag of this peer func (p *Peer) ServiceFlag() consensus.ServiceFlag { - services := consensus.SFFullNode - if len(p.Other) == 0 { - return services - } - - if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil { - services = consensus.ServiceFlag(serviceFlag) - } - return services + // ServiceFlag return the ServiceFlag of this peer + return p.NodeInfo.ServiceFlag } // String representation. @@ -228,6 +233,13 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } + + log.WithFields(log.Fields{ + "module": logModule, + "peer": p.Addr(), + "msg": msg, + "type": reflect.TypeOf(msg), + }).Info("send message to peer") return p.mconn.TrySend(chID, msg) }