9 "github.com/pkg/errors"
10 crypto "github.com/tendermint/go-crypto"
11 wire "github.com/tendermint/go-wire"
12 cmn "github.com/tendermint/tmlibs/common"
15 // Peer could be marked as persistent, in which case you can use
16 // Redial function to reconnect. Note that inbound peers can't be
17 // made persistent. They should be made persistent on the other end.
19 // Before using a peer, you will need to perform a handshake on connection.
25 conn net.Conn // source connection
26 mconn *MConnection // multiplex connection
33 Data *cmn.CMap // User data.
36 // PeerConfig is a Peer configuration.
37 type PeerConfig struct {
38 AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption
40 // times are in seconds
41 HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
42 DialTimeout time.Duration `mapstructure:"dial_timeout"`
44 MConfig *MConnConfig `mapstructure:"connection"`
46 Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing)
47 FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
50 // DefaultPeerConfig returns the default config.
51 func DefaultPeerConfig() *PeerConfig {
54 HandshakeTimeout: 20, // * time.Second,
55 DialTimeout: 3, // * time.Second,
56 MConfig: DefaultMConnConfig(),
58 FuzzConfig: DefaultFuzzConnConfig(),
62 func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
63 return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
66 func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
67 conn, err := dial(addr, config)
69 return nil, errors.Wrap(err, "Error creating peer")
72 peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
80 func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
81 return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
84 func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
85 return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
88 func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
93 // so we have time to do peer handshakes and get set up
94 conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
99 conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
102 conn, err = MakeSecretConnection(conn, ourNodePrivKey)
104 return nil, errors.Wrap(err, "Error creating peer")
108 // Key and NodeInfo are set after Handshake
116 p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
118 p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
123 // CloseConn should be used when the peer was created, but never started.
124 func (p *Peer) CloseConn() {
128 // makePersistent marks the peer as persistent.
129 func (p *Peer) makePersistent() {
131 panic("inbound peers can't be made persistent")
137 // IsPersistent returns true if the peer is persitent, false otherwise.
138 func (p *Peer) IsPersistent() bool {
142 // HandshakeTimeout performs a handshake between a given node and the peer.
144 func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
145 // Set deadline for handshake so we don't block forever on conn.ReadFull
146 p.conn.SetDeadline(time.Now().Add(timeout))
148 var peerNodeInfo = new(NodeInfo)
154 wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
158 wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
159 p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
162 return errors.Wrap(err1, "Error during handshake/write")
165 return errors.Wrap(err2, "Error during handshake/read")
168 if p.config.AuthEnc {
169 // Check that the professed PubKey matches the sconn's.
170 if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) {
171 return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
172 peerNodeInfo.PubKey, p.PubKey())
177 p.conn.SetDeadline(time.Time{})
179 peerNodeInfo.RemoteAddr = p.Addr().String()
181 p.NodeInfo = peerNodeInfo
182 p.Key = peerNodeInfo.PubKey.KeyString()
187 // Addr returns peer's remote network address.
188 func (p *Peer) Addr() net.Addr {
189 return p.conn.RemoteAddr()
192 // PubKey returns peer's public key.
193 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
194 if p.config.AuthEnc {
195 return p.conn.(*SecretConnection).RemotePubKey()
197 if p.NodeInfo == nil {
198 panic("Attempt to get peer's PubKey before calling Handshake")
203 // OnStart implements BaseService.
204 func (p *Peer) OnStart() error {
205 p.BaseService.OnStart()
206 _, err := p.mconn.Start()
210 // OnStop implements BaseService.
211 func (p *Peer) OnStop() {
212 p.BaseService.OnStop()
216 // Connection returns underlying MConnection.
217 func (p *Peer) Connection() *MConnection {
221 // IsOutbound returns true if the connection is outbound, false otherwise.
222 func (p *Peer) IsOutbound() bool {
226 // Send msg to the channel identified by chID byte. Returns false if the send
227 // queue is full after timeout, specified by MConnection.
228 func (p *Peer) Send(chID byte, msg interface{}) bool {
230 // see Switch#Broadcast, where we fetch the list of peers and loop over
231 // them - while we're looping, one peer may be removed and stopped.
234 return p.mconn.Send(chID, msg)
237 // TrySend msg to the channel identified by chID byte. Immediately returns
238 // false if the send queue is full.
239 func (p *Peer) TrySend(chID byte, msg interface{}) bool {
243 return p.mconn.TrySend(chID, msg)
246 // CanSend returns true if the send queue is not full, false otherwise.
247 func (p *Peer) CanSend(chID byte) bool {
251 return p.mconn.CanSend(chID)
254 // WriteTo writes the peer's public key to w.
255 func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
257 wire.WriteString(p.Key, w, &n_, &err)
262 // String representation.
263 func (p *Peer) String() string {
265 return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
268 return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
271 // Equals reports whenever 2 peers are actually represent the same node.
272 func (p *Peer) Equals(other *Peer) bool {
273 return p.Key == other.Key
276 // Get the data for a given key.
277 func (p *Peer) Get(key string) interface{} {
278 return p.Data.Get(key)
281 func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
282 conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
289 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
290 onReceive := func(chID byte, msgBytes []byte) {
291 reactor := reactorsByCh[chID]
293 cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
295 reactor.Receive(chID, p, msgBytes)
298 onError := func(r interface{}) {
302 return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)