8 "github.com/pkg/errors"
9 log "github.com/sirupsen/logrus"
10 crypto "github.com/tendermint/go-crypto"
11 wire "github.com/tendermint/go-wire"
12 cmn "github.com/tendermint/tmlibs/common"
14 cfg "github.com/bytom/config"
15 "github.com/bytom/p2p/connection"
18 // peerConn contains the raw connection and its config.
19 type peerConn struct {
22 conn net.Conn // source connection
25 // Peer represent a bytom network node
29 // raw peerConn and the multiplex connection
31 mconn *connection.MConnection // multiplex connection
35 Data *cmn.CMap // User data.
38 // PeerConfig is a Peer configuration.
39 type PeerConfig struct {
40 AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption
42 // times are in seconds
43 HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
44 DialTimeout time.Duration `mapstructure:"dial_timeout"`
46 MConfig *connection.MConnConfig `mapstructure:"connection"`
48 Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing)
49 FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
52 // DefaultPeerConfig returns the default config.
53 func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
56 HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
57 DialTimeout: time.Duration(config.DialTimeout), // * time.Second,
58 MConfig: connection.DefaultMConnConfig(),
60 FuzzConfig: DefaultFuzzConnConfig(),
64 func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
65 // Key and NodeInfo are set after Handshake
72 p.Key = nodeInfo.PubKey.KeyString()
73 p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
75 p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
79 func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
80 return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
83 func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
84 conn, err := dial(addr, config)
86 return nil, errors.Wrap(err, "Error dial peer")
89 pc, err := newPeerConn(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
98 func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
99 return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
102 func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
107 // so we have time to do peer handshakes and get set up
108 conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
111 // Encrypt connection
113 conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
116 conn, err = connection.MakeSecretConnection(conn, ourNodePrivKey)
118 return nil, errors.Wrap(err, "Error creating peer")
122 // Only the information we already have
130 // CloseConn should be used when the peer was created, but never started.
131 func (pc *peerConn) CloseConn() {
135 // HandshakeTimeout performs a handshake between a given node and the peer.
137 func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
138 // Set deadline for handshake so we don't block forever on conn.ReadFull
139 pc.conn.SetDeadline(time.Now().Add(timeout))
141 var peerNodeInfo = new(NodeInfo)
147 wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
151 wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
152 log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
155 return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
158 return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
162 pc.conn.SetDeadline(time.Time{})
163 peerNodeInfo.RemoteAddr = pc.conn.RemoteAddr().String()
164 return peerNodeInfo, nil
167 // Addr returns peer's remote network address.
168 func (p *Peer) Addr() net.Addr {
169 return p.conn.RemoteAddr()
172 // PubKey returns peer's public key.
173 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
174 if p.config.AuthEnc {
175 return p.conn.(*connection.SecretConnection).RemotePubKey()
177 if p.NodeInfo == nil {
178 panic("Attempt to get peer's PubKey before calling Handshake")
183 // OnStart implements BaseService.
184 func (p *Peer) OnStart() error {
185 p.BaseService.OnStart()
186 _, err := p.mconn.Start()
190 // OnStop implements BaseService.
191 func (p *Peer) OnStop() {
192 p.BaseService.OnStop()
196 // Connection returns underlying MConnection.
197 func (p *Peer) Connection() *connection.MConnection {
201 // IsOutbound returns true if the connection is outbound, false otherwise.
202 func (p *Peer) IsOutbound() bool {
206 // Send msg to the channel identified by chID byte. Returns false if the send
207 // queue is full after timeout, specified by MConnection.
208 func (p *Peer) Send(chID byte, msg interface{}) bool {
210 // see Switch#Broadcast, where we fetch the list of peers and loop over
211 // them - while we're looping, one peer may be removed and stopped.
214 return p.mconn.Send(chID, msg)
217 // TrySend msg to the channel identified by chID byte. Immediately returns
218 // false if the send queue is full.
219 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
223 return p.mconn.TrySend(chID, msg)
226 // CanSend returns true if the send queue is not full, false otherwise.
227 func (p *Peer) CanSend(chID byte) bool {
231 return p.mconn.CanSend(chID)
234 // String representation.
235 func (p *Peer) String() string {
237 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
240 return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
243 // Equals reports whenever 2 peers are actually represent the same node.
244 func (p *Peer) Equals(other *Peer) bool {
245 return p.Key == other.Key
248 // Get the data for a given key.
249 func (p *Peer) Get(key string) interface{} {
250 return p.Data.Get(key)
253 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
254 conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
261 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
262 onReceive := func(chID byte, msgBytes []byte) {
263 reactor := reactorsByCh[chID]
265 cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
267 reactor.Receive(chID, p, msgBytes)
270 onError := func(r interface{}) {
274 return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)