10 "github.com/davecgh/go-spew/spew"
11 cfg "github.com/vapor/config"
12 dbm "github.com/vapor/database/leveldb"
13 "github.com/vapor/errors"
14 conn "github.com/vapor/p2p/connection"
15 "github.com/vapor/p2p/signlib"
23 testCfg = cfg.DefaultConfig()
27 Each peer has one `MConnection` (multiplex connection) instance.
29 __multiplex__ *noun* a system or signal involving simultaneous transmission of
30 several messages along a single channel of communication.
32 Each `MConnection` handles message transmission on multiple abstract communication
33 `Channel`s. Each channel has a globally unique byte id.
34 The byte id and the relative priorities of each `Channel` are configured upon
35 initialization of the connection.
37 There are two methods for sending messages:
38 func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
39 func (m MConnection) TrySend(chID byte, msgBytes []byte) bool {}
41 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
42 successfully queued for the channel with the given id byte `chID`, or until the
43 request times out. The message `msg` is serialized using Go-Amino.
45 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
46 channel's queue is full.
48 Inbound message bytes are handled with an onReceive callback function.
50 type PeerMessage struct {
56 type TestReactor struct {
60 channels []*conn.ChannelDescriptor
63 msgsReceived map[byte][]PeerMessage
66 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
69 logMessages: logMessages,
70 msgsReceived: make(map[byte][]PeerMessage),
72 tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
77 // GetChannels implements Reactor
78 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
82 // OnStart implements BaseService
83 func (tr *TestReactor) OnStart() error {
84 tr.BaseReactor.OnStart()
88 // OnStop implements BaseService
89 func (tr *TestReactor) OnStop() {
90 tr.BaseReactor.OnStop()
93 // AddPeer implements Reactor by sending our state to peer.
94 func (tr *TestReactor) AddPeer(peer *Peer) error {
98 // RemovePeer implements Reactor by removing peer from the pool.
99 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
102 // Receive implements Reactor by recording incoming messages in msgsReceived, keyed by channel ID.
103 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
106 defer tr.mtx.Unlock()
107 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
112 func initSwitchFunc(sw *Switch) *Switch {
113 // Make two reactors of two channels each
114 sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
115 {ID: byte(0x00), Priority: 10},
116 {ID: byte(0x01), Priority: 10},
118 sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
119 {ID: byte(0x02), Priority: 10},
120 {ID: byte(0x03), Priority: 10},
127 func TestFiltersOutItself(t *testing.T) {
128 t.Skip("skipping test")
129 dirPath, err := ioutil.TempDir(".", "")
133 defer os.RemoveAll(dirPath)
135 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
137 cfg.P2P.ListenAddress = "127.0.1.1:0"
138 swPrivKey, err := signlib.NewPrivKey()
143 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
147 // simulate s1 having a public key and creating a remote peer with the same key
149 rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
152 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
156 // s1 dials its own IP address
157 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
159 if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
164 func TestDialBannedPeer(t *testing.T) {
165 t.Skip("skipping test")
166 dirPath, err := ioutil.TempDir(".", "")
170 defer os.RemoveAll(dirPath)
172 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
174 cfg.P2P.ListenAddress = "127.0.1.1:0"
175 swPrivKey, err := signlib.NewPrivKey()
179 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
184 remotePrivKey, err := signlib.NewPrivKey()
189 rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
192 s1.AddBannedPeer(rp.addr.IP.String())
193 if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectBannedPeer {
197 s1.delBannedPeer(rp.addr.IP.String())
198 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
203 func TestDuplicateOutBoundPeer(t *testing.T) {
204 t.Skip("skipping test")
205 dirPath, err := ioutil.TempDir(".", "")
209 defer os.RemoveAll(dirPath)
211 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
213 cfg.P2P.ListenAddress = "127.0.1.1:0"
214 swPrivKey, err := signlib.NewPrivKey()
219 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
224 remotePrivKey, err := signlib.NewPrivKey()
229 rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
233 if err = s1.DialPeerWithAddress(rp.addr); err != nil {
237 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
242 func TestDuplicateInBoundPeer(t *testing.T) {
243 t.Skip("skipping test")
244 dirPath, err := ioutil.TempDir(".", "")
248 defer os.RemoveAll(dirPath)
250 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
252 cfg.P2P.ListenAddress = "127.0.1.1:0"
253 swPrivKey, err := signlib.NewPrivKey()
257 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
262 inpPrivKey, err := signlib.NewPrivKey()
267 inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
268 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
275 inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
278 time.Sleep(1 * time.Second)
279 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
280 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
284 func TestAddInboundPeer(t *testing.T) {
285 t.Skip("skipping test")
286 dirPath, err := ioutil.TempDir(".", "")
290 defer os.RemoveAll(dirPath)
292 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
294 cfg.P2P.MaxNumPeers = 2
295 cfg.P2P.ListenAddress = "127.0.1.1:0"
296 swPrivKey, err := signlib.NewPrivKey()
300 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
305 inpPrivKey, err := signlib.NewPrivKey()
310 inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
311 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
318 rpPrivKey, err := signlib.NewPrivKey()
322 rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
326 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
332 inp2PrivKey, err := signlib.NewPrivKey()
336 inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
340 time.Sleep(1 * time.Second)
341 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
342 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
346 func TestStopPeer(t *testing.T) {
347 t.Skip("skipping test")
348 dirPath, err := ioutil.TempDir(".", "")
352 defer os.RemoveAll(dirPath)
354 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
356 cfg.P2P.MaxNumPeers = 2
357 cfg.P2P.ListenAddress = "127.0.1.1:0"
358 swPrivKey, err := signlib.NewPrivKey()
362 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
367 inp2PrivKey, err := signlib.NewPrivKey()
371 inp := &inboundPeer{PrivKey: inp2PrivKey, config: &inpCfg}
372 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
379 rpPrivKey, err := signlib.NewPrivKey()
384 rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
388 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
391 time.Sleep(1 * time.Second)
392 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
393 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
396 s1.StopPeerGracefully(s1.peers.list[0].Key)
397 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
398 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
401 s1.StopPeerForError(s1.peers.list[0], "stop for test")
402 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
403 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))