1 // Copyright (c) 2015-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
15 "github.com/btcsuite/btcd/chaincfg"
16 "github.com/btcsuite/btcd/chaincfg/chainhash"
17 "github.com/btcsuite/btcd/peer"
18 "github.com/btcsuite/btcd/wire"
19 "github.com/btcsuite/go-socks/socks"
22 // conn mocks a network connection by implementing the net.Conn interface. It
23 // is used to test peer connection without actually opening a network
30 // local network, address for the connection.
33 // remote network, address for the connection.
36 // mocks socks proxy if true
40 // LocalAddr returns the local address for the connection.
// The result is an *addr built from the conn's lnet and laddr fields.
41 func (c conn) LocalAddr() net.Addr {
42 return &addr{c.lnet, c.laddr}
45 // RemoteAddr returns the remote address for the connection.
// Two return paths are visible: a plain *addr built from rnet and raddr, and a
// *socks.ProxiedAddr built from the host and port parsed out of raddr.
// NOTE(review): the condition selecting between the two is not visible here;
// presumably it is the conn's proxy flag - confirm.
46 func (c conn) RemoteAddr() net.Addr {
48 return &addr{c.rnet, c.raddr}
// Errors from SplitHostPort and Atoi are discarded.
50 host, strPort, _ := net.SplitHostPort(c.raddr)
51 port, _ := strconv.Atoi(strPort)
52 return &socks.ProxiedAddr{
59 // Close handles closing the connection.
60 func (c conn) Close() error {
// Delegate to the conn's Closer and return its result.
64 return c.Closer.Close()
// SetDeadline, SetReadDeadline and SetWriteDeadline ignore the supplied time
// and always return nil; the mock connection does nothing with deadlines.
67 func (c conn) SetDeadline(t time.Time) error { return nil }
68 func (c conn) SetReadDeadline(t time.Time) error { return nil }
69 func (c conn) SetWriteDeadline(t time.Time) error { return nil }
71 // addr mocks a network address
// Network returns the addr's net field.
76 func (m addr) Network() string { return m.net }
// String returns the addr's address field.
77 func (m addr) String() string { return m.address }
79 // pipe turns two mock connections into a full-duplex connection similar to
80 // net.Pipe to allow pipes with (fake) addresses.
81 func pipe(c1, c2 *conn) (*conn, *conn) {
95 // peerStats holds the expected peer stats used for testing peer.
// testPeer compares each want* field with the matching peer.Peer accessor.
96 type peerStats struct {
98 wantServices wire.ServiceFlag
99 wantProtocolVersion uint32
101 wantVersionKnown bool
102 wantVerAckReceived bool
104 wantStartingHeight int32
105 wantLastPingTime time.Time
106 wantLastPingNonce uint64
107 wantLastPingMicros int64
110 wantBytesReceived uint64
111 wantWitnessEnabled bool
114 // testPeer tests the given peer's flags and stats.
// Each accessor on p is compared with the corresponding want* field of s and
// a mismatch is reported through t.Errorf. Afterwards the ID, Addr, LastSend
// and LastRecv accessors are cross-checked against a StatsSnapshot of p.
115 func testPeer(t *testing.T, p *peer.Peer, s peerStats) {
116 if p.UserAgent() != s.wantUserAgent {
117 t.Errorf("testPeer: wrong UserAgent - got %v, want %v", p.UserAgent(), s.wantUserAgent)
121 if p.Services() != s.wantServices {
122 t.Errorf("testPeer: wrong Services - got %v, want %v", p.Services(), s.wantServices)
// The ping time is a time.Time, so it is compared with Equal rather than !=.
126 if !p.LastPingTime().Equal(s.wantLastPingTime) {
127 t.Errorf("testPeer: wrong LastPingTime - got %v, want %v", p.LastPingTime(), s.wantLastPingTime)
131 if p.LastPingNonce() != s.wantLastPingNonce {
132 t.Errorf("testPeer: wrong LastPingNonce - got %v, want %v", p.LastPingNonce(), s.wantLastPingNonce)
136 if p.LastPingMicros() != s.wantLastPingMicros {
137 t.Errorf("testPeer: wrong LastPingMicros - got %v, want %v", p.LastPingMicros(), s.wantLastPingMicros)
141 if p.VerAckReceived() != s.wantVerAckReceived {
142 t.Errorf("testPeer: wrong VerAckReceived - got %v, want %v", p.VerAckReceived(), s.wantVerAckReceived)
146 if p.VersionKnown() != s.wantVersionKnown {
147 t.Errorf("testPeer: wrong VersionKnown - got %v, want %v", p.VersionKnown(), s.wantVersionKnown)
151 if p.ProtocolVersion() != s.wantProtocolVersion {
152 t.Errorf("testPeer: wrong ProtocolVersion - got %v, want %v", p.ProtocolVersion(), s.wantProtocolVersion)
156 if p.LastBlock() != s.wantLastBlock {
157 t.Errorf("testPeer: wrong LastBlock - got %v, want %v", p.LastBlock(), s.wantLastBlock)
161 // Allow for a deviation of 1s, as the second may tick when the message is
162 // in transit and the protocol doesn't support any further precision.
163 if p.TimeOffset() != s.wantTimeOffset && p.TimeOffset() != s.wantTimeOffset-1 {
164 t.Errorf("testPeer: wrong TimeOffset - got %v, want %v or %v", p.TimeOffset(),
165 s.wantTimeOffset, s.wantTimeOffset-1)
169 if p.BytesSent() != s.wantBytesSent {
170 t.Errorf("testPeer: wrong BytesSent - got %v, want %v", p.BytesSent(), s.wantBytesSent)
174 if p.BytesReceived() != s.wantBytesReceived {
175 t.Errorf("testPeer: wrong BytesReceived - got %v, want %v", p.BytesReceived(), s.wantBytesReceived)
179 if p.StartingHeight() != s.wantStartingHeight {
180 t.Errorf("testPeer: wrong StartingHeight - got %v, want %v", p.StartingHeight(), s.wantStartingHeight)
184 if p.Connected() != s.wantConnected {
185 t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected)
189 if p.IsWitnessEnabled() != s.wantWitnessEnabled {
190 t.Errorf("testPeer: wrong WitnessEnabled - got %v, want %v",
191 p.IsWitnessEnabled(), s.wantWitnessEnabled)
// Cross-check the individual accessors against the aggregated snapshot.
195 stats := p.StatsSnapshot()
197 if p.ID() != stats.ID {
198 t.Errorf("testPeer: wrong ID - got %v, want %v", p.ID(), stats.ID)
202 if p.Addr() != stats.Addr {
203 t.Errorf("testPeer: wrong Addr - got %v, want %v", p.Addr(), stats.Addr)
207 if p.LastSend() != stats.LastSend {
208 t.Errorf("testPeer: wrong LastSend - got %v, want %v", p.LastSend(), stats.LastSend)
212 if p.LastRecv() != stats.LastRecv {
213 t.Errorf("testPeer: wrong LastRecv - got %v, want %v", p.LastRecv(), stats.LastRecv)
218 // TestPeerConnection tests connection between inbound and outbound peers.
// Two setups are run: one over plain mock conns and one where the inbound
// conn has its proxy field set. After each setup the inbound peer is checked
// against wantStats2 and the outbound peer against wantStats1, and both peers
// are then waited on with WaitForDisconnect.
219 func TestPeerConnection(t *testing.T) {
220 verack := make(chan struct{})
221 peer1Cfg := &peer.Config{
222 Listeners: peer.MessageListeners{
223 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
226 OnWrite: func(p *peer.Peer, bytesWritten int, msg wire.Message,
228 if _, ok := msg.(*wire.MsgVerAck); ok {
233 UserAgentName: "peer",
234 UserAgentVersion: "1.0",
235 UserAgentComments: []string{"comment"},
236 ChainParams: &chaincfg.MainNetParams,
237 ProtocolVersion: wire.RejectVersion, // Configure with older version
// peer2Cfg reuses peer1Cfg's listeners and advertises the network and
// witness service flags.
240 peer2Cfg := &peer.Config{
241 Listeners: peer1Cfg.Listeners,
242 UserAgentName: "peer",
243 UserAgentVersion: "1.0",
244 UserAgentComments: []string{"comment"},
245 ChainParams: &chaincfg.MainNetParams,
246 Services: wire.SFNodeNetwork | wire.SFNodeWitness,
249 wantStats1 := peerStats{
250 wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/",
252 wantProtocolVersion: wire.RejectVersion,
254 wantVersionKnown: true,
255 wantVerAckReceived: true,
256 wantLastPingTime: time.Time{},
257 wantLastPingNonce: uint64(0),
258 wantLastPingMicros: int64(0),
259 wantTimeOffset: int64(0),
260 wantBytesSent: 167, // 143 version + 24 verack
261 wantBytesReceived: 167,
262 wantWitnessEnabled: false,
264 wantStats2 := peerStats{
265 wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/",
266 wantServices: wire.SFNodeNetwork | wire.SFNodeWitness,
267 wantProtocolVersion: wire.RejectVersion,
269 wantVersionKnown: true,
270 wantVerAckReceived: true,
271 wantLastPingTime: time.Time{},
272 wantLastPingNonce: uint64(0),
273 wantLastPingMicros: int64(0),
274 wantTimeOffset: int64(0),
275 wantBytesSent: 167, // 143 version + 24 verack
276 wantBytesReceived: 167,
277 wantWitnessEnabled: true,
282 setup func() (*peer.Peer, *peer.Peer, error)
// First setup: inbound peer from peer1Cfg and outbound peer from peer2Cfg,
// joined by a pipe of two plain mock conns.
286 func() (*peer.Peer, *peer.Peer, error) {
287 inConn, outConn := pipe(
288 &conn{raddr: "10.0.0.1:8333"},
289 &conn{raddr: "10.0.0.2:8333"},
291 inPeer := peer.NewInboundPeer(peer1Cfg)
292 inPeer.AssociateConnection(inConn)
294 outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
298 outPeer.AssociateConnection(outConn)
// Each of the four iterations gives up with a "verack timeout" error
// after one second.
300 for i := 0; i < 4; i++ {
303 case <-time.After(time.Second):
304 return nil, nil, errors.New("verack timeout")
307 return inPeer, outPeer, nil
// Second setup: same as the first except that the inbound conn has proxy set.
312 func() (*peer.Peer, *peer.Peer, error) {
313 inConn, outConn := pipe(
314 &conn{raddr: "10.0.0.1:8333", proxy: true},
315 &conn{raddr: "10.0.0.2:8333"},
317 inPeer := peer.NewInboundPeer(peer1Cfg)
318 inPeer.AssociateConnection(inConn)
320 outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
324 outPeer.AssociateConnection(outConn)
326 for i := 0; i < 4; i++ {
329 case <-time.After(time.Second):
330 return nil, nil, errors.New("verack timeout")
333 return inPeer, outPeer, nil
337 t.Logf("Running %d tests", len(tests))
338 for i, test := range tests {
339 inPeer, outPeer, err := test.setup()
341 t.Errorf("TestPeerConnection setup #%d: unexpected err %v", i, err)
// Note the pairing: the inbound peer is checked against wantStats2 and the
// outbound peer against wantStats1.
344 testPeer(t, inPeer, wantStats2)
345 testPeer(t, outPeer, wantStats1)
349 inPeer.WaitForDisconnect()
350 outPeer.WaitForDisconnect()
354 // TestPeerListeners tests that the peer listeners are called as expected.
// An inbound peer is created from a config that registers a listener for each
// message type listed below, and an outbound peer is connected to it through
// a pipe of mock conns. Every test message is queued on the outbound peer and
// a "<listener> timeout" failure is reported if one second elapses first.
355 func TestPeerListeners(t *testing.T) {
356 verack := make(chan struct{}, 1)
357 ok := make(chan wire.Message, 20)
358 peerCfg := &peer.Config{
359 Listeners: peer.MessageListeners{
360 OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) {
363 OnAddr: func(p *peer.Peer, msg *wire.MsgAddr) {
366 OnPing: func(p *peer.Peer, msg *wire.MsgPing) {
369 OnPong: func(p *peer.Peer, msg *wire.MsgPong) {
372 OnAlert: func(p *peer.Peer, msg *wire.MsgAlert) {
375 OnMemPool: func(p *peer.Peer, msg *wire.MsgMemPool) {
378 OnTx: func(p *peer.Peer, msg *wire.MsgTx) {
381 OnBlock: func(p *peer.Peer, msg *wire.MsgBlock, buf []byte) {
384 OnInv: func(p *peer.Peer, msg *wire.MsgInv) {
387 OnHeaders: func(p *peer.Peer, msg *wire.MsgHeaders) {
390 OnNotFound: func(p *peer.Peer, msg *wire.MsgNotFound) {
393 OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) {
396 OnGetBlocks: func(p *peer.Peer, msg *wire.MsgGetBlocks) {
399 OnGetHeaders: func(p *peer.Peer, msg *wire.MsgGetHeaders) {
402 OnFeeFilter: func(p *peer.Peer, msg *wire.MsgFeeFilter) {
405 OnFilterAdd: func(p *peer.Peer, msg *wire.MsgFilterAdd) {
408 OnFilterClear: func(p *peer.Peer, msg *wire.MsgFilterClear) {
411 OnFilterLoad: func(p *peer.Peer, msg *wire.MsgFilterLoad) {
414 OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
417 OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
420 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
423 OnReject: func(p *peer.Peer, msg *wire.MsgReject) {
426 OnSendHeaders: func(p *peer.Peer, msg *wire.MsgSendHeaders) {
430 UserAgentName: "peer",
431 UserAgentVersion: "1.0",
432 UserAgentComments: []string{"comment"},
433 ChainParams: &chaincfg.MainNetParams,
434 Services: wire.SFNodeBloom,
436 inConn, outConn := pipe(
437 &conn{raddr: "10.0.0.1:8333"},
438 &conn{raddr: "10.0.0.2:8333"},
440 inPeer := peer.NewInboundPeer(peerCfg)
441 inPeer.AssociateConnection(inConn)
// The same config value is reused for the outbound peer, but its listeners
// are reassigned first.
443 peerCfg.Listeners = peer.MessageListeners{
444 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
448 outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
450 t.Errorf("NewOutboundPeer: unexpected err %v\n", err)
453 outPeer.AssociateConnection(outConn)
// Each of the two iterations reports a verack timeout after one second.
455 for i := 0; i < 2; i++ {
458 case <-time.After(time.Second * 1):
459 t.Errorf("TestPeerListeners: verack timeout\n")
470 wire.NewMsgGetAddr(),
486 wire.NewMsgAlert([]byte("payload"), []byte("signature")),
490 wire.NewMsgMemPool(),
494 wire.NewMsgTx(wire.TxVersion),
498 wire.NewMsgBlock(wire.NewBlockHeader(1,
499 &chainhash.Hash{}, &chainhash.Hash{}, 1, 1)),
507 wire.NewMsgHeaders(),
511 wire.NewMsgNotFound(),
515 wire.NewMsgGetData(),
519 wire.NewMsgGetBlocks(&chainhash.Hash{}),
523 wire.NewMsgGetHeaders(),
527 wire.NewMsgFeeFilter(15000),
531 wire.NewMsgFilterAdd([]byte{0x01}),
535 wire.NewMsgFilterClear(),
539 wire.NewMsgFilterLoad([]byte{0x01}, 10, 0, wire.BloomUpdateNone),
543 wire.NewMsgMerkleBlock(wire.NewBlockHeader(1,
544 &chainhash.Hash{}, &chainhash.Hash{}, 1, 1)),
546 // only one version message is allowed
547 // only one verack message is allowed
550 wire.NewMsgReject("block", wire.RejectDuplicate, "dupe block"),
554 wire.NewMsgSendHeaders(),
557 t.Logf("Running %d tests", len(tests))
558 for _, test := range tests {
559 // Queue the test message
560 outPeer.QueueMessage(test.msg, nil)
563 case <-time.After(time.Second * 1):
564 t.Errorf("TestPeerListeners: %s timeout", test.listener)
572 // TestOutboundPeer tests that the outbound peer works as expected.
// Three peers are exercised in turn: p, whose NewestBlock callback returns an
// error; p1, created after NewestBlock is replaced with a callback that
// returns a hash and height; and p2, created after the config is switched to
// the regression test network parameters with the SFNodeBloom service flag.
573 func TestOutboundPeer(t *testing.T) {
575 peerCfg := &peer.Config{
576 NewestBlock: func() (*chainhash.Hash, int32, error) {
577 return nil, 0, errors.New("newest block not found")
579 UserAgentName: "peer",
580 UserAgentVersion: "1.0",
581 UserAgentComments: []string{"comment"},
582 ChainParams: &chaincfg.MainNetParams,
587 c := &conn{raddr: "10.0.0.1:8333", Writer: w, Reader: r}
589 p, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
591 t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
595 // Test trying to connect twice.
596 p.AssociateConnection(c)
597 p.AssociateConnection(c)
// Signal on the disconnected channel once WaitForDisconnect returns; the
// test fails if that has not happened after one second.
599 disconnected := make(chan struct{})
601 p.WaitForDisconnect()
602 disconnected <- struct{}{}
608 case <-time.After(time.Second):
609 t.Fatal("Peer did not automatically disconnect.")
613 t.Fatalf("Should not be connected as NewestBlock produces error.")
617 fakeBlockHash := &chainhash.Hash{0: 0x00, 1: 0x01}
618 fakeInv := wire.NewInvVect(wire.InvTypeBlock, fakeBlockHash)
620 // Should be noops as the peer could not connect.
621 p.QueueInventory(fakeInv)
622 p.AddKnownInventory(fakeInv)
623 p.QueueInventory(fakeInv)
625 fakeMsg := wire.NewMsgVerAck()
626 p.QueueMessage(fakeMsg, nil)
627 done := make(chan struct{})
628 p.QueueMessage(fakeMsg, done)
// Replacement NewestBlock callback that returns a parsed hash and a height
// of 234439.
633 var newestBlock = func() (*chainhash.Hash, int32, error) {
634 hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
635 hash, err := chainhash.NewHashFromStr(hashStr)
639 return hash, 234439, nil
642 peerCfg.NewestBlock = newestBlock
644 c1 := &conn{raddr: "10.0.0.1:8333", Writer: w1, Reader: r1}
645 p1, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
647 t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
650 p1.AssociateConnection(c1)
652 // Test update latest block
653 latestBlockHash, err := chainhash.NewHashFromStr("1a63f9cdff1752e6375c8c76e543a71d239e1a2e5c6db1aa679")
655 t.Errorf("NewHashFromStr: unexpected err %v\n", err)
658 p1.UpdateLastAnnouncedBlock(latestBlockHash)
659 p1.UpdateLastBlockHeight(234440)
// This compares pointers, so it passes only if LastAnnouncedBlock returns
// the same *chainhash.Hash that was passed in above.
660 if p1.LastAnnouncedBlock() != latestBlockHash {
661 t.Errorf("LastAnnouncedBlock: wrong block - got %v, want %v",
662 p1.LastAnnouncedBlock(), latestBlockHash)
666 // Test Queue Inv after connection
667 p1.QueueInventory(fakeInv)
// Reconfigure the shared config for the third peer.
671 peerCfg.ChainParams = &chaincfg.RegressionNetParams
672 peerCfg.Services = wire.SFNodeBloom
674 c2 := &conn{raddr: "10.0.0.1:8333", Writer: w2, Reader: r2}
675 p2, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
677 t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
680 p2.AssociateConnection(c2)
// Push an addr message carrying five zero-valued addresses.
683 var addrs []*wire.NetAddress
684 for i := 0; i < 5; i++ {
685 na := wire.NetAddress{}
686 addrs = append(addrs, &na)
688 if _, err := p2.PushAddrMsg(addrs); err != nil {
689 t.Errorf("PushAddrMsg: unexpected err %v\n", err)
692 if err := p2.PushGetBlocksMsg(nil, &chainhash.Hash{}); err != nil {
693 t.Errorf("PushGetBlocksMsg: unexpected err %v\n", err)
696 if err := p2.PushGetHeadersMsg(nil, &chainhash.Hash{}); err != nil {
697 t.Errorf("PushGetHeadersMsg: unexpected err %v\n", err)
701 p2.PushRejectMsg("block", wire.RejectMalformed, "malformed", nil, false)
702 p2.PushRejectMsg("block", wire.RejectInvalid, "invalid", nil, false)
704 // Test Queue Messages
705 p2.QueueMessage(wire.NewMsgGetAddr(), nil)
706 p2.QueueMessage(wire.NewMsgPing(1), nil)
707 p2.QueueMessage(wire.NewMsgMemPool(), nil)
708 p2.QueueMessage(wire.NewMsgGetData(), nil)
709 p2.QueueMessage(wire.NewMsgGetHeaders(), nil)
710 p2.QueueMessage(wire.NewMsgFeeFilter(20000), nil)
715 // TestUnsupportedVersionPeer tests that the node disconnects from peers with an unsupported protocol
717 func TestUnsupportedVersionPeer(t *testing.T) {
718 peerCfg := &peer.Config{
719 UserAgentName: "peer",
720 UserAgentVersion: "1.0",
721 UserAgentComments: []string{"comment"},
722 ChainParams: &chaincfg.MainNetParams,
726 localNA := wire.NewNetAddressIPPort(
727 net.ParseIP("10.0.0.1"),
731 remoteNA := wire.NewNetAddressIPPort(
732 net.ParseIP("10.0.0.2"),
// The two mock conns carry mirrored local and remote addresses.
736 localConn, remoteConn := pipe(
737 &conn{laddr: "10.0.0.1:8333", raddr: "10.0.0.2:8333"},
738 &conn{laddr: "10.0.0.2:8333", raddr: "10.0.0.1:8333"},
741 p, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
743 t.Fatalf("NewOutboundPeer: unexpected err - %v\n", err)
745 p.AssociateConnection(localConn)
747 // Read outbound messages to peer into a channel
748 outboundMessages := make(chan wire.Message)
751 _, msg, _, err := wire.ReadMessageN(
754 peerCfg.ChainParams.Net,
757 close(outboundMessages)
761 t.Errorf("Error reading message from local node: %v\n", err)
765 outboundMessages <- msg
769 // Read version message sent to remote peer
771 case msg := <-outboundMessages:
772 if _, ok := msg.(*wire.MsgVersion); !ok {
773 t.Fatalf("Expected version message, got [%s]", msg.Command())
775 case <-time.After(time.Second):
776 t.Fatal("Peer did not send version message")
779 // Remote peer writes version message advertising invalid protocol version 1
780 invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, 0)
781 invalidVersionMsg.ProtocolVersion = 1
783 _, err = wire.WriteMessageN(
786 uint32(invalidVersionMsg.ProtocolVersion),
787 peerCfg.ChainParams.Net,
790 t.Fatalf("wire.WriteMessageN: unexpected err - %v\n", err)
793 // Expect peer to disconnect automatically
794 disconnected := make(chan struct{})
796 p.WaitForDisconnect()
797 disconnected <- struct{}{}
803 case <-time.After(time.Second):
804 t.Fatal("Peer did not automatically disconnect")
807 // Expect no further outbound messages from peer
809 case msg, chanOpen := <-outboundMessages:
811 t.Fatalf("Expected no further messages, received [%s]", msg.Command())
813 case <-time.After(time.Second):
814 t.Fatal("Timeout waiting for remote reader to close")
819 // Allow self connections when running the tests.
820 peer.TstAllowSelfConns()