--- /dev/null
+package p2p
+
+import (
+ "io/ioutil"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/davecgh/go-spew/spew"
+ cfg "github.com/vapor/config"
+ dbm "github.com/vapor/database/leveldb"
+ "github.com/vapor/errors"
+ conn "github.com/vapor/p2p/connection"
+ "github.com/vapor/p2p/security"
+ "github.com/vapor/p2p/signlib"
+)
+
+var (
+ testCfg *cfg.Config
+)
+
+func init() {
+ testCfg = cfg.DefaultConfig()
+}
+
+/*
+Each peer has one `MConnection` (multiplex connection) instance.
+
+__multiplex__ *noun* a system or signal involving simultaneous transmission of
+several messages along a single channel of communication.
+
+Each `MConnection` handles message transmission on multiple abstract communication
+`Channel`s. Each channel has a globally unique byte id.
+The byte id and the relative priorities of each `Channel` are configured upon
+initialization of the connection.
+
+There are two methods for sending messages:
+ func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
+ func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
+
+`Send(chID, msgBytes)` is a blocking call that waits until `msg` is
+successfully queued for the channel with the given id byte `chID`, or until the
+request times out. The message `msg` is serialized using Go-Amino.
+
+`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
+channel's queue is full.
+
+Inbound message bytes are handled with an onReceive callback function.
+*/
+type PeerMessage struct {
+ PeerID string
+ Bytes []byte
+ Counter int
+}
+
+type TestReactor struct {
+ BaseReactor
+
+ mtx sync.Mutex
+ channels []*conn.ChannelDescriptor
+ logMessages bool
+ msgsCounter int
+ msgsReceived map[byte][]PeerMessage
+}
+
+func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
+ tr := &TestReactor{
+ channels: channels,
+ logMessages: logMessages,
+ msgsReceived: make(map[byte][]PeerMessage),
+ }
+ tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
+
+ return tr
+}
+
+// GetChannels implements Reactor
+func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
+ return tr.channels
+}
+
+// OnStart implements BaseService
+func (tr *TestReactor) OnStart() error {
+ tr.BaseReactor.OnStart()
+ return nil
+}
+
+// OnStop implements BaseService
+func (tr *TestReactor) OnStop() {
+ tr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (tr *TestReactor) AddPeer(peer *Peer) error {
+ return nil
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
+}
+
+// Receive implements Reactor by handling 4 types of messages (look below).
+func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
+ if tr.logMessages {
+ tr.mtx.Lock()
+ defer tr.mtx.Unlock()
+ tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
+ tr.msgsCounter++
+ }
+}
+
+func initSwitchFunc(sw *Switch) *Switch {
+ // Make two reactors of two channels each
+ sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
+ {ID: byte(0x00), Priority: 10},
+ {ID: byte(0x01), Priority: 10},
+ }, true))
+ sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
+ {ID: byte(0x02), Priority: 10},
+ {ID: byte(0x03), Priority: 10},
+ }, true))
+
+ return sw
+}
+
+//Test connect self.
+func TestFiltersOutItself(t *testing.T) {
+ t.Skip("due to fail on mac")
+ dirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dirPath)
+
+ testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+ cfg := *testCfg
+ cfg.DBPath = dirPath
+ cfg.P2P.ListenAddress = "127.0.1.1:0"
+ swPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
+ s1.Start()
+ defer s1.Stop()
+
+ rmdirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(rmdirPath)
+
+ // simulate s1 having a public key and creating a remote peer with the same key
+ rpCfg := *testCfg
+ rpCfg.DBPath = rmdirPath
+ rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
+ rp.Start()
+ defer rp.Stop()
+ if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
+ t.Fatal(err)
+ }
+
+ //S1 dialing itself ip address
+ addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
+
+ if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
+ t.Fatal(err)
+ }
+}
+
+func TestDialBannedPeer(t *testing.T) {
+ t.Skip("due to fail on mac")
+ dirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dirPath)
+
+ testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+ cfg := *testCfg
+ cfg.DBPath = dirPath
+ cfg.P2P.ListenAddress = "127.0.1.1:0"
+ swPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
+ s1.Start()
+ defer s1.Stop()
+
+ rmdirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(rmdirPath)
+
+ rpCfg := *testCfg
+ rpCfg.DBPath = rmdirPath
+ remotePrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
+ rp.Start()
+ defer rp.Stop()
+ for {
+ if ok := s1.security.IsBanned(rp.addr.IP.String(), security.LevelMsgIllegal, "test"); ok {
+ break
+ }
+ }
+ if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != security.ErrConnectBannedPeer {
+ t.Fatal(err)
+ }
+}
+
+func TestDuplicateOutBoundPeer(t *testing.T) {
+ t.Skip("due to fail on mac")
+ dirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dirPath)
+
+ testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+ cfg := *testCfg
+ cfg.DBPath = dirPath
+ cfg.P2P.ListenAddress = "127.0.1.1:0"
+ swPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
+ s1.Start()
+ defer s1.Stop()
+
+ rmdirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(rmdirPath)
+
+ rpCfg := *testCfg
+ rpCfg.DBPath = rmdirPath
+ remotePrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
+ rp.Start()
+ defer rp.Stop()
+
+ if err = s1.DialPeerWithAddress(rp.addr); err != nil {
+ t.Fatal(err)
+ }
+
+ if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
+ t.Fatal(err)
+ }
+}
+
+func TestDuplicateInBoundPeer(t *testing.T) {
+ t.Skip("due to fail on mac")
+ dirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dirPath)
+
+ testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+ cfg := *testCfg
+ cfg.DBPath = dirPath
+ cfg.P2P.ListenAddress = "127.0.1.1:0"
+ swPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
+ s1.Start()
+ defer s1.Stop()
+
+ inpCfg := *testCfg
+ inpPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
+ addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
+ if err != nil {
+ t.Fatal(err)
+ }
+ go inp.dial(addr)
+
+ inp1Cfg := *testCfg
+ inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
+ go inp1.dial(addr)
+
+ time.Sleep(1 * time.Second)
+ if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
+ t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
+ }
+}
+
+func TestAddInboundPeer(t *testing.T) {
+ t.Skip("due to fail on mac")
+ dirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dirPath)
+
+ testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+ cfg := *testCfg
+ cfg.DBPath = dirPath
+ cfg.P2P.MaxNumPeers = 2
+ cfg.P2P.ListenAddress = "127.0.1.1:0"
+ swPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
+ s1.Start()
+ defer s1.Stop()
+
+ inpCfg := *testCfg
+ inpPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
+ addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
+ if err != nil {
+ t.Fatal(err)
+ }
+ go inp.dial(addr)
+
+ rpCfg := *testCfg
+ rpPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
+ rp.Start()
+ defer rp.Stop()
+
+ if err := s1.DialPeerWithAddress(rp.addr); err != nil {
+ t.Fatal(err)
+ }
+
+ inp2Cfg := *testCfg
+
+ inp2PrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
+
+ go inp2.dial(addr)
+
+ time.Sleep(1 * time.Second)
+ if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
+ t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
+ }
+}
+
+func TestStopPeer(t *testing.T) {
+ t.Skip("due to fail on mac")
+ dirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dirPath)
+
+ testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+ cfg := *testCfg
+ cfg.DBPath = dirPath
+ cfg.P2P.MaxNumPeers = 2
+ cfg.P2P.ListenAddress = "127.0.1.1:0"
+ swPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
+ s1.Start()
+ defer s1.Stop()
+
+ inpCfg := *testCfg
+ inp2PrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ inp := &inboundPeer{PrivKey: inp2PrivKey, config: &inpCfg}
+ addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
+ if err != nil {
+ t.Fatal(err)
+ }
+ go inp.dial(addr)
+
+ rpCfg := *testCfg
+ rpPrivKey, err := signlib.NewPrivKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
+ rp.Start()
+ defer rp.Stop()
+
+ if err := s1.DialPeerWithAddress(rp.addr); err != nil {
+ t.Fatal(err)
+ }
+ time.Sleep(1 * time.Second)
+ if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
+ t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
+ }
+
+ s1.StopPeerGracefully(s1.peers.list[0].Key)
+ if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
+ t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
+ }
+
+ s1.StopPeerForError(s1.peers.list[0], "stop for test")
+ if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
+ t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))
+ }
+}