OSDN Git Service

check the switch level logic (#1026)
[bytom/bytom.git] / p2p / switch_test.go
diff --git a/p2p/switch_test.go b/p2p/switch_test.go
deleted file mode 100644 (file)
index a8f41f5..0000000
+++ /dev/null
@@ -1,338 +0,0 @@
-// +build !network
-
-package p2p
-
-import (
-       "bytes"
-       "fmt"
-       "net"
-       "sync"
-       "testing"
-       "time"
-
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/require"
-       crypto "github.com/tendermint/go-crypto"
-       wire "github.com/tendermint/go-wire"
-
-       cfg "github.com/bytom/config"
-       "github.com/tendermint/tmlibs/log"
-)
-
-var (
-       config *cfg.P2PConfig
-)
-
-func init() {
-       config = cfg.DefaultP2PConfig()
-       config.PexReactor = true
-}
-
-type PeerMessage struct {
-       PeerKey string
-       Bytes   []byte
-       Counter int
-}
-
-type TestReactor struct {
-       BaseReactor
-
-       mtx          sync.Mutex
-       channels     []*ChannelDescriptor
-       peersAdded   []*Peer
-       peersRemoved []*Peer
-       logMessages  bool
-       msgsCounter  int
-       msgsReceived map[byte][]PeerMessage
-}
-
-func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
-       tr := &TestReactor{
-               channels:     channels,
-               logMessages:  logMessages,
-               msgsReceived: make(map[byte][]PeerMessage),
-       }
-       tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
-       tr.SetLogger(log.TestingLogger())
-       return tr
-}
-
-func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
-       return tr.channels
-}
-
-func (tr *TestReactor) AddPeer(peer *Peer) error {
-       tr.mtx.Lock()
-       defer tr.mtx.Unlock()
-       tr.peersAdded = append(tr.peersAdded, peer)
-       return nil
-}
-
-func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
-       tr.mtx.Lock()
-       defer tr.mtx.Unlock()
-       tr.peersRemoved = append(tr.peersRemoved, peer)
-}
-
-func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
-       if tr.logMessages {
-               tr.mtx.Lock()
-               defer tr.mtx.Unlock()
-               //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
-               tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
-               tr.msgsCounter++
-       }
-}
-
-func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
-       tr.mtx.Lock()
-       defer tr.mtx.Unlock()
-       return tr.msgsReceived[chID]
-}
-
-//-----------------------------------------------------------------------------
-
-// convenience method for creating two switches connected to each other.
-// XXX: note this uses net.Pipe and not a proper TCP conn
-func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
-       // Create two switches that will be interconnected.
-       switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
-       return switches[0], switches[1]
-}
-
-func initSwitchFunc(i int, sw *Switch) *Switch {
-       // Make two reactors of two channels each
-       sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
-               {ID: byte(0x00), Priority: 10},
-               {ID: byte(0x01), Priority: 10},
-       }, true))
-       sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
-               {ID: byte(0x02), Priority: 10},
-               {ID: byte(0x03), Priority: 10},
-       }, true))
-       return sw
-}
-
-func TestSwitches(t *testing.T) {
-       s1, s2 := makeSwitchPair(t, initSwitchFunc)
-       defer s1.Stop()
-       defer s2.Stop()
-
-       if s1.Peers().Size() != 1 {
-               t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
-       }
-       if s2.Peers().Size() != 1 {
-               t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
-       }
-
-       // Lets send some messages
-       ch0Msg := "channel zero"
-       ch1Msg := "channel foo"
-       ch2Msg := "channel bar"
-
-       s1.Broadcast(byte(0x00), ch0Msg)
-       s1.Broadcast(byte(0x01), ch1Msg)
-       s1.Broadcast(byte(0x02), ch2Msg)
-
-       // Wait for things to settle...
-       time.Sleep(5000 * time.Millisecond)
-
-       // Check message on ch0
-       ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00))
-       if len(ch0Msgs) != 1 {
-               t.Errorf("Expected to have received 1 message in ch0")
-       }
-       if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
-               t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
-       }
-
-       // Check message on ch1
-       ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01))
-       if len(ch1Msgs) != 1 {
-               t.Errorf("Expected to have received 1 message in ch1")
-       }
-       if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
-               t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
-       }
-
-       // Check message on ch2
-       ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02))
-       if len(ch2Msgs) != 1 {
-               t.Errorf("Expected to have received 1 message in ch2")
-       }
-       if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
-               t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
-       }
-
-}
-
-func TestConnAddrFilter(t *testing.T) {
-       s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
-       s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
-
-       c1, c2 := net.Pipe()
-
-       s1.SetAddrFilter(func(addr net.Addr) error {
-               if addr.String() == c1.RemoteAddr().String() {
-                       return fmt.Errorf("Error: pipe is blacklisted")
-               }
-               return nil
-       })
-
-       // connect to good peer
-       go func() {
-               err := s1.addPeerWithConnection(c1)
-               assert.NotNil(t, err, "expected err")
-       }()
-       go func() {
-               err := s2.addPeerWithConnection(c2)
-               assert.NotNil(t, err, "expected err")
-       }()
-
-       // Wait for things to happen, peers to get added...
-       time.Sleep(100 * time.Millisecond * time.Duration(4))
-
-       defer s1.Stop()
-       defer s2.Stop()
-       if s1.Peers().Size() != 0 {
-               t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
-       }
-       if s2.Peers().Size() != 0 {
-               t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
-       }
-}
-
-func TestConnPubKeyFilter(t *testing.T) {
-       s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
-       s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
-
-       c1, c2 := net.Pipe()
-
-       // set pubkey filter
-       s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
-               if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
-                       return fmt.Errorf("Error: pipe is blacklisted")
-               }
-               return nil
-       })
-
-       // connect to good peer
-       go func() {
-               err := s1.addPeerWithConnection(c1)
-               assert.NotNil(t, err, "expected err")
-       }()
-       go func() {
-               err := s2.addPeerWithConnection(c2)
-               assert.NotNil(t, err, "expected err")
-       }()
-
-       // Wait for things to happen, peers to get added...
-       time.Sleep(100 * time.Millisecond * time.Duration(4))
-
-       defer s1.Stop()
-       defer s2.Stop()
-       if s1.Peers().Size() != 0 {
-               t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
-       }
-       if s2.Peers().Size() != 0 {
-               t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
-       }
-}
-
-func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
-       assert, require := assert.New(t), require.New(t)
-
-       sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
-       sw.Start()
-       defer sw.Stop()
-
-       // simulate remote peer
-       rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig(config)}
-       rp.Start()
-       defer rp.Stop()
-
-       peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
-       require.Nil(err)
-       err = sw.AddPeer(peer)
-       require.Nil(err)
-
-       // simulate failure by closing connection
-       peer.CloseConn()
-
-       time.Sleep(100 * time.Millisecond)
-
-       assert.Zero(sw.Peers().Size())
-       assert.False(peer.IsRunning())
-}
-
-func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
-       assert, require := assert.New(t), require.New(t)
-
-       sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
-       sw.Start()
-       defer sw.Stop()
-
-       // simulate remote peer
-       rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig(config)}
-       rp.Start()
-       defer rp.Stop()
-
-       peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
-       peer.makePersistent()
-       require.Nil(err)
-       err = sw.AddPeer(peer)
-       require.Nil(err)
-
-       // simulate failure by closing connection
-       peer.CloseConn()
-
-       // TODO: actually detect the disconnection and wait for reconnect
-       time.Sleep(100 * time.Millisecond)
-
-       assert.NotZero(sw.Peers().Size())
-       assert.False(peer.IsRunning())
-}
-
-func BenchmarkSwitches(b *testing.B) {
-       b.StopTimer()
-
-       s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
-               // Make bar reactors of bar channels each
-               sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
-                       {ID: byte(0x00), Priority: 10},
-                       {ID: byte(0x01), Priority: 10},
-               }, false))
-               sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
-                       {ID: byte(0x02), Priority: 10},
-                       {ID: byte(0x03), Priority: 10},
-               }, false))
-               return sw
-       })
-       defer s1.Stop()
-       defer s2.Stop()
-
-       // Allow time for goroutines to boot up
-       time.Sleep(1000 * time.Millisecond)
-       b.StartTimer()
-
-       numSuccess, numFailure := 0, 0
-
-       // Send random message from foo channel to another
-       for i := 0; i < b.N; i++ {
-               chID := byte(i % 4)
-               successChan := s1.Broadcast(chID, "test data")
-               for s := range successChan {
-                       if s {
-                               numSuccess++
-                       } else {
-                               numFailure++
-                       }
-               }
-       }
-
-       b.Logf("success: %v, failure: %v", numSuccess, numFailure)
-
-       // Allow everything to flush before stopping switches & closing connections.
-       b.StopTimer()
-       time.Sleep(1000 * time.Millisecond)
-}