9 "github.com/pkg/errors"
10 log "github.com/sirupsen/logrus"
11 crypto "github.com/tendermint/go-crypto"
12 wire "github.com/tendermint/go-wire"
13 cmn "github.com/tendermint/tmlibs/common"
14 "github.com/tendermint/tmlibs/flowrate"
16 cfg "github.com/bytom/config"
17 "github.com/bytom/consensus"
18 "github.com/bytom/p2p/connection"
21 // peerConn contains the raw connection and its config.
22 type peerConn struct {
25 conn net.Conn // source connection
28 // PeerConfig is a Peer configuration.
29 type PeerConfig struct {
30 HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` // times are in seconds
31 DialTimeout time.Duration `mapstructure:"dial_timeout"`
32 MConfig *connection.MConnConfig `mapstructure:"connection"`
35 // DefaultPeerConfig returns the default config.
36 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
38 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // * time.Second,
39 DialTimeout: time.Duration(config.DialTimeout) * time.Second, // * time.Second,
40 MConfig: connection.DefaultMConnConfig(),
44 // Peer represent a bytom network node
49 mconn *connection.MConnection // multiplex connection
53 // OnStart implements BaseService.
54 func (p *Peer) OnStart() error {
55 p.BaseService.OnStart()
56 _, err := p.mconn.Start()
60 // OnStop implements BaseService.
61 func (p *Peer) OnStop() {
62 p.BaseService.OnStop()
66 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
67 // Key and NodeInfo are set after Handshake
71 Key: nodeInfo.PubKey.KeyString(),
73 p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
74 p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
78 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
79 conn, err := dial(addr, config)
81 return nil, errors.Wrap(err, "Error dial peer")
84 pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
92 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
93 return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
96 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
97 rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
98 conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
100 return nil, errors.Wrap(err, "Error creating peer")
110 // Addr returns peer's remote network address.
111 func (p *Peer) Addr() net.Addr {
112 return p.conn.RemoteAddr()
115 // CanSend returns true if the send queue is not full, false otherwise.
116 func (p *Peer) CanSend(chID byte) bool {
120 return p.mconn.CanSend(chID)
123 // CloseConn should be used when the peer was created, but never started.
124 func (pc *peerConn) CloseConn() {
128 // Equals reports whenever 2 peers are actually represent the same node.
129 func (p *Peer) Equals(other *Peer) bool {
130 return p.Key == other.Key
133 // HandshakeTimeout performs a handshake between a given node and the peer.
135 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
136 // Set deadline for handshake so we don't block forever on conn.ReadFull
137 if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
141 var peerNodeInfo = new(NodeInfo)
146 wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
150 wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
151 log.WithField("address", peerNodeInfo.ListenAddr).Info("Peer handshake")
154 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
157 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
161 if err := pc.conn.SetDeadline(time.Time{}); err != nil {
164 peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
165 return peerNodeInfo, nil
168 // ID return the uuid of the peer
169 func (p *Peer) ID() string {
173 // IsOutbound returns true if the connection is outbound, false otherwise.
174 func (p *Peer) IsOutbound() bool {
178 // PubKey returns peer's public key.
179 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
180 return p.conn.(*connection.SecretConnection).RemotePubKey()
183 // Send msg to the channel identified by chID byte. Returns false if the send
184 // queue is full after timeout, specified by MConnection.
185 func (p *Peer) Send(chID byte, msg interface{}) bool {
189 return p.mconn.Send(chID, msg)
192 // ServiceFlag return the ServiceFlag of this peer
193 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
194 services := consensus.SFFullNode
195 if len(p.Other) == 0 {
199 if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
200 services = consensus.ServiceFlag(serviceFlag)
205 // String representation.
206 func (p *Peer) String() string {
208 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
210 return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
213 // TrafficStatus return the in and out traffic status
214 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
215 return p.mconn.TrafficStatus()
218 // TrySend msg to the channel identified by chID byte. Immediately returns
219 // false if the send queue is full.
220 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
224 return p.mconn.TrySend(chID, msg)
227 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
228 onReceive := func(chID byte, msgBytes []byte) {
229 reactor := reactorsByCh[chID]
231 cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
233 reactor.Receive(chID, p, msgBytes)
236 onError := func(r interface{}) {
239 return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
242 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
243 conn, err := addr.DialTimeout(config.DialTimeout)