OSDN Git Service

Merge pull request #41 from Bytom/dev
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / peer / peer_test.go
1 // Copyright (c) 2015-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package peer_test
6
7 import (
8         "errors"
9         "io"
10         "net"
11         "strconv"
12         "testing"
13         "time"
14
15         "github.com/btcsuite/btcd/chaincfg"
16         "github.com/btcsuite/btcd/chaincfg/chainhash"
17         "github.com/btcsuite/btcd/peer"
18         "github.com/btcsuite/btcd/wire"
19         "github.com/btcsuite/go-socks/socks"
20 )
21
// conn is a mock network connection implementing the net.Conn interface.  It
// lets the tests exercise peer connections without opening a real network
// connection.
type conn struct {
	io.Reader
	io.Writer
	io.Closer

	// Network and address reported for the local side of the connection.
	lnet  string
	laddr string

	// Network and address reported for the remote side of the connection.
	rnet  string
	raddr string

	// proxy makes the connection mock a socks proxy when set.
	proxy bool
}
39
40 // LocalAddr returns the local address for the connection.
41 func (c conn) LocalAddr() net.Addr {
42         return &addr{c.lnet, c.laddr}
43 }
44
45 // Remote returns the remote address for the connection.
46 func (c conn) RemoteAddr() net.Addr {
47         if !c.proxy {
48                 return &addr{c.rnet, c.raddr}
49         }
50         host, strPort, _ := net.SplitHostPort(c.raddr)
51         port, _ := strconv.Atoi(strPort)
52         return &socks.ProxiedAddr{
53                 Net:  c.rnet,
54                 Host: host,
55                 Port: port,
56         }
57 }
58
59 // Close handles closing the connection.
60 func (c conn) Close() error {
61         if c.Closer == nil {
62                 return nil
63         }
64         return c.Closer.Close()
65 }
66
67 func (c conn) SetDeadline(t time.Time) error      { return nil }
68 func (c conn) SetReadDeadline(t time.Time) error  { return nil }
69 func (c conn) SetWriteDeadline(t time.Time) error { return nil }
70
// addr is a mock network address made of a network name and an address
// string.
type addr struct {
	net     string
	address string
}

// Network returns the network name stored in the address.
func (a addr) Network() string {
	return a.net
}

// String returns the address string exactly as it was stored.
func (a addr) String() string {
	return a.address
}
78
79 // pipe turns two mock connections into a full-duplex connection similar to
80 // net.Pipe to allow pipe's with (fake) addresses.
81 func pipe(c1, c2 *conn) (*conn, *conn) {
82         r1, w1 := io.Pipe()
83         r2, w2 := io.Pipe()
84
85         c1.Writer = w1
86         c1.Closer = w1
87         c2.Reader = r1
88         c1.Reader = r2
89         c2.Writer = w2
90         c2.Closer = w2
91
92         return c1, c2
93 }
94
95 // peerStats holds the expected peer stats used for testing peer.
96 type peerStats struct {
97         wantUserAgent       string
98         wantServices        wire.ServiceFlag
99         wantProtocolVersion uint32
100         wantConnected       bool
101         wantVersionKnown    bool
102         wantVerAckReceived  bool
103         wantLastBlock       int32
104         wantStartingHeight  int32
105         wantLastPingTime    time.Time
106         wantLastPingNonce   uint64
107         wantLastPingMicros  int64
108         wantTimeOffset      int64
109         wantBytesSent       uint64
110         wantBytesReceived   uint64
111         wantWitnessEnabled  bool
112 }
113
114 // testPeer tests the given peer's flags and stats
115 func testPeer(t *testing.T, p *peer.Peer, s peerStats) {
116         if p.UserAgent() != s.wantUserAgent {
117                 t.Errorf("testPeer: wrong UserAgent - got %v, want %v", p.UserAgent(), s.wantUserAgent)
118                 return
119         }
120
121         if p.Services() != s.wantServices {
122                 t.Errorf("testPeer: wrong Services - got %v, want %v", p.Services(), s.wantServices)
123                 return
124         }
125
126         if !p.LastPingTime().Equal(s.wantLastPingTime) {
127                 t.Errorf("testPeer: wrong LastPingTime - got %v, want %v", p.LastPingTime(), s.wantLastPingTime)
128                 return
129         }
130
131         if p.LastPingNonce() != s.wantLastPingNonce {
132                 t.Errorf("testPeer: wrong LastPingNonce - got %v, want %v", p.LastPingNonce(), s.wantLastPingNonce)
133                 return
134         }
135
136         if p.LastPingMicros() != s.wantLastPingMicros {
137                 t.Errorf("testPeer: wrong LastPingMicros - got %v, want %v", p.LastPingMicros(), s.wantLastPingMicros)
138                 return
139         }
140
141         if p.VerAckReceived() != s.wantVerAckReceived {
142                 t.Errorf("testPeer: wrong VerAckReceived - got %v, want %v", p.VerAckReceived(), s.wantVerAckReceived)
143                 return
144         }
145
146         if p.VersionKnown() != s.wantVersionKnown {
147                 t.Errorf("testPeer: wrong VersionKnown - got %v, want %v", p.VersionKnown(), s.wantVersionKnown)
148                 return
149         }
150
151         if p.ProtocolVersion() != s.wantProtocolVersion {
152                 t.Errorf("testPeer: wrong ProtocolVersion - got %v, want %v", p.ProtocolVersion(), s.wantProtocolVersion)
153                 return
154         }
155
156         if p.LastBlock() != s.wantLastBlock {
157                 t.Errorf("testPeer: wrong LastBlock - got %v, want %v", p.LastBlock(), s.wantLastBlock)
158                 return
159         }
160
161         // Allow for a deviation of 1s, as the second may tick when the message is
162         // in transit and the protocol doesn't support any further precision.
163         if p.TimeOffset() != s.wantTimeOffset && p.TimeOffset() != s.wantTimeOffset-1 {
164                 t.Errorf("testPeer: wrong TimeOffset - got %v, want %v or %v", p.TimeOffset(),
165                         s.wantTimeOffset, s.wantTimeOffset-1)
166                 return
167         }
168
169         if p.BytesSent() != s.wantBytesSent {
170                 t.Errorf("testPeer: wrong BytesSent - got %v, want %v", p.BytesSent(), s.wantBytesSent)
171                 return
172         }
173
174         if p.BytesReceived() != s.wantBytesReceived {
175                 t.Errorf("testPeer: wrong BytesReceived - got %v, want %v", p.BytesReceived(), s.wantBytesReceived)
176                 return
177         }
178
179         if p.StartingHeight() != s.wantStartingHeight {
180                 t.Errorf("testPeer: wrong StartingHeight - got %v, want %v", p.StartingHeight(), s.wantStartingHeight)
181                 return
182         }
183
184         if p.Connected() != s.wantConnected {
185                 t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected)
186                 return
187         }
188
189         if p.IsWitnessEnabled() != s.wantWitnessEnabled {
190                 t.Errorf("testPeer: wrong WitnessEnabled - got %v, want %v",
191                         p.IsWitnessEnabled(), s.wantWitnessEnabled)
192                 return
193         }
194
195         stats := p.StatsSnapshot()
196
197         if p.ID() != stats.ID {
198                 t.Errorf("testPeer: wrong ID - got %v, want %v", p.ID(), stats.ID)
199                 return
200         }
201
202         if p.Addr() != stats.Addr {
203                 t.Errorf("testPeer: wrong Addr - got %v, want %v", p.Addr(), stats.Addr)
204                 return
205         }
206
207         if p.LastSend() != stats.LastSend {
208                 t.Errorf("testPeer: wrong LastSend - got %v, want %v", p.LastSend(), stats.LastSend)
209                 return
210         }
211
212         if p.LastRecv() != stats.LastRecv {
213                 t.Errorf("testPeer: wrong LastRecv - got %v, want %v", p.LastRecv(), stats.LastRecv)
214                 return
215         }
216 }
217
218 // TestPeerConnection tests connection between inbound and outbound peers.
219 func TestPeerConnection(t *testing.T) {
220         verack := make(chan struct{})
221         peer1Cfg := &peer.Config{
222                 Listeners: peer.MessageListeners{
223                         OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
224                                 verack <- struct{}{}
225                         },
226                         OnWrite: func(p *peer.Peer, bytesWritten int, msg wire.Message,
227                                 err error) {
228                                 if _, ok := msg.(*wire.MsgVerAck); ok {
229                                         verack <- struct{}{}
230                                 }
231                         },
232                 },
233                 UserAgentName:     "peer",
234                 UserAgentVersion:  "1.0",
235                 UserAgentComments: []string{"comment"},
236                 ChainParams:       &chaincfg.MainNetParams,
237                 ProtocolVersion:   wire.RejectVersion, // Configure with older version
238                 Services:          0,
239         }
240         peer2Cfg := &peer.Config{
241                 Listeners:         peer1Cfg.Listeners,
242                 UserAgentName:     "peer",
243                 UserAgentVersion:  "1.0",
244                 UserAgentComments: []string{"comment"},
245                 ChainParams:       &chaincfg.MainNetParams,
246                 Services:          wire.SFNodeNetwork | wire.SFNodeWitness,
247         }
248
249         wantStats1 := peerStats{
250                 wantUserAgent:       wire.DefaultUserAgent + "peer:1.0(comment)/",
251                 wantServices:        0,
252                 wantProtocolVersion: wire.RejectVersion,
253                 wantConnected:       true,
254                 wantVersionKnown:    true,
255                 wantVerAckReceived:  true,
256                 wantLastPingTime:    time.Time{},
257                 wantLastPingNonce:   uint64(0),
258                 wantLastPingMicros:  int64(0),
259                 wantTimeOffset:      int64(0),
260                 wantBytesSent:       167, // 143 version + 24 verack
261                 wantBytesReceived:   167,
262                 wantWitnessEnabled:  false,
263         }
264         wantStats2 := peerStats{
265                 wantUserAgent:       wire.DefaultUserAgent + "peer:1.0(comment)/",
266                 wantServices:        wire.SFNodeNetwork | wire.SFNodeWitness,
267                 wantProtocolVersion: wire.RejectVersion,
268                 wantConnected:       true,
269                 wantVersionKnown:    true,
270                 wantVerAckReceived:  true,
271                 wantLastPingTime:    time.Time{},
272                 wantLastPingNonce:   uint64(0),
273                 wantLastPingMicros:  int64(0),
274                 wantTimeOffset:      int64(0),
275                 wantBytesSent:       167, // 143 version + 24 verack
276                 wantBytesReceived:   167,
277                 wantWitnessEnabled:  true,
278         }
279
280         tests := []struct {
281                 name  string
282                 setup func() (*peer.Peer, *peer.Peer, error)
283         }{
284                 {
285                         "basic handshake",
286                         func() (*peer.Peer, *peer.Peer, error) {
287                                 inConn, outConn := pipe(
288                                         &conn{raddr: "10.0.0.1:8333"},
289                                         &conn{raddr: "10.0.0.2:8333"},
290                                 )
291                                 inPeer := peer.NewInboundPeer(peer1Cfg)
292                                 inPeer.AssociateConnection(inConn)
293
294                                 outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
295                                 if err != nil {
296                                         return nil, nil, err
297                                 }
298                                 outPeer.AssociateConnection(outConn)
299
300                                 for i := 0; i < 4; i++ {
301                                         select {
302                                         case <-verack:
303                                         case <-time.After(time.Second):
304                                                 return nil, nil, errors.New("verack timeout")
305                                         }
306                                 }
307                                 return inPeer, outPeer, nil
308                         },
309                 },
310                 {
311                         "socks proxy",
312                         func() (*peer.Peer, *peer.Peer, error) {
313                                 inConn, outConn := pipe(
314                                         &conn{raddr: "10.0.0.1:8333", proxy: true},
315                                         &conn{raddr: "10.0.0.2:8333"},
316                                 )
317                                 inPeer := peer.NewInboundPeer(peer1Cfg)
318                                 inPeer.AssociateConnection(inConn)
319
320                                 outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
321                                 if err != nil {
322                                         return nil, nil, err
323                                 }
324                                 outPeer.AssociateConnection(outConn)
325
326                                 for i := 0; i < 4; i++ {
327                                         select {
328                                         case <-verack:
329                                         case <-time.After(time.Second):
330                                                 return nil, nil, errors.New("verack timeout")
331                                         }
332                                 }
333                                 return inPeer, outPeer, nil
334                         },
335                 },
336         }
337         t.Logf("Running %d tests", len(tests))
338         for i, test := range tests {
339                 inPeer, outPeer, err := test.setup()
340                 if err != nil {
341                         t.Errorf("TestPeerConnection setup #%d: unexpected err %v", i, err)
342                         return
343                 }
344                 testPeer(t, inPeer, wantStats2)
345                 testPeer(t, outPeer, wantStats1)
346
347                 inPeer.Disconnect()
348                 outPeer.Disconnect()
349                 inPeer.WaitForDisconnect()
350                 outPeer.WaitForDisconnect()
351         }
352 }
353
354 // TestPeerListeners tests that the peer listeners are called as expected.
355 func TestPeerListeners(t *testing.T) {
356         verack := make(chan struct{}, 1)
357         ok := make(chan wire.Message, 20)
358         peerCfg := &peer.Config{
359                 Listeners: peer.MessageListeners{
360                         OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) {
361                                 ok <- msg
362                         },
363                         OnAddr: func(p *peer.Peer, msg *wire.MsgAddr) {
364                                 ok <- msg
365                         },
366                         OnPing: func(p *peer.Peer, msg *wire.MsgPing) {
367                                 ok <- msg
368                         },
369                         OnPong: func(p *peer.Peer, msg *wire.MsgPong) {
370                                 ok <- msg
371                         },
372                         OnAlert: func(p *peer.Peer, msg *wire.MsgAlert) {
373                                 ok <- msg
374                         },
375                         OnMemPool: func(p *peer.Peer, msg *wire.MsgMemPool) {
376                                 ok <- msg
377                         },
378                         OnTx: func(p *peer.Peer, msg *wire.MsgTx) {
379                                 ok <- msg
380                         },
381                         OnBlock: func(p *peer.Peer, msg *wire.MsgBlock, buf []byte) {
382                                 ok <- msg
383                         },
384                         OnInv: func(p *peer.Peer, msg *wire.MsgInv) {
385                                 ok <- msg
386                         },
387                         OnHeaders: func(p *peer.Peer, msg *wire.MsgHeaders) {
388                                 ok <- msg
389                         },
390                         OnNotFound: func(p *peer.Peer, msg *wire.MsgNotFound) {
391                                 ok <- msg
392                         },
393                         OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) {
394                                 ok <- msg
395                         },
396                         OnGetBlocks: func(p *peer.Peer, msg *wire.MsgGetBlocks) {
397                                 ok <- msg
398                         },
399                         OnGetHeaders: func(p *peer.Peer, msg *wire.MsgGetHeaders) {
400                                 ok <- msg
401                         },
402                         OnFeeFilter: func(p *peer.Peer, msg *wire.MsgFeeFilter) {
403                                 ok <- msg
404                         },
405                         OnFilterAdd: func(p *peer.Peer, msg *wire.MsgFilterAdd) {
406                                 ok <- msg
407                         },
408                         OnFilterClear: func(p *peer.Peer, msg *wire.MsgFilterClear) {
409                                 ok <- msg
410                         },
411                         OnFilterLoad: func(p *peer.Peer, msg *wire.MsgFilterLoad) {
412                                 ok <- msg
413                         },
414                         OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
415                                 ok <- msg
416                         },
417                         OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
418                                 ok <- msg
419                         },
420                         OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
421                                 verack <- struct{}{}
422                         },
423                         OnReject: func(p *peer.Peer, msg *wire.MsgReject) {
424                                 ok <- msg
425                         },
426                         OnSendHeaders: func(p *peer.Peer, msg *wire.MsgSendHeaders) {
427                                 ok <- msg
428                         },
429                 },
430                 UserAgentName:     "peer",
431                 UserAgentVersion:  "1.0",
432                 UserAgentComments: []string{"comment"},
433                 ChainParams:       &chaincfg.MainNetParams,
434                 Services:          wire.SFNodeBloom,
435         }
436         inConn, outConn := pipe(
437                 &conn{raddr: "10.0.0.1:8333"},
438                 &conn{raddr: "10.0.0.2:8333"},
439         )
440         inPeer := peer.NewInboundPeer(peerCfg)
441         inPeer.AssociateConnection(inConn)
442
443         peerCfg.Listeners = peer.MessageListeners{
444                 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
445                         verack <- struct{}{}
446                 },
447         }
448         outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
449         if err != nil {
450                 t.Errorf("NewOutboundPeer: unexpected err %v\n", err)
451                 return
452         }
453         outPeer.AssociateConnection(outConn)
454
455         for i := 0; i < 2; i++ {
456                 select {
457                 case <-verack:
458                 case <-time.After(time.Second * 1):
459                         t.Errorf("TestPeerListeners: verack timeout\n")
460                         return
461                 }
462         }
463
464         tests := []struct {
465                 listener string
466                 msg      wire.Message
467         }{
468                 {
469                         "OnGetAddr",
470                         wire.NewMsgGetAddr(),
471                 },
472                 {
473                         "OnAddr",
474                         wire.NewMsgAddr(),
475                 },
476                 {
477                         "OnPing",
478                         wire.NewMsgPing(42),
479                 },
480                 {
481                         "OnPong",
482                         wire.NewMsgPong(42),
483                 },
484                 {
485                         "OnAlert",
486                         wire.NewMsgAlert([]byte("payload"), []byte("signature")),
487                 },
488                 {
489                         "OnMemPool",
490                         wire.NewMsgMemPool(),
491                 },
492                 {
493                         "OnTx",
494                         wire.NewMsgTx(wire.TxVersion),
495                 },
496                 {
497                         "OnBlock",
498                         wire.NewMsgBlock(wire.NewBlockHeader(1,
499                                 &chainhash.Hash{}, &chainhash.Hash{}, 1, 1)),
500                 },
501                 {
502                         "OnInv",
503                         wire.NewMsgInv(),
504                 },
505                 {
506                         "OnHeaders",
507                         wire.NewMsgHeaders(),
508                 },
509                 {
510                         "OnNotFound",
511                         wire.NewMsgNotFound(),
512                 },
513                 {
514                         "OnGetData",
515                         wire.NewMsgGetData(),
516                 },
517                 {
518                         "OnGetBlocks",
519                         wire.NewMsgGetBlocks(&chainhash.Hash{}),
520                 },
521                 {
522                         "OnGetHeaders",
523                         wire.NewMsgGetHeaders(),
524                 },
525                 {
526                         "OnFeeFilter",
527                         wire.NewMsgFeeFilter(15000),
528                 },
529                 {
530                         "OnFilterAdd",
531                         wire.NewMsgFilterAdd([]byte{0x01}),
532                 },
533                 {
534                         "OnFilterClear",
535                         wire.NewMsgFilterClear(),
536                 },
537                 {
538                         "OnFilterLoad",
539                         wire.NewMsgFilterLoad([]byte{0x01}, 10, 0, wire.BloomUpdateNone),
540                 },
541                 {
542                         "OnMerkleBlock",
543                         wire.NewMsgMerkleBlock(wire.NewBlockHeader(1,
544                                 &chainhash.Hash{}, &chainhash.Hash{}, 1, 1)),
545                 },
546                 // only one version message is allowed
547                 // only one verack message is allowed
548                 {
549                         "OnReject",
550                         wire.NewMsgReject("block", wire.RejectDuplicate, "dupe block"),
551                 },
552                 {
553                         "OnSendHeaders",
554                         wire.NewMsgSendHeaders(),
555                 },
556         }
557         t.Logf("Running %d tests", len(tests))
558         for _, test := range tests {
559                 // Queue the test message
560                 outPeer.QueueMessage(test.msg, nil)
561                 select {
562                 case <-ok:
563                 case <-time.After(time.Second * 1):
564                         t.Errorf("TestPeerListeners: %s timeout", test.listener)
565                         return
566                 }
567         }
568         inPeer.Disconnect()
569         outPeer.Disconnect()
570 }
571
572 // TestOutboundPeer tests that the outbound peer works as expected.
573 func TestOutboundPeer(t *testing.T) {
574
575         peerCfg := &peer.Config{
576                 NewestBlock: func() (*chainhash.Hash, int32, error) {
577                         return nil, 0, errors.New("newest block not found")
578                 },
579                 UserAgentName:     "peer",
580                 UserAgentVersion:  "1.0",
581                 UserAgentComments: []string{"comment"},
582                 ChainParams:       &chaincfg.MainNetParams,
583                 Services:          0,
584         }
585
586         r, w := io.Pipe()
587         c := &conn{raddr: "10.0.0.1:8333", Writer: w, Reader: r}
588
589         p, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
590         if err != nil {
591                 t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
592                 return
593         }
594
595         // Test trying to connect twice.
596         p.AssociateConnection(c)
597         p.AssociateConnection(c)
598
599         disconnected := make(chan struct{})
600         go func() {
601                 p.WaitForDisconnect()
602                 disconnected <- struct{}{}
603         }()
604
605         select {
606         case <-disconnected:
607                 close(disconnected)
608         case <-time.After(time.Second):
609                 t.Fatal("Peer did not automatically disconnect.")
610         }
611
612         if p.Connected() {
613                 t.Fatalf("Should not be connected as NewestBlock produces error.")
614         }
615
616         // Test Queue Inv
617         fakeBlockHash := &chainhash.Hash{0: 0x00, 1: 0x01}
618         fakeInv := wire.NewInvVect(wire.InvTypeBlock, fakeBlockHash)
619
620         // Should be noops as the peer could not connect.
621         p.QueueInventory(fakeInv)
622         p.AddKnownInventory(fakeInv)
623         p.QueueInventory(fakeInv)
624
625         fakeMsg := wire.NewMsgVerAck()
626         p.QueueMessage(fakeMsg, nil)
627         done := make(chan struct{})
628         p.QueueMessage(fakeMsg, done)
629         <-done
630         p.Disconnect()
631
632         // Test NewestBlock
633         var newestBlock = func() (*chainhash.Hash, int32, error) {
634                 hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
635                 hash, err := chainhash.NewHashFromStr(hashStr)
636                 if err != nil {
637                         return nil, 0, err
638                 }
639                 return hash, 234439, nil
640         }
641
642         peerCfg.NewestBlock = newestBlock
643         r1, w1 := io.Pipe()
644         c1 := &conn{raddr: "10.0.0.1:8333", Writer: w1, Reader: r1}
645         p1, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
646         if err != nil {
647                 t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
648                 return
649         }
650         p1.AssociateConnection(c1)
651
652         // Test update latest block
653         latestBlockHash, err := chainhash.NewHashFromStr("1a63f9cdff1752e6375c8c76e543a71d239e1a2e5c6db1aa679")
654         if err != nil {
655                 t.Errorf("NewHashFromStr: unexpected err %v\n", err)
656                 return
657         }
658         p1.UpdateLastAnnouncedBlock(latestBlockHash)
659         p1.UpdateLastBlockHeight(234440)
660         if p1.LastAnnouncedBlock() != latestBlockHash {
661                 t.Errorf("LastAnnouncedBlock: wrong block - got %v, want %v",
662                         p1.LastAnnouncedBlock(), latestBlockHash)
663                 return
664         }
665
666         // Test Queue Inv after connection
667         p1.QueueInventory(fakeInv)
668         p1.Disconnect()
669
670         // Test regression
671         peerCfg.ChainParams = &chaincfg.RegressionNetParams
672         peerCfg.Services = wire.SFNodeBloom
673         r2, w2 := io.Pipe()
674         c2 := &conn{raddr: "10.0.0.1:8333", Writer: w2, Reader: r2}
675         p2, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
676         if err != nil {
677                 t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
678                 return
679         }
680         p2.AssociateConnection(c2)
681
682         // Test PushXXX
683         var addrs []*wire.NetAddress
684         for i := 0; i < 5; i++ {
685                 na := wire.NetAddress{}
686                 addrs = append(addrs, &na)
687         }
688         if _, err := p2.PushAddrMsg(addrs); err != nil {
689                 t.Errorf("PushAddrMsg: unexpected err %v\n", err)
690                 return
691         }
692         if err := p2.PushGetBlocksMsg(nil, &chainhash.Hash{}); err != nil {
693                 t.Errorf("PushGetBlocksMsg: unexpected err %v\n", err)
694                 return
695         }
696         if err := p2.PushGetHeadersMsg(nil, &chainhash.Hash{}); err != nil {
697                 t.Errorf("PushGetHeadersMsg: unexpected err %v\n", err)
698                 return
699         }
700
701         p2.PushRejectMsg("block", wire.RejectMalformed, "malformed", nil, false)
702         p2.PushRejectMsg("block", wire.RejectInvalid, "invalid", nil, false)
703
704         // Test Queue Messages
705         p2.QueueMessage(wire.NewMsgGetAddr(), nil)
706         p2.QueueMessage(wire.NewMsgPing(1), nil)
707         p2.QueueMessage(wire.NewMsgMemPool(), nil)
708         p2.QueueMessage(wire.NewMsgGetData(), nil)
709         p2.QueueMessage(wire.NewMsgGetHeaders(), nil)
710         p2.QueueMessage(wire.NewMsgFeeFilter(20000), nil)
711
712         p2.Disconnect()
713 }
714
715 // Tests that the node disconnects from peers with an unsupported protocol
716 // version.
717 func TestUnsupportedVersionPeer(t *testing.T) {
718         peerCfg := &peer.Config{
719                 UserAgentName:     "peer",
720                 UserAgentVersion:  "1.0",
721                 UserAgentComments: []string{"comment"},
722                 ChainParams:       &chaincfg.MainNetParams,
723                 Services:          0,
724         }
725
726         localNA := wire.NewNetAddressIPPort(
727                 net.ParseIP("10.0.0.1"),
728                 uint16(8333),
729                 wire.SFNodeNetwork,
730         )
731         remoteNA := wire.NewNetAddressIPPort(
732                 net.ParseIP("10.0.0.2"),
733                 uint16(8333),
734                 wire.SFNodeNetwork,
735         )
736         localConn, remoteConn := pipe(
737                 &conn{laddr: "10.0.0.1:8333", raddr: "10.0.0.2:8333"},
738                 &conn{laddr: "10.0.0.2:8333", raddr: "10.0.0.1:8333"},
739         )
740
741         p, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
742         if err != nil {
743                 t.Fatalf("NewOutboundPeer: unexpected err - %v\n", err)
744         }
745         p.AssociateConnection(localConn)
746
747         // Read outbound messages to peer into a channel
748         outboundMessages := make(chan wire.Message)
749         go func() {
750                 for {
751                         _, msg, _, err := wire.ReadMessageN(
752                                 remoteConn,
753                                 p.ProtocolVersion(),
754                                 peerCfg.ChainParams.Net,
755                         )
756                         if err == io.EOF {
757                                 close(outboundMessages)
758                                 return
759                         }
760                         if err != nil {
761                                 t.Errorf("Error reading message from local node: %v\n", err)
762                                 return
763                         }
764
765                         outboundMessages <- msg
766                 }
767         }()
768
769         // Read version message sent to remote peer
770         select {
771         case msg := <-outboundMessages:
772                 if _, ok := msg.(*wire.MsgVersion); !ok {
773                         t.Fatalf("Expected version message, got [%s]", msg.Command())
774                 }
775         case <-time.After(time.Second):
776                 t.Fatal("Peer did not send version message")
777         }
778
779         // Remote peer writes version message advertising invalid protocol version 1
780         invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, 0)
781         invalidVersionMsg.ProtocolVersion = 1
782
783         _, err = wire.WriteMessageN(
784                 remoteConn.Writer,
785                 invalidVersionMsg,
786                 uint32(invalidVersionMsg.ProtocolVersion),
787                 peerCfg.ChainParams.Net,
788         )
789         if err != nil {
790                 t.Fatalf("wire.WriteMessageN: unexpected err - %v\n", err)
791         }
792
793         // Expect peer to disconnect automatically
794         disconnected := make(chan struct{})
795         go func() {
796                 p.WaitForDisconnect()
797                 disconnected <- struct{}{}
798         }()
799
800         select {
801         case <-disconnected:
802                 close(disconnected)
803         case <-time.After(time.Second):
804                 t.Fatal("Peer did not automatically disconnect")
805         }
806
807         // Expect no further outbound messages from peer
808         select {
809         case msg, chanOpen := <-outboundMessages:
810                 if chanOpen {
811                         t.Fatalf("Expected no further messages, received [%s]", msg.Command())
812                 }
813         case <-time.After(time.Second):
814                 t.Fatal("Timeout waiting for remote reader to close")
815         }
816 }
817
818 func init() {
819         // Allow self connection when running the tests.
820         peer.TstAllowSelfConns()
821 }