OSDN Git Service

merge with master
[bytom/bytom.git] / p2p / connection_test.go
1 package p2p_test
2
3 import (
4         "net"
5         "testing"
6         "time"
7
8         "github.com/stretchr/testify/assert"
9         "github.com/stretchr/testify/require"
10         p2p "github.com/bytom/p2p"
11         "github.com/tendermint/tmlibs/log"
12 )
13
14 func createMConnection(conn net.Conn) *p2p.MConnection {
15         onReceive := func(chID byte, msgBytes []byte) {
16         }
17         onError := func(r interface{}) {
18         }
19         c := createMConnectionWithCallbacks(conn, onReceive, onError)
20         c.SetLogger(log.TestingLogger())
21         return c
22 }
23
24 func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
25         chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
26         c := p2p.NewMConnection(conn, chDescs, onReceive, onError)
27         c.SetLogger(log.TestingLogger())
28         return c
29 }
30
31 func TestMConnectionSend(t *testing.T) {
32         assert, require := assert.New(t), require.New(t)
33
34         server, client := net.Pipe()
35         defer server.Close()
36         defer client.Close()
37
38         mconn := createMConnection(client)
39         _, err := mconn.Start()
40         require.Nil(err)
41         defer mconn.Stop()
42
43         msg := "Ant-Man"
44         assert.True(mconn.Send(0x01, msg))
45         // Note: subsequent Send/TrySend calls could pass because we are reading from
46         // the send queue in a separate goroutine.
47         server.Read(make([]byte, len(msg)))
48         assert.True(mconn.CanSend(0x01))
49
50         msg = "Spider-Man"
51         assert.True(mconn.TrySend(0x01, msg))
52         server.Read(make([]byte, len(msg)))
53
54         assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
55         assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
56 }
57
58 func TestMConnectionReceive(t *testing.T) {
59         assert, require := assert.New(t), require.New(t)
60
61         server, client := net.Pipe()
62         defer server.Close()
63         defer client.Close()
64
65         receivedCh := make(chan []byte)
66         errorsCh := make(chan interface{})
67         onReceive := func(chID byte, msgBytes []byte) {
68                 receivedCh <- msgBytes
69         }
70         onError := func(r interface{}) {
71                 errorsCh <- r
72         }
73         mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
74         _, err := mconn1.Start()
75         require.Nil(err)
76         defer mconn1.Stop()
77
78         mconn2 := createMConnection(server)
79         _, err = mconn2.Start()
80         require.Nil(err)
81         defer mconn2.Stop()
82
83         msg := "Cyclops"
84         assert.True(mconn2.Send(0x01, msg))
85
86         select {
87         case receivedBytes := <-receivedCh:
88                 assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
89         case err := <-errorsCh:
90                 t.Fatalf("Expected %s, got %+v", msg, err)
91         case <-time.After(500 * time.Millisecond):
92                 t.Fatalf("Did not receive %s message in 500ms", msg)
93         }
94 }
95
96 func TestMConnectionStatus(t *testing.T) {
97         assert, require := assert.New(t), require.New(t)
98
99         server, client := net.Pipe()
100         defer server.Close()
101         defer client.Close()
102
103         mconn := createMConnection(client)
104         _, err := mconn.Start()
105         require.Nil(err)
106         defer mconn.Stop()
107
108         status := mconn.Status()
109         assert.NotNil(status)
110         assert.Zero(status.Channels[0].SendQueueSize)
111 }
112
113 func TestMConnectionStopsAndReturnsError(t *testing.T) {
114         assert, require := assert.New(t), require.New(t)
115
116         server, client := net.Pipe()
117         defer server.Close()
118         defer client.Close()
119
120         receivedCh := make(chan []byte)
121         errorsCh := make(chan interface{})
122         onReceive := func(chID byte, msgBytes []byte) {
123                 receivedCh <- msgBytes
124         }
125         onError := func(r interface{}) {
126                 errorsCh <- r
127         }
128         mconn := createMConnectionWithCallbacks(client, onReceive, onError)
129         _, err := mconn.Start()
130         require.Nil(err)
131         defer mconn.Stop()
132
133         client.Close()
134
135         select {
136         case receivedBytes := <-receivedCh:
137                 t.Fatalf("Expected error, got %v", receivedBytes)
138         case err := <-errorsCh:
139                 assert.NotNil(err)
140                 assert.False(mconn.IsRunning())
141         case <-time.After(500 * time.Millisecond):
142                 t.Fatal("Did not receive error in 500ms")
143         }
144 }