OSDN Git Service

Merge pull request #783 from Bytom/p2p_opz
[bytom/bytom.git] / p2p / connection_test.go
1 // +build !network
2
3 package p2p_test
4
5 import (
6         "net"
7         "testing"
8         "time"
9
10         p2p "github.com/bytom/p2p"
11         "github.com/stretchr/testify/assert"
12         "github.com/stretchr/testify/require"
13         "github.com/tendermint/tmlibs/log"
14 )
15
16 func createMConnection(conn net.Conn) *p2p.MConnection {
17         onReceive := func(chID byte, msgBytes []byte) {
18         }
19         onError := func(r interface{}) {
20         }
21         c := createMConnectionWithCallbacks(conn, onReceive, onError)
22         c.SetLogger(log.TestingLogger())
23         return c
24 }
25
26 func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
27         chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
28         c := p2p.NewMConnection(conn, chDescs, onReceive, onError)
29         c.SetLogger(log.TestingLogger())
30         return c
31 }
32
33 func TestMConnectionSend(t *testing.T) {
34         assert, require := assert.New(t), require.New(t)
35
36         server, client := net.Pipe()
37         defer server.Close()
38         defer client.Close()
39
40         mconn := createMConnection(client)
41         _, err := mconn.Start()
42         require.Nil(err)
43         defer mconn.Stop()
44
45         msg := "Ant-Man"
46         assert.True(mconn.Send(0x01, msg))
47         // Note: subsequent Send/TrySend calls could pass because we are reading from
48         // the send queue in a separate goroutine.
49         server.Read(make([]byte, len(msg)))
50         assert.True(mconn.CanSend(0x01))
51
52         msg = "Spider-Man"
53         assert.True(mconn.TrySend(0x01, msg))
54         server.Read(make([]byte, len(msg)))
55
56         assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
57         assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
58 }
59
60 func TestMConnectionReceive(t *testing.T) {
61         assert, require := assert.New(t), require.New(t)
62
63         server, client := net.Pipe()
64         defer server.Close()
65         defer client.Close()
66
67         receivedCh := make(chan []byte)
68         errorsCh := make(chan interface{})
69         onReceive := func(chID byte, msgBytes []byte) {
70                 receivedCh <- msgBytes
71         }
72         onError := func(r interface{}) {
73                 errorsCh <- r
74         }
75         mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
76         _, err := mconn1.Start()
77         require.Nil(err)
78         defer mconn1.Stop()
79
80         mconn2 := createMConnection(server)
81         _, err = mconn2.Start()
82         require.Nil(err)
83         defer mconn2.Stop()
84
85         msg := "Cyclops"
86         assert.True(mconn2.Send(0x01, msg))
87
88         select {
89         case receivedBytes := <-receivedCh:
90                 assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
91         case err := <-errorsCh:
92                 t.Fatalf("Expected %s, got %+v", msg, err)
93         case <-time.After(500 * time.Millisecond):
94                 t.Fatalf("Did not receive %s message in 500ms", msg)
95         }
96 }
97
98 func TestMConnectionStatus(t *testing.T) {
99         assert, require := assert.New(t), require.New(t)
100
101         server, client := net.Pipe()
102         defer server.Close()
103         defer client.Close()
104
105         mconn := createMConnection(client)
106         _, err := mconn.Start()
107         require.Nil(err)
108         defer mconn.Stop()
109
110         status := mconn.Status()
111         assert.NotNil(status)
112         assert.Zero(status.Channels[0].SendQueueSize)
113 }
114
115 func TestMConnectionStopsAndReturnsError(t *testing.T) {
116         assert, require := assert.New(t), require.New(t)
117
118         server, client := net.Pipe()
119         defer server.Close()
120         defer client.Close()
121
122         receivedCh := make(chan []byte)
123         errorsCh := make(chan interface{})
124         onReceive := func(chID byte, msgBytes []byte) {
125                 receivedCh <- msgBytes
126         }
127         onError := func(r interface{}) {
128                 errorsCh <- r
129         }
130         mconn := createMConnectionWithCallbacks(client, onReceive, onError)
131         _, err := mconn.Start()
132         require.Nil(err)
133         defer mconn.Stop()
134
135         client.Close()
136
137         select {
138         case receivedBytes := <-receivedCh:
139                 t.Fatalf("Expected error, got %v", receivedBytes)
140         case err := <-errorsCh:
141                 assert.NotNil(err)
142                 assert.False(mconn.IsRunning())
143         case <-time.After(500 * time.Millisecond):
144                 t.Fatal("Did not receive error in 500ms")
145         }
146 }