13 "github.com/stretchr/testify/assert"
14 "github.com/stretchr/testify/require"
15 crypto "github.com/tendermint/go-crypto"
16 wire "github.com/tendermint/go-wire"
18 cfg "github.com/bytom/config"
19 "github.com/tendermint/tmlibs/log"
27 config = cfg.DefaultP2PConfig()
28 config.PexReactor = true
31 type PeerMessage struct {
37 type TestReactor struct {
41 channels []*ChannelDescriptor
46 msgsReceived map[byte][]PeerMessage
49 func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
52 logMessages: logMessages,
53 msgsReceived: make(map[byte][]PeerMessage),
55 tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
56 tr.SetLogger(log.TestingLogger())
60 func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
64 func (tr *TestReactor) AddPeer(peer *Peer) {
67 tr.peersAdded = append(tr.peersAdded, peer)
70 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
73 tr.peersRemoved = append(tr.peersRemoved, peer)
76 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
80 //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
81 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
86 func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
89 return tr.msgsReceived[chID]
92 //-----------------------------------------------------------------------------
94 // convenience method for creating two switches connected to each other.
95 // XXX: note this uses net.Pipe and not a proper TCP conn
96 func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
97 // Create two switches that will be interconnected.
98 switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
99 return switches[0], switches[1]
102 func initSwitchFunc(i int, sw *Switch) *Switch {
103 // Make two reactors of two channels each
104 sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
105 &ChannelDescriptor{ID: byte(0x00), Priority: 10},
106 &ChannelDescriptor{ID: byte(0x01), Priority: 10},
108 sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
109 &ChannelDescriptor{ID: byte(0x02), Priority: 10},
110 &ChannelDescriptor{ID: byte(0x03), Priority: 10},
115 func TestSwitches(t *testing.T) {
116 s1, s2 := makeSwitchPair(t, initSwitchFunc)
120 if s1.Peers().Size() != 1 {
121 t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
123 if s2.Peers().Size() != 1 {
124 t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
127 // Lets send some messages
128 ch0Msg := "channel zero"
129 ch1Msg := "channel foo"
130 ch2Msg := "channel bar"
132 s1.Broadcast(byte(0x00), ch0Msg)
133 s1.Broadcast(byte(0x01), ch1Msg)
134 s1.Broadcast(byte(0x02), ch2Msg)
136 // Wait for things to settle...
137 time.Sleep(5000 * time.Millisecond)
139 // Check message on ch0
140 ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00))
141 if len(ch0Msgs) != 1 {
142 t.Errorf("Expected to have received 1 message in ch0")
144 if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
145 t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
148 // Check message on ch1
149 ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01))
150 if len(ch1Msgs) != 1 {
151 t.Errorf("Expected to have received 1 message in ch1")
153 if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
154 t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
157 // Check message on ch2
158 ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02))
159 if len(ch2Msgs) != 1 {
160 t.Errorf("Expected to have received 1 message in ch2")
162 if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
163 t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
168 func TestConnAddrFilter(t *testing.T) {
169 s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
170 s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
174 s1.SetAddrFilter(func(addr net.Addr) error {
175 if addr.String() == c1.RemoteAddr().String() {
176 return fmt.Errorf("Error: pipe is blacklisted")
181 // connect to good peer
183 s1.addPeerWithConnection(c1)
186 s2.addPeerWithConnection(c2)
189 // Wait for things to happen, peers to get added...
190 time.Sleep(100 * time.Millisecond * time.Duration(4))
194 if s1.Peers().Size() != 0 {
195 t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
197 if s2.Peers().Size() != 0 {
198 t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
202 func TestConnPubKeyFilter(t *testing.T) {
203 s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
204 s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
209 s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
210 if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
211 return fmt.Errorf("Error: pipe is blacklisted")
216 // connect to good peer
218 s1.addPeerWithConnection(c1)
221 s2.addPeerWithConnection(c2)
224 // Wait for things to happen, peers to get added...
225 time.Sleep(100 * time.Millisecond * time.Duration(4))
229 if s1.Peers().Size() != 0 {
230 t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
232 if s2.Peers().Size() != 0 {
233 t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
237 func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
238 assert, require := assert.New(t), require.New(t)
240 sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
244 // simulate remote peer
245 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig(config)}
249 peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
251 err = sw.AddPeer(peer)
254 // simulate failure by closing connection
257 time.Sleep(100 * time.Millisecond)
259 assert.Zero(sw.Peers().Size())
260 assert.False(peer.IsRunning())
263 func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
264 assert, require := assert.New(t), require.New(t)
266 sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
270 // simulate remote peer
271 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig(config)}
275 peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
276 peer.makePersistent()
278 err = sw.AddPeer(peer)
281 // simulate failure by closing connection
284 // TODO: actually detect the disconnection and wait for reconnect
285 time.Sleep(100 * time.Millisecond)
287 assert.NotZero(sw.Peers().Size())
288 assert.False(peer.IsRunning())
291 func BenchmarkSwitches(b *testing.B) {
294 s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
295 // Make bar reactors of bar channels each
296 sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
297 &ChannelDescriptor{ID: byte(0x00), Priority: 10},
298 &ChannelDescriptor{ID: byte(0x01), Priority: 10},
300 sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
301 &ChannelDescriptor{ID: byte(0x02), Priority: 10},
302 &ChannelDescriptor{ID: byte(0x03), Priority: 10},
309 // Allow time for goroutines to boot up
310 time.Sleep(1000 * time.Millisecond)
313 numSuccess, numFailure := 0, 0
315 // Send random message from foo channel to another
316 for i := 0; i < b.N; i++ {
318 successChan := s1.Broadcast(chID, "test data")
319 for s := range successChan {
328 b.Logf("success: %v, failure: %v", numSuccess, numFailure)
330 // Allow everything to flush before stopping switches & closing connections.
332 time.Sleep(1000 * time.Millisecond)