8 "github.com/stretchr/testify/assert"
9 "github.com/stretchr/testify/require"
10 p2p "github.com/bytom/p2p"
11 "github.com/tendermint/tmlibs/log"
// createMConnection builds a *p2p.MConnection over conn by delegating to
// createMConnectionWithCallbacks with locally declared onReceive and onError
// closures, and then sets the testing logger on the result.
//
// NOTE(review): the closure bodies, the return statement and the closing
// braces are not visible in this excerpt. Presumably both callbacks are
// no-ops and c is returned; confirm against the full file.
14 func createMConnection(conn net.Conn) *p2p.MConnection {
15 onReceive := func(chID byte, msgBytes []byte) {
17 onError := func(r interface{}) {
19 c := createMConnectionWithCallbacks(conn, onReceive, onError)
// NOTE(review): createMConnectionWithCallbacks already calls
// SetLogger(log.TestingLogger()), so this second call looks redundant.
20 c.SetLogger(log.TestingLogger())
// createMConnectionWithCallbacks constructs a *p2p.MConnection over conn with
// the caller-supplied onReceive and onError callbacks. The connection is
// configured with a single channel descriptor (ID 0x01, Priority 1,
// SendQueueCapacity 1) and has the testing logger attached.
//
// NOTE(review): the return statement and closing brace are not visible in
// this excerpt; presumably c is returned.
24 func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
// Channel 0x01 is the only channel registered here; the tests below use 0x05
// as an example of an unknown channel.
25 chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
26 c := p2p.NewMConnection(conn, chDescs, onReceive, onError)
27 c.SetLogger(log.TestingLogger())
// TestMConnectionSend starts an MConnection on the client end of a net.Pipe
// and checks that Send, CanSend and TrySend report true on the registered
// channel 0x01, and that CanSend and Send report false on channel 0x05, which
// is not registered by createMConnectionWithCallbacks.
//
// NOTE(review): the declaration/assignment of msg, the check of err returned
// by Start, and any cleanup (closing the pipe, stopping mconn) are not visible
// in this excerpt; confirm against the full file.
31 func TestMConnectionSend(t *testing.T) {
// These locals shadow the assert and require packages for the rest of the function.
32 assert, require := assert.New(t), require.New(t)
34 server, client := net.Pipe()
38 mconn := createMConnection(client)
// The first return value of Start is discarded.
39 _, err := mconn.Start()
44 assert.True(mconn.Send(0x01, msg))
45 // Note: subsequent Send/TrySend calls could pass because we are reading from
46 // the send queue in a separate goroutine.
// Read from the server end of the pipe; the byte count and error returned by
// Read are ignored.
47 server.Read(make([]byte, len(msg)))
48 assert.True(mconn.CanSend(0x01))
51 assert.True(mconn.TrySend(0x01, msg))
// As above, the result of Read is ignored.
52 server.Read(make([]byte, len(msg)))
54 assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
55 assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
// TestMConnectionReceive connects two MConnections through a net.Pipe: mconn1
// on the client end with callbacks that report into channels, and mconn2 on
// the server end. It sends msg from mconn2 on channel 0x01 and expects the
// payload to arrive through mconn1's onReceive callback within 500ms.
//
// NOTE(review): the declaration of msg, the err checks after each Start, the
// body of onError, the select header and any cleanup are not visible in this
// excerpt; confirm against the full file.
58 func TestMConnectionReceive(t *testing.T) {
// These locals shadow the assert and require packages for the rest of the function.
59 assert, require := assert.New(t), require.New(t)
61 server, client := net.Pipe()
// Both channels are unbuffered, so a callback that sends on one blocks until
// the test receives from it.
65 receivedCh := make(chan []byte)
66 errorsCh := make(chan interface{})
67 onReceive := func(chID byte, msgBytes []byte) {
68 receivedCh <- msgBytes
// NOTE(review): the body of onError is not visible; presumably it forwards r
// to errorsCh, since the test receives from errorsCh below.
70 onError := func(r interface{}) {
73 mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
74 _, err := mconn1.Start()
78 mconn2 := createMConnection(server)
79 _, err = mconn2.Start()
84 assert.True(mconn2.Send(0x01, msg))
// Three outcomes are handled: the message arrives, an error is reported, or
// the 500ms timeout fires.
87 case receivedBytes := <-receivedCh:
88 assert.Equal([]byte(msg), receivedBytes[2:]) // compares after skipping the first 2 bytes; NOTE(review): the previous comment said "first 3 bytes are internal", which disagrees with the [2:] slice — confirm the prefix length
89 case err := <-errorsCh:
90 t.Fatalf("Expected %s, got %+v", msg, err)
91 case <-time.After(500 * time.Millisecond):
92 t.Fatalf("Did not receive %s message in 500ms", msg)
// TestMConnectionStatus starts an MConnection on the client end of a net.Pipe
// and checks that Status returns a non-nil value whose first channel reports
// a zero SendQueueSize.
//
// NOTE(review): the check of err returned by Start and any cleanup are not
// visible in this excerpt; server is not used in the visible lines.
96 func TestMConnectionStatus(t *testing.T) {
// These locals shadow the assert and require packages for the rest of the function.
97 assert, require := assert.New(t), require.New(t)
99 server, client := net.Pipe()
103 mconn := createMConnection(client)
104 _, err := mconn.Start()
108 status := mconn.Status()
109 assert.NotNil(status)
// Nothing has been sent in the visible lines, so the send queue of the first
// channel is expected to be empty.
110 assert.Zero(status.Channels[0].SendQueueSize)
// TestMConnectionStopsAndReturnsError starts an MConnection with callbacks
// that report into channels and expects a value on errorsCh within 500ms,
// after which the connection must no longer be running. Receiving a message
// instead, or hitting the timeout, fails the test.
//
// NOTE(review): the step that provokes the error, the body of onError, the
// check of err returned by Start, the select header and any cleanup are not
// visible in this excerpt; confirm against the full file.
113 func TestMConnectionStopsAndReturnsError(t *testing.T) {
// These locals shadow the assert and require packages for the rest of the function.
114 assert, require := assert.New(t), require.New(t)
116 server, client := net.Pipe()
// Both channels are unbuffered, so a callback that sends on one blocks until
// the test receives from it.
120 receivedCh := make(chan []byte)
121 errorsCh := make(chan interface{})
122 onReceive := func(chID byte, msgBytes []byte) {
123 receivedCh <- msgBytes
// NOTE(review): the body of onError is not visible; presumably it forwards r
// to errorsCh, since the test receives from errorsCh below.
125 onError := func(r interface{}) {
128 mconn := createMConnectionWithCallbacks(client, onReceive, onError)
129 _, err := mconn.Start()
// A received message is the failure case here: the test expects an error.
136 case receivedBytes := <-receivedCh:
137 t.Fatalf("Expected error, got %v", receivedBytes)
138 case err := <-errorsCh:
// Once the error has been reported, the connection must no longer be running.
140 assert.False(mconn.IsRunning())
141 case <-time.After(500 * time.Millisecond):
142 t.Fatal("Did not receive error in 500ms")