10 p2p "github.com/bytom/p2p"
11 "github.com/stretchr/testify/assert"
12 "github.com/stretchr/testify/require"
13 "github.com/tendermint/tmlibs/log"
16 func createMConnection(conn net.Conn) *p2p.MConnection {
17 onReceive := func(chID byte, msgBytes []byte) {
19 onError := func(r interface{}) {
21 c := createMConnectionWithCallbacks(conn, onReceive, onError)
22 c.SetLogger(log.TestingLogger())
26 func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
27 chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
28 c := p2p.NewMConnection(conn, chDescs, onReceive, onError)
29 c.SetLogger(log.TestingLogger())
33 func TestMConnectionSend(t *testing.T) {
34 assert, require := assert.New(t), require.New(t)
36 server, client := net.Pipe()
40 mconn := createMConnection(client)
41 _, err := mconn.Start()
46 assert.True(mconn.Send(0x01, msg))
47 // Note: subsequent Send/TrySend calls could pass because we are reading from
48 // the send queue in a separate goroutine.
49 server.Read(make([]byte, len(msg)))
50 assert.True(mconn.CanSend(0x01))
53 assert.True(mconn.TrySend(0x01, msg))
54 server.Read(make([]byte, len(msg)))
56 assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
57 assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
60 func TestMConnectionReceive(t *testing.T) {
61 assert, require := assert.New(t), require.New(t)
63 server, client := net.Pipe()
67 receivedCh := make(chan []byte)
68 errorsCh := make(chan interface{})
69 onReceive := func(chID byte, msgBytes []byte) {
70 receivedCh <- msgBytes
72 onError := func(r interface{}) {
75 mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
76 _, err := mconn1.Start()
80 mconn2 := createMConnection(server)
81 _, err = mconn2.Start()
86 assert.True(mconn2.Send(0x01, msg))
89 case receivedBytes := <-receivedCh:
90 assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
91 case err := <-errorsCh:
92 t.Fatalf("Expected %s, got %+v", msg, err)
93 case <-time.After(500 * time.Millisecond):
94 t.Fatalf("Did not receive %s message in 500ms", msg)
98 func TestMConnectionStatus(t *testing.T) {
99 assert, require := assert.New(t), require.New(t)
101 server, client := net.Pipe()
105 mconn := createMConnection(client)
106 _, err := mconn.Start()
110 status := mconn.Status()
111 assert.NotNil(status)
112 assert.Zero(status.Channels[0].SendQueueSize)
115 func TestMConnectionStopsAndReturnsError(t *testing.T) {
116 assert, require := assert.New(t), require.New(t)
118 server, client := net.Pipe()
122 receivedCh := make(chan []byte)
123 errorsCh := make(chan interface{})
124 onReceive := func(chID byte, msgBytes []byte) {
125 receivedCh <- msgBytes
127 onError := func(r interface{}) {
130 mconn := createMConnectionWithCallbacks(client, onReceive, onError)
131 _, err := mconn.Start()
138 case receivedBytes := <-receivedCh:
139 t.Fatalf("Expected error, got %v", receivedBytes)
140 case err := <-errorsCh:
142 assert.False(mconn.IsRunning())
143 case <-time.After(500 * time.Millisecond):
144 t.Fatal("Did not receive error in 500ms")