10 "github.com/btcsuite/go-socks/socks"
11 "github.com/pkg/errors"
12 log "github.com/sirupsen/logrus"
13 "github.com/tendermint/go-crypto"
14 "github.com/tendermint/go-wire"
15 cmn "github.com/tendermint/tmlibs/common"
16 "github.com/tendermint/tmlibs/flowrate"
18 cfg "github.com/vapor/config"
19 "github.com/vapor/consensus"
20 "github.com/vapor/p2p/connection"
23 // peerConn contains the raw connection and its config.
24 type peerConn struct {
27 conn net.Conn // source connection
30 // PeerConfig holds the per-peer handshake, dial, proxy and connection settings.
31 type PeerConfig struct {
32 HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` // a full time.Duration (DefaultPeerConfig converts the configured seconds); used as the deadline in newPeerConn
33 DialTimeout time.Duration `mapstructure:"dial_timeout"` // a full time.Duration; handed to addr.DialTimeout / addr.DialTimeoutWithProxy by dial
34 ProxyAddress string `mapstructure:"proxy_address"` // SOCKS proxy address; when empty, dial connects without a proxy
35 ProxyUsername string `mapstructure:"proxy_username"` // copied into socks.Proxy.Username by dial
36 ProxyPassword string `mapstructure:"proxy_password"` // copied into socks.Proxy.Password by dial
37 MConfig *connection.MConnConfig `mapstructure:"connection"` // passed to createMConnection by newPeer
40 // DefaultPeerConfig builds a PeerConfig from the node's P2P settings, pairing them with connection.DefaultMConnConfig().
41 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
43 HandshakeTimeout: time.Duration(config.HandshakeTimeout) * time.Second, // config value is a number of seconds
44 DialTimeout: time.Duration(config.DialTimeout) * time.Second, // config value is a number of seconds
45 ProxyAddress: config.ProxyAddress,
46 ProxyUsername: config.ProxyUsername,
47 ProxyPassword: config.ProxyPassword,
48 MConfig: connection.DefaultMConnConfig(),
52 // Peer represents a bytom network node.
57 mconn *connection.MConnection // multiplex connection
62 // OnStart implements BaseService: it runs the embedded BaseService.OnStart and then starts the multiplex connection (mconn).
63 func (p *Peer) OnStart() error {
64 p.BaseService.OnStart() // NOTE(review): any return value is discarded here — confirm that is intended
65 _, err := p.mconn.Start()
69 // OnStop implements BaseService.
70 func (p *Peer) OnStop() {
71 p.BaseService.OnStop()
75 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), isLAN bool) *Peer {
76 // Key is derived from the nodeInfo argument below. NOTE(review): the earlier "set after Handshake" wording looks stale — presumably newPeer is only called once the handshake has produced nodeInfo; confirm.
80 Key: nodeInfo.PubKey.KeyString(),
83 p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig) // p itself is handed over so the connection callbacks can reference this peer
84 p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
88 func newOutboundPeerConn(addr *NetAddress, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
89 conn, err := dial(addr, config)
91 return nil, errors.Wrap(err, "Error dial peer")
94 pc, err := newPeerConn(conn, true, ourNodePrivKey, config)
102 func newInboundPeerConn(conn net.Conn, ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
103 return newPeerConn(conn, false, ourNodePrivKey, DefaultPeerConfig(config))
106 func newPeerConn(rawConn net.Conn, outbound bool, ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
107 rawConn.SetDeadline(time.Now().Add(config.HandshakeTimeout)) // deadline is set before MakeSecretConnection runs; NOTE(review): SetDeadline's error is dropped here, whereas HandshakeTimeout checks it
108 conn, err := connection.MakeSecretConnection(rawConn, ourNodePrivKey)
110 return nil, errors.Wrap(err, "Error creating peer")
120 // Addr returns peer's remote network address.
121 func (p *Peer) Addr() net.Addr {
122 return p.conn.RemoteAddr()
125 // CanSend returns true if the send queue is not full, false otherwise.
126 func (p *Peer) CanSend(chID byte) bool {
130 return p.mconn.CanSend(chID)
133 // CloseConn should be used when the peer was created, but never started.
134 func (pc *peerConn) CloseConn() {
138 // Equals reports whether two peers represent the same node, i.e. whether their Key fields match.
139 func (p *Peer) Equals(other *Peer) bool {
140 return p.Key == other.Key
143 // HandshakeTimeout exchanges NodeInfo with the peer over pc.conn under a deadline of timeout and, on success, returns the peer's NodeInfo with RemoteAddr filled in.
145 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
146 // Set deadline for handshake so we don't block forever on conn.ReadFull
147 if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
151 var peerNodeInfo = new(NodeInfo)
156 wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
160 wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
161 log.WithFields(log.Fields{"module": logModule, "address": pc.conn.RemoteAddr().String()}).Info("Peer handshake")
164 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
167 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
171 if err := pc.conn.SetDeadline(time.Time{}); err != nil { // the zero time.Time clears the deadline
174 peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
175 return peerNodeInfo, nil
178 // ID returns the uuid of the peer.
179 func (p *Peer) ID() string {
183 // IsOutbound returns true if the connection is outbound, false otherwise.
184 func (p *Peer) IsOutbound() bool {
188 // IsLAN returns true if peer is LAN peer, false otherwise.
189 func (p *Peer) IsLAN() bool {
193 // PubKey returns peer's public key, read from the underlying *connection.SecretConnection; the unchecked type assertion panics if conn holds any other type.
194 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
195 return p.conn.(*connection.SecretConnection).RemotePubKey()
198 // Send msg to the channel identified by chID byte. Returns false if the send
199 // queue is full after timeout, specified by MConnection.
200 func (p *Peer) Send(chID byte, msg interface{}) bool {
204 return p.mconn.Send(chID, msg)
207 // ServiceFlag returns the ServiceFlag of this peer: services starts as consensus.SFFullNode and is replaced by p.Other[0] when that parses as a base-10 uint64.
208 func (p *Peer) ServiceFlag() consensus.ServiceFlag {
209 services := consensus.SFFullNode
210 if len(p.Other) == 0 {
214 if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
215 services = consensus.ServiceFlag(serviceFlag)
220 // String returns a printable form of the peer: its mconn, the first 12 bytes of Key and an "out" or "in" direction marker. Key must be at least 12 bytes long or the slice expression panics.
221 func (p *Peer) String() string {
223 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
225 return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
228 // TrafficStatus returns the in and out traffic status reported by the multiplex connection.
229 func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
230 return p.mconn.TrafficStatus()
233 // TrySend msg to the channel identified by chID byte. Immediately returns
234 // false if the send queue is full.
235 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
240 log.WithFields(log.Fields{
244 "type": reflect.TypeOf(msg),
245 }).Info("send message to peer")
246 return p.mconn.TrySend(chID, msg)
249 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
250 onReceive := func(chID byte, msgBytes []byte) {
251 reactor := reactorsByCh[chID]
253 cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
255 reactor.Receive(chID, p, msgBytes)
258 onError := func(r interface{}) {
261 return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
264 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
267 if config.ProxyAddress == "" {
268 conn, err = addr.DialTimeout(config.DialTimeout)
270 proxy := &socks.Proxy{
271 Addr: config.ProxyAddress,
272 Username: config.ProxyUsername,
273 Password: config.ProxyPassword,
276 conn, err = addr.DialTimeoutWithProxy(proxy, config.DialTimeout)