OSDN Git Service

6b2cdb8fe88f85fe1521873a202c9c38ca3e4232
[bytom/vapor.git] / p2p / switch_test.go
1 package p2p
2
3 import (
4         "io/ioutil"
5         "os"
6         "sync"
7         "testing"
8         "time"
9
10         "github.com/davecgh/go-spew/spew"
11         cfg "github.com/vapor/config"
12         dbm "github.com/vapor/database/leveldb"
13         "github.com/vapor/errors"
14         conn "github.com/vapor/p2p/connection"
15         "github.com/vapor/p2p/signlib"
16 )
17
18 var (
19         testCfg *cfg.Config
20 )
21
22 func init() {
23         testCfg = cfg.DefaultConfig()
24 }
25
26 /*
27 Each peer has one `MConnection` (multiplex connection) instance.
28
29 __multiplex__ *noun* a system or signal involving simultaneous transmission of
30 several messages along a single channel of communication.
31
32 Each `MConnection` handles message transmission on multiple abstract communication
33 `Channel`s.  Each channel has a globally unique byte id.
34 The byte id and the relative priorities of each `Channel` are configured upon
35 initialization of the connection.
36
37 There are two methods for sending messages:
38         func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
39         func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
40
41 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
42 successfully queued for the channel with the given id byte `chID`, or until the
43 request times out.  The message `msg` is serialized using Go-Amino.
44
45 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
46 channel's queue is full.
47
48 Inbound message bytes are handled with an onReceive callback function.
49 */
50 type PeerMessage struct {
51         PeerID  string
52         Bytes   []byte
53         Counter int
54 }
55
56 type TestReactor struct {
57         BaseReactor
58
59         mtx          sync.Mutex
60         channels     []*conn.ChannelDescriptor
61         logMessages  bool
62         msgsCounter  int
63         msgsReceived map[byte][]PeerMessage
64 }
65
66 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
67         tr := &TestReactor{
68                 channels:     channels,
69                 logMessages:  logMessages,
70                 msgsReceived: make(map[byte][]PeerMessage),
71         }
72         tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
73
74         return tr
75 }
76
77 // GetChannels implements Reactor
78 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
79         return tr.channels
80 }
81
82 // OnStart implements BaseService
83 func (tr *TestReactor) OnStart() error {
84         tr.BaseReactor.OnStart()
85         return nil
86 }
87
88 // OnStop implements BaseService
89 func (tr *TestReactor) OnStop() {
90         tr.BaseReactor.OnStop()
91 }
92
93 // AddPeer implements Reactor by sending our state to peer.
94 func (tr *TestReactor) AddPeer(peer *Peer) error {
95         return nil
96 }
97
98 // RemovePeer implements Reactor by removing peer from the pool.
99 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
100 }
101
102 // Receive implements Reactor by handling 4 types of messages (look below).
103 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
104         if tr.logMessages {
105                 tr.mtx.Lock()
106                 defer tr.mtx.Unlock()
107                 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
108                 tr.msgsCounter++
109         }
110 }
111
112 func initSwitchFunc(sw *Switch) *Switch {
113         // Make two reactors of two channels each
114         sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
115                 {ID: byte(0x00), Priority: 10},
116                 {ID: byte(0x01), Priority: 10},
117         }, true))
118         sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
119                 {ID: byte(0x02), Priority: 10},
120                 {ID: byte(0x03), Priority: 10},
121         }, true))
122
123         return sw
124 }
125
126 //Test connect self.
127 func TestFiltersOutItself(t *testing.T) {
128         t.Skip("skipping test")
129         dirPath, err := ioutil.TempDir(".", "")
130         if err != nil {
131                 t.Fatal(err)
132         }
133         defer os.RemoveAll(dirPath)
134
135         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
136         cfg := *testCfg
137         cfg.P2P.ListenAddress = "127.0.1.1:0"
138         swPrivKey, err := signlib.NewPrivKey()
139         if err != nil {
140                 t.Fatal(err)
141         }
142
143         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
144         s1.Start()
145         defer s1.Stop()
146
147         // simulate s1 having a public key and creating a remote peer with the same key
148         rpCfg := *testCfg
149         rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
150         rp.Start()
151         defer rp.Stop()
152         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
153                 t.Fatal(err)
154         }
155
156         //S1 dialing itself ip address
157         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
158
159         if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
160                 t.Fatal(err)
161         }
162 }
163
164 func TestDialBannedPeer(t *testing.T) {
165         t.Skip("skipping test")
166         dirPath, err := ioutil.TempDir(".", "")
167         if err != nil {
168                 t.Fatal(err)
169         }
170         defer os.RemoveAll(dirPath)
171
172         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
173         cfg := *testCfg
174         cfg.P2P.ListenAddress = "127.0.1.1:0"
175         swPrivKey, err := signlib.NewPrivKey()
176         if err != nil {
177                 t.Fatal(err)
178         }
179         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
180         s1.Start()
181         defer s1.Stop()
182
183         rpCfg := *testCfg
184         remotePrivKey, err := signlib.NewPrivKey()
185         if err != nil {
186                 t.Fatal(err)
187         }
188
189         rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
190         rp.Start()
191         defer rp.Stop()
192         s1.AddBannedPeer(rp.addr.IP.String())
193         if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectBannedPeer {
194                 t.Fatal(err)
195         }
196
197         s1.delBannedPeer(rp.addr.IP.String())
198         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
199                 t.Fatal(err)
200         }
201 }
202
203 func TestDuplicateOutBoundPeer(t *testing.T) {
204         t.Skip("skipping test")
205         dirPath, err := ioutil.TempDir(".", "")
206         if err != nil {
207                 t.Fatal(err)
208         }
209         defer os.RemoveAll(dirPath)
210
211         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
212         cfg := *testCfg
213         cfg.P2P.ListenAddress = "127.0.1.1:0"
214         swPrivKey, err := signlib.NewPrivKey()
215         if err != nil {
216                 t.Fatal(err)
217         }
218
219         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
220         s1.Start()
221         defer s1.Stop()
222
223         rpCfg := *testCfg
224         remotePrivKey, err := signlib.NewPrivKey()
225         if err != nil {
226                 t.Fatal(err)
227         }
228
229         rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
230         rp.Start()
231         defer rp.Stop()
232
233         if err = s1.DialPeerWithAddress(rp.addr); err != nil {
234                 t.Fatal(err)
235         }
236
237         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
238                 t.Fatal(err)
239         }
240 }
241
242 func TestDuplicateInBoundPeer(t *testing.T) {
243         t.Skip("skipping test")
244         dirPath, err := ioutil.TempDir(".", "")
245         if err != nil {
246                 t.Fatal(err)
247         }
248         defer os.RemoveAll(dirPath)
249
250         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
251         cfg := *testCfg
252         cfg.P2P.ListenAddress = "127.0.1.1:0"
253         swPrivKey, err := signlib.NewPrivKey()
254         if err != nil {
255                 t.Fatal(err)
256         }
257         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
258         s1.Start()
259         defer s1.Stop()
260
261         inpCfg := *testCfg
262         inpPrivKey, err := signlib.NewPrivKey()
263         if err != nil {
264                 t.Fatal(err)
265         }
266
267         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
268         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
269         if err != nil {
270                 t.Fatal(err)
271         }
272         go inp.dial(addr)
273
274         inp1Cfg := *testCfg
275         inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
276         go inp1.dial(addr)
277
278         time.Sleep(1 * time.Second)
279         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
280                 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
281         }
282 }
283
284 func TestAddInboundPeer(t *testing.T) {
285         t.Skip("skipping test")
286         dirPath, err := ioutil.TempDir(".", "")
287         if err != nil {
288                 t.Fatal(err)
289         }
290         defer os.RemoveAll(dirPath)
291
292         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
293         cfg := *testCfg
294         cfg.P2P.MaxNumPeers = 2
295         cfg.P2P.ListenAddress = "127.0.1.1:0"
296         swPrivKey, err := signlib.NewPrivKey()
297         if err != nil {
298                 t.Fatal(err)
299         }
300         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
301         s1.Start()
302         defer s1.Stop()
303
304         inpCfg := *testCfg
305         inpPrivKey, err := signlib.NewPrivKey()
306         if err != nil {
307                 t.Fatal(err)
308         }
309
310         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
311         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
312         if err != nil {
313                 t.Fatal(err)
314         }
315         go inp.dial(addr)
316
317         rpCfg := *testCfg
318         rpPrivKey, err := signlib.NewPrivKey()
319         if err != nil {
320                 t.Fatal(err)
321         }
322         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
323         rp.Start()
324         defer rp.Stop()
325
326         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
327                 t.Fatal(err)
328         }
329
330         inp2Cfg := *testCfg
331
332         inp2PrivKey, err := signlib.NewPrivKey()
333         if err != nil {
334                 t.Fatal(err)
335         }
336         inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
337
338         go inp2.dial(addr)
339
340         time.Sleep(1 * time.Second)
341         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
342                 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
343         }
344 }
345
346 func TestStopPeer(t *testing.T) {
347         t.Skip("skipping test")
348         dirPath, err := ioutil.TempDir(".", "")
349         if err != nil {
350                 t.Fatal(err)
351         }
352         defer os.RemoveAll(dirPath)
353
354         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
355         cfg := *testCfg
356         cfg.P2P.MaxNumPeers = 2
357         cfg.P2P.ListenAddress = "127.0.1.1:0"
358         swPrivKey, err := signlib.NewPrivKey()
359         if err != nil {
360                 t.Fatal(err)
361         }
362         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
363         s1.Start()
364         defer s1.Stop()
365
366         inpCfg := *testCfg
367         inp2PrivKey, err := signlib.NewPrivKey()
368         if err != nil {
369                 t.Fatal(err)
370         }
371         inp := &inboundPeer{PrivKey: inp2PrivKey, config: &inpCfg}
372         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
373         if err != nil {
374                 t.Fatal(err)
375         }
376         go inp.dial(addr)
377
378         rpCfg := *testCfg
379         rpPrivKey, err := signlib.NewPrivKey()
380         if err != nil {
381                 t.Fatal(err)
382         }
383
384         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
385         rp.Start()
386         defer rp.Stop()
387
388         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
389                 t.Fatal(err)
390         }
391         time.Sleep(1 * time.Second)
392         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
393                 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
394         }
395
396         s1.StopPeerGracefully(s1.peers.list[0].Key)
397         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
398                 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
399         }
400
401         s1.StopPeerForError(s1.peers.list[0], "stop for test")
402         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
403                 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))
404         }
405 }