8 "github.com/stretchr/testify/assert"
9 "github.com/stretchr/testify/require"
10 "github.com/tendermint/tmlibs/log"
13 func createMConnection(conn net.Conn) *MConnection {
14 onReceive := func(chID byte, msgBytes []byte) {
16 onError := func(r interface{}) {
18 c := createMConnectionWithCallbacks(conn, onReceive, onError)
19 c.SetLogger(log.TestingLogger())
23 func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
24 chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
25 c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, DefaultMConnConfig())
26 c.SetLogger(log.TestingLogger())
30 func TestMConnectionSend(t *testing.T) {
31 assert, require := assert.New(t), require.New(t)
33 server, client := net.Pipe()
37 mconn := createMConnection(client)
38 _, err := mconn.Start()
43 assert.True(mconn.Send(0x01, msg))
44 // Note: subsequent Send/TrySend calls could pass because we are reading from
45 // the send queue in a separate goroutine.
46 server.Read(make([]byte, len(msg)))
47 assert.True(mconn.CanSend(0x01))
50 assert.True(mconn.TrySend(0x01, msg))
51 server.Read(make([]byte, len(msg)))
53 assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
54 assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
57 func TestMConnectionReceive(t *testing.T) {
58 assert, require := assert.New(t), require.New(t)
60 server, client := net.Pipe()
64 receivedCh := make(chan []byte)
65 errorsCh := make(chan interface{})
66 onReceive := func(chID byte, msgBytes []byte) {
67 receivedCh <- msgBytes
69 onError := func(r interface{}) {
72 mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
73 _, err := mconn1.Start()
77 mconn2 := createMConnection(server)
78 _, err = mconn2.Start()
83 assert.True(mconn2.Send(0x01, msg))
86 case receivedBytes := <-receivedCh:
87 assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
88 case err := <-errorsCh:
89 t.Fatalf("Expected %s, got %+v", msg, err)
90 case <-time.After(500 * time.Millisecond):
91 t.Fatalf("Did not receive %s message in 500ms", msg)
95 func TestMConnectionStopsAndReturnsError(t *testing.T) {
96 assert, require := assert.New(t), require.New(t)
98 server, client := net.Pipe()
102 receivedCh := make(chan []byte)
103 errorsCh := make(chan interface{})
104 onReceive := func(chID byte, msgBytes []byte) {
105 receivedCh <- msgBytes
107 onError := func(r interface{}) {
110 mconn := createMConnectionWithCallbacks(client, onReceive, onError)
111 _, err := mconn.Start()
118 case receivedBytes := <-receivedCh:
119 t.Fatalf("Expected error, got %v", receivedBytes)
120 case err := <-errorsCh:
122 assert.False(mconn.IsRunning())
123 case <-time.After(500 * time.Millisecond):
124 t.Fatal("Did not receive error in 500ms")