10 "github.com/davecgh/go-spew/spew"
11 "github.com/tendermint/go-crypto"
13 cfg "github.com/vapor/config"
14 dbm "github.com/vapor/database/leveldb"
15 "github.com/vapor/errors"
16 conn "github.com/vapor/p2p/connection"
24 testCfg = cfg.DefaultConfig()
28 Each peer has one `MConnection` (multiplex connection) instance.
30 __multiplex__ *noun* a system or signal involving simultaneous transmission of
31 several messages along a single channel of communication.
33 Each `MConnection` handles message transmission on multiple abstract communication
34 `Channel`s. Each channel has a globally unique byte id.
35 The byte id and the relative priorities of each `Channel` are configured upon
36 initialization of the connection.
38 There are two methods for sending messages:
39 func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
40 func (m MConnection) TrySend(chID byte, msgBytes []byte) bool {}
42 `Send(chID, msgBytes)` is a blocking call that waits until `msgBytes` is
43 successfully queued for the channel with the given id byte `chID`, or until the
44 request times out. The message `msgBytes` is serialized using Go-Amino.
46 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
47 channel's queue is full.
49 Inbound message bytes are handled with an onReceive callback function.
51 type PeerMessage struct {
57 type TestReactor struct {
61 channels []*conn.ChannelDescriptor
64 msgsReceived map[byte][]PeerMessage
67 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
70 logMessages: logMessages,
71 msgsReceived: make(map[byte][]PeerMessage),
73 tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
78 // GetChannels implements Reactor
79 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
83 // OnStart implements BaseService
84 func (tr *TestReactor) OnStart() error {
85 tr.BaseReactor.OnStart()
89 // OnStop implements BaseService
90 func (tr *TestReactor) OnStop() {
91 tr.BaseReactor.OnStop()
94 // AddPeer implements Reactor by sending our state to peer.
95 func (tr *TestReactor) AddPeer(peer *Peer) error {
99 // RemovePeer implements Reactor by removing peer from the pool.
100 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
103 // Receive implements Reactor by appending the incoming message to msgsReceived[chID] while holding tr.mtx.
104 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
107 defer tr.mtx.Unlock()
108 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
113 func initSwitchFunc(sw *Switch) *Switch {
114 // Make two reactors of two channels each
115 sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
116 {ID: byte(0x00), Priority: 10},
117 {ID: byte(0x01), Priority: 10},
119 sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
120 {ID: byte(0x02), Priority: 10},
121 {ID: byte(0x03), Priority: 10},
128 func TestFiltersOutItself(t *testing.T) {
129 t.Skip("skipping test")
130 dirPath, err := ioutil.TempDir(".", "")
134 defer os.RemoveAll(dirPath)
136 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
138 cfg.P2P.ListenAddress = "127.0.1.1:0"
139 swPrivKey := crypto.GenPrivKeyEd25519()
140 //cfg.P2P.PrivateKey = swPrivKey.String()
141 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
145 // simulate s1 having a public key and creating a remote peer with the same key
147 rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
150 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
154 // s1 dials its own IP address
155 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
157 if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
162 func TestDialBannedPeer(t *testing.T) {
163 t.Skip("skipping test")
164 dirPath, err := ioutil.TempDir(".", "")
168 defer os.RemoveAll(dirPath)
170 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
172 cfg.P2P.ListenAddress = "127.0.1.1:0"
173 swPrivKey := crypto.GenPrivKeyEd25519()
174 //cfg.P2P.PrivateKey = swPrivKey.String()
175 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
180 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: &rpCfg}
183 s1.AddBannedPeer(rp.addr.IP.String())
184 if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectBannedPeer {
188 s1.delBannedPeer(rp.addr.IP.String())
189 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
194 func TestDuplicateOutBoundPeer(t *testing.T) {
195 t.Skip("skipping test")
196 dirPath, err := ioutil.TempDir(".", "")
200 defer os.RemoveAll(dirPath)
202 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
204 cfg.P2P.ListenAddress = "127.0.1.1:0"
205 swPrivKey := crypto.GenPrivKeyEd25519()
206 //cfg.P2P.PrivateKey = swPrivKey.String()
207 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
212 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: &rpCfg}
216 if err = s1.DialPeerWithAddress(rp.addr); err != nil {
220 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
225 func TestDuplicateInBoundPeer(t *testing.T) {
226 t.Skip("skipping test")
227 dirPath, err := ioutil.TempDir(".", "")
231 defer os.RemoveAll(dirPath)
233 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
235 cfg.P2P.ListenAddress = "127.0.1.1:0"
236 swPrivKey := crypto.GenPrivKeyEd25519()
237 //cfg.P2P.PrivateKey = swPrivKey.String()
238 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
243 inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: &inpCfg}
244 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
251 inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
254 time.Sleep(1 * time.Second)
255 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
256 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
260 func TestAddInboundPeer(t *testing.T) {
261 t.Skip("skipping test")
262 dirPath, err := ioutil.TempDir(".", "")
266 defer os.RemoveAll(dirPath)
268 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
270 cfg.P2P.MaxNumPeers = 2
271 cfg.P2P.ListenAddress = "127.0.1.1:0"
272 swPrivKey := crypto.GenPrivKeyEd25519()
273 //cfg.P2P.PrivateKey = swPrivKey.String()
274 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
279 inpPrivKey := crypto.GenPrivKeyEd25519()
280 //inpCfg.P2P.PrivateKey = inpPrivKey.String()
281 inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
282 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
289 rpPrivKey := crypto.GenPrivKeyEd25519()
290 //rpCfg.P2P.PrivateKey = rpPrivKey.String()
291 rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
295 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
300 inp2PrivKey := crypto.GenPrivKeyEd25519()
301 //inp2Cfg.P2P.PrivateKey = inp2PrivKey.String()
302 inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
306 time.Sleep(1 * time.Second)
307 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
308 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
312 func TestStopPeer(t *testing.T) {
313 t.Skip("skipping test")
314 dirPath, err := ioutil.TempDir(".", "")
318 defer os.RemoveAll(dirPath)
320 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
322 cfg.P2P.MaxNumPeers = 2
323 cfg.P2P.ListenAddress = "127.0.1.1:0"
324 swPrivKey := crypto.GenPrivKeyEd25519()
325 //cfg.P2P.PrivateKey = swPrivKey.String()
326 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
331 inpPrivKey := crypto.GenPrivKeyEd25519()
332 //inpCfg.P2P.PrivateKey = inpPrivKey.String()
333 inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
334 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
341 rpPrivKey := crypto.GenPrivKeyEd25519()
342 //rpCfg.P2P.PrivateKey = rpPrivKey.String()
343 rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
347 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
350 time.Sleep(1 * time.Second)
351 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
352 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
355 s1.StopPeerGracefully(s1.peers.list[0].Key)
356 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
357 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
360 s1.StopPeerForError(s1.peers.list[0], "stop for test")
361 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
362 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))