9 "github.com/pkg/errors"
10 log "github.com/sirupsen/logrus"
11 crypto "github.com/tendermint/go-crypto"
12 wire "github.com/tendermint/go-wire"
13 cmn "github.com/tendermint/tmlibs/common"
15 cfg "github.com/bytom/config"
16 "github.com/bytom/consensus"
17 "github.com/bytom/p2p/connection"
20 // peerConn holds the connection to a remote peer together with its config.
21 type peerConn struct {
24 conn net.Conn // source connection; newPeer hands it to createMConnection
27 // PeerConfig is a Peer configuration.
28 type PeerConfig struct {
29 HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` // deadline applied to the raw connection in newPeerConn; DefaultPeerConfig builds it from a value in seconds
30 DialTimeout time.Duration `mapstructure:"dial_timeout"` // timeout passed to addr.DialTimeout by dial; DefaultPeerConfig builds it from a value in seconds
31 MConfig *connection.MConnConfig `mapstructure:"connection"` // config forwarded to the multiplex connection
34 // DefaultPeerConfig returns a PeerConfig derived from the given P2P config,
// paired with the default multiplex connection config.
35 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
37 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // config value is a count of seconds
38 DialTimeout: time.Duration(config.DialTimeout) * time.Second, // config value is a count of seconds
39 MConfig: connection.DefaultMConnConfig(),
43 // Peer represents a bytom network node.
48 mconn *connection.MConnection // multiplex connection; started in OnStart and used by Send, TrySend and CanSend
52 // OnStart implements BaseService. It calls BaseService.OnStart and then
// starts the multiplex connection.
53 func (p *Peer) OnStart() error {
// NOTE(review): any result of BaseService.OnStart is discarded here — confirm this is intended.
54 p.BaseService.OnStart()
55 _, err := p.mconn.Start()
59 // OnStop implements BaseService. It invokes BaseService.OnStop as part of shutdown.
60 func (p *Peer) OnStop() {
61 p.BaseService.OnStop()
// newPeer builds a Peer on top of pc: the peer is keyed by the key string of
// nodeInfo.PubKey, gets a multiplex connection over pc.conn configured with
// pc.config.MConfig, and has its BaseService initialized under the name "Peer".
65 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
66 // NOTE(review): this used to say "Key and NodeInfo are set after Handshake", but Key is assigned from nodeInfo right below — confirm and drop the stale wording.
70 Key: nodeInfo.PubKey.KeyString(),
72 p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
73 p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
// newOutboundPeerConn dials addr using config and wraps the resulting
// connection in an outbound peerConn via newPeerConn. A dial failure is
// returned wrapped with "Error dial peer".
77 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
78 conn, err := dial(addr, config)
80 return nil, errors.Wrap(err, "Error dial peer")
83 pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
// newInboundPeerConn wraps the given connection in an inbound (outbound=false)
// peerConn, using the PeerConfig that DefaultPeerConfig derives from config.
91 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
92 return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
// newPeerConn puts a deadline of config.HandshakeTimeout on rawConn and then
// upgrades it with connection.MakeSecretConnection using ourNodePrivKey.
// A failure of the upgrade is returned wrapped with "Error creating peer".
95 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
// NOTE(review): the error returned by SetDeadline is ignored.
96 rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
97 conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
99 return nil, errors.Wrap(err, "Error creating peer")
109 // Addr returns the peer's remote network address as reported by its connection.
110 func (p *Peer) Addr() net.Addr {
111 return p.conn.RemoteAddr()
114 // CanSend returns true if the send queue for channel chID is not full, false
// otherwise. The answer comes from the multiplex connection.
115 func (p *Peer) CanSend(chID byte) bool {
119 return p.mconn.CanSend(chID)
122 // CloseConn should be used when the peer was created but never started.
123 func (pc *peerConn) CloseConn() {
127 // Equals reports whether two peers represent the same node, which is decided
// by comparing their Key fields. other must be non-nil.
128 func (p *Peer) Equals(other *Peer) bool {
129 return p.Key == other.Key
132 // HandshakeTimeout performs a handshake between a given node and the peer:
// ourNodeInfo is written to the connection and the peer's NodeInfo is read
// back (bounded by maxNodeInfoSize), all under a deadline of now+timeout.
// On success the deadline is cleared and the returned NodeInfo has its
// RemoteAddr set from the connection's remote address. Write and read
// failures are returned wrapped with "Error during handshake/write" and
// "Error during handshake/read" respectively.
// NOTE(review): on those error paths the partially filled NodeInfo is
// returned alongside the error rather than nil.
134 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
135 // Set deadline for handshake so we don't block forever on conn.ReadFull
// NOTE(review): the error returned by SetDeadline is ignored.
136 pc.conn.SetDeadline(time.Now().Add(timeout))
138 var peerNodeInfo = new(NodeInfo)
143 wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
147 wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
// NOTE(review): this log line appears before the err2 check — confirm whether it also runs when the read failed.
148 log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
151 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
154 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
// The zero time.Time removes the deadline again.
158 pc.conn.SetDeadline(time.Time{})
159 peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
160 return peerNodeInfo, nil
// ID returns a string identifier for the peer.
163 func (p *Peer) ID() string {
167 // IsOutbound reports whether the peer's connection is outbound.
168 func (p *Peer) IsOutbound() bool {
172 // PubKey returns peer's public key.
173 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
174 return p.conn.(*connection.SecretConnection).RemotePubKey()
177 // Send sends msg on the channel identified by chID via the multiplex
178 // connection. Returns false if the send queue is full after timeout, specified by MConnection.
179 func (p *Peer) Send(chID byte, msg interface{}) bool {
183 return p.mconn.Send(chID, msg)
// ServiceFlag derives the peer's service flags. The value starts as
// consensus.SFFullNode; when p.Other is non-empty and p.Other[0] parses as a
// base-10 unsigned 64-bit integer, that number is used instead. A parse
// failure leaves the default in place.
186 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
187 services := consensus.SFFullNode
188 if len(p.Other) == 0 {
192 if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
193 services = consensus.ServiceFlag(serviceFlag)
198 // String returns a short description of the peer: its multiplex connection,
// the first 12 bytes of its Key, and the direction ("out" or "in").
// NOTE(review): p.Key[:12] panics if Key is shorter than 12 bytes.
199 func (p *Peer) String() string {
201 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
203 return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
206 // TrySend attempts to send msg on the channel identified by chID via the
207 // multiplex connection. Immediately returns false if the send queue is full.
208 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
212 return p.mconn.TrySend(chID, msg)
// createMConnection builds the multiplex connection for peer p over conn with
// the given channel descriptors and config. Incoming messages are dispatched
// to the reactor registered in reactorsByCh for the message's channel ID.
215 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
216 onReceive := func(chID byte, msgBytes []byte) {
217 reactor := reactorsByCh[chID]
// Presumably reached only when no reactor is registered for chID — the guarding condition is not visible here.
219 cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
221 reactor.Receive(chID, p, msgBytes)
// onError is the error callback handed to the multiplex connection.
224 onError := func(r interface{}) {
227 return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
230 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
231 conn, err := addr.DialTimeout(config.DialTimeout)