OSDN Git Service

writer close
[bytom/vapor.git] / p2p / switch_test.go
1 package p2p
2
3 import (
4         "io/ioutil"
5         "os"
6         "sync"
7         "testing"
8         "time"
9
10         "github.com/davecgh/go-spew/spew"
11         cfg "github.com/vapor/config"
12         dbm "github.com/vapor/database/leveldb"
13         "github.com/vapor/errors"
14         conn "github.com/vapor/p2p/connection"
15         "github.com/vapor/p2p/security"
16         "github.com/vapor/p2p/signlib"
17 )
18
19 var (
20         testCfg *cfg.Config
21 )
22
23 func init() {
24         testCfg = cfg.DefaultConfig()
25 }
26
27 /*
28 Each peer has one `MConnection` (multiplex connection) instance.
29
30 __multiplex__ *noun* a system or signal involving simultaneous transmission of
31 several messages along a single channel of communication.
32
33 Each `MConnection` handles message transmission on multiple abstract communication
34 `Channel`s.  Each channel has a globally unique byte id.
35 The byte id and the relative priorities of each `Channel` are configured upon
36 initialization of the connection.
37
38 There are two methods for sending messages:
39         func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
40         func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
41
42 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
43 successfully queued for the channel with the given id byte `chID`, or until the
44 request times out.  The message `msg` is serialized using Go-Amino.
45
46 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
47 channel's queue is full.
48
49 Inbound message bytes are handled with an onReceive callback function.
50 */
51 type PeerMessage struct {
52         PeerID  string
53         Bytes   []byte
54         Counter int
55 }
56
57 type TestReactor struct {
58         BaseReactor
59
60         mtx          sync.Mutex
61         channels     []*conn.ChannelDescriptor
62         logMessages  bool
63         msgsCounter  int
64         msgsReceived map[byte][]PeerMessage
65 }
66
67 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
68         tr := &TestReactor{
69                 channels:     channels,
70                 logMessages:  logMessages,
71                 msgsReceived: make(map[byte][]PeerMessage),
72         }
73         tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
74
75         return tr
76 }
77
78 // GetChannels implements Reactor
79 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
80         return tr.channels
81 }
82
83 // OnStart implements BaseService
84 func (tr *TestReactor) OnStart() error {
85         tr.BaseReactor.OnStart()
86         return nil
87 }
88
89 // OnStop implements BaseService
90 func (tr *TestReactor) OnStop() {
91         tr.BaseReactor.OnStop()
92 }
93
94 // AddPeer implements Reactor by sending our state to peer.
95 func (tr *TestReactor) AddPeer(peer *Peer) error {
96         return nil
97 }
98
99 // RemovePeer implements Reactor by removing peer from the pool.
100 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
101 }
102
103 // Receive implements Reactor by handling 4 types of messages (look below).
104 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
105         if tr.logMessages {
106                 tr.mtx.Lock()
107                 defer tr.mtx.Unlock()
108                 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
109                 tr.msgsCounter++
110         }
111 }
112
113 func initSwitchFunc(sw *Switch) *Switch {
114         // Make two reactors of two channels each
115         sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
116                 {ID: byte(0x00), Priority: 10},
117                 {ID: byte(0x01), Priority: 10},
118         }, true))
119         sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
120                 {ID: byte(0x02), Priority: 10},
121                 {ID: byte(0x03), Priority: 10},
122         }, true))
123
124         return sw
125 }
126
127 //Test connect self.
128 func TestFiltersOutItself(t *testing.T) {
129         t.Skip("due to fail on mac")
130         dirPath, err := ioutil.TempDir(".", "")
131         if err != nil {
132                 t.Fatal(err)
133         }
134         defer os.RemoveAll(dirPath)
135
136         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
137         cfg := *testCfg
138         cfg.DBPath = dirPath
139         cfg.P2P.ListenAddress = "127.0.1.1:0"
140         swPrivKey, err := signlib.NewPrivKey()
141         if err != nil {
142                 t.Fatal(err)
143         }
144
145         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
146         s1.Start()
147         defer s1.Stop()
148
149         rmdirPath, err := ioutil.TempDir(".", "")
150         if err != nil {
151                 t.Fatal(err)
152         }
153         defer os.RemoveAll(rmdirPath)
154
155         // simulate s1 having a public key and creating a remote peer with the same key
156         rpCfg := *testCfg
157         rpCfg.DBPath = rmdirPath
158         rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
159         rp.Start()
160         defer rp.Stop()
161         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
162                 t.Fatal(err)
163         }
164
165         //S1 dialing itself ip address
166         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
167
168         if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
169                 t.Fatal(err)
170         }
171 }
172
173 func TestDialBannedPeer(t *testing.T) {
174         t.Skip("due to fail on mac")
175         dirPath, err := ioutil.TempDir(".", "")
176         if err != nil {
177                 t.Fatal(err)
178         }
179         defer os.RemoveAll(dirPath)
180
181         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
182         cfg := *testCfg
183         cfg.DBPath = dirPath
184         cfg.P2P.ListenAddress = "127.0.1.1:0"
185         swPrivKey, err := signlib.NewPrivKey()
186         if err != nil {
187                 t.Fatal(err)
188         }
189         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
190         s1.Start()
191         defer s1.Stop()
192
193         rmdirPath, err := ioutil.TempDir(".", "")
194         if err != nil {
195                 t.Fatal(err)
196         }
197         defer os.RemoveAll(rmdirPath)
198
199         rpCfg := *testCfg
200         rpCfg.DBPath = rmdirPath
201         remotePrivKey, err := signlib.NewPrivKey()
202         if err != nil {
203                 t.Fatal(err)
204         }
205
206         rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
207         rp.Start()
208         defer rp.Stop()
209         for {
210                 if ok := s1.security.IsBanned(rp.addr.IP.String(), security.LevelMsgIllegal, "test"); ok {
211                         break
212                 }
213         }
214         if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != security.ErrConnectBannedPeer {
215                 t.Fatal(err)
216         }
217 }
218
219 func TestDuplicateOutBoundPeer(t *testing.T) {
220         t.Skip("due to fail on mac")
221         dirPath, err := ioutil.TempDir(".", "")
222         if err != nil {
223                 t.Fatal(err)
224         }
225         defer os.RemoveAll(dirPath)
226
227         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
228         cfg := *testCfg
229         cfg.DBPath = dirPath
230         cfg.P2P.ListenAddress = "127.0.1.1:0"
231         swPrivKey, err := signlib.NewPrivKey()
232         if err != nil {
233                 t.Fatal(err)
234         }
235
236         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
237         s1.Start()
238         defer s1.Stop()
239
240         rmdirPath, err := ioutil.TempDir(".", "")
241         if err != nil {
242                 t.Fatal(err)
243         }
244         defer os.RemoveAll(rmdirPath)
245
246         rpCfg := *testCfg
247         rpCfg.DBPath = rmdirPath
248         remotePrivKey, err := signlib.NewPrivKey()
249         if err != nil {
250                 t.Fatal(err)
251         }
252
253         rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
254         rp.Start()
255         defer rp.Stop()
256
257         if err = s1.DialPeerWithAddress(rp.addr); err != nil {
258                 t.Fatal(err)
259         }
260
261         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
262                 t.Fatal(err)
263         }
264 }
265
266 func TestDuplicateInBoundPeer(t *testing.T) {
267         t.Skip("due to fail on mac")
268         dirPath, err := ioutil.TempDir(".", "")
269         if err != nil {
270                 t.Fatal(err)
271         }
272         defer os.RemoveAll(dirPath)
273
274         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
275         cfg := *testCfg
276         cfg.DBPath = dirPath
277         cfg.P2P.ListenAddress = "127.0.1.1:0"
278         swPrivKey, err := signlib.NewPrivKey()
279         if err != nil {
280                 t.Fatal(err)
281         }
282         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
283         s1.Start()
284         defer s1.Stop()
285
286         inpCfg := *testCfg
287         inpPrivKey, err := signlib.NewPrivKey()
288         if err != nil {
289                 t.Fatal(err)
290         }
291
292         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
293         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
294         if err != nil {
295                 t.Fatal(err)
296         }
297         go inp.dial(addr)
298
299         inp1Cfg := *testCfg
300         inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
301         go inp1.dial(addr)
302
303         time.Sleep(1 * time.Second)
304         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
305                 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
306         }
307 }
308
309 func TestAddInboundPeer(t *testing.T) {
310         t.Skip("due to fail on mac")
311         dirPath, err := ioutil.TempDir(".", "")
312         if err != nil {
313                 t.Fatal(err)
314         }
315         defer os.RemoveAll(dirPath)
316
317         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
318         cfg := *testCfg
319         cfg.DBPath = dirPath
320         cfg.P2P.MaxNumPeers = 2
321         cfg.P2P.ListenAddress = "127.0.1.1:0"
322         swPrivKey, err := signlib.NewPrivKey()
323         if err != nil {
324                 t.Fatal(err)
325         }
326         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
327         s1.Start()
328         defer s1.Stop()
329
330         inpCfg := *testCfg
331         inpPrivKey, err := signlib.NewPrivKey()
332         if err != nil {
333                 t.Fatal(err)
334         }
335
336         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
337         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
338         if err != nil {
339                 t.Fatal(err)
340         }
341         go inp.dial(addr)
342
343         rpCfg := *testCfg
344         rpPrivKey, err := signlib.NewPrivKey()
345         if err != nil {
346                 t.Fatal(err)
347         }
348         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
349         rp.Start()
350         defer rp.Stop()
351
352         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
353                 t.Fatal(err)
354         }
355
356         inp2Cfg := *testCfg
357
358         inp2PrivKey, err := signlib.NewPrivKey()
359         if err != nil {
360                 t.Fatal(err)
361         }
362         inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
363
364         go inp2.dial(addr)
365
366         time.Sleep(1 * time.Second)
367         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
368                 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
369         }
370 }
371
372 func TestStopPeer(t *testing.T) {
373         t.Skip("due to fail on mac")
374         dirPath, err := ioutil.TempDir(".", "")
375         if err != nil {
376                 t.Fatal(err)
377         }
378         defer os.RemoveAll(dirPath)
379
380         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
381         cfg := *testCfg
382         cfg.DBPath = dirPath
383         cfg.P2P.MaxNumPeers = 2
384         cfg.P2P.ListenAddress = "127.0.1.1:0"
385         swPrivKey, err := signlib.NewPrivKey()
386         if err != nil {
387                 t.Fatal(err)
388         }
389         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
390         s1.Start()
391         defer s1.Stop()
392
393         inpCfg := *testCfg
394         inp2PrivKey, err := signlib.NewPrivKey()
395         if err != nil {
396                 t.Fatal(err)
397         }
398         inp := &inboundPeer{PrivKey: inp2PrivKey, config: &inpCfg}
399         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
400         if err != nil {
401                 t.Fatal(err)
402         }
403         go inp.dial(addr)
404
405         rpCfg := *testCfg
406         rpPrivKey, err := signlib.NewPrivKey()
407         if err != nil {
408                 t.Fatal(err)
409         }
410
411         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
412         rp.Start()
413         defer rp.Stop()
414
415         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
416                 t.Fatal(err)
417         }
418         time.Sleep(1 * time.Second)
419         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
420                 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
421         }
422
423         s1.StopPeerGracefully(s1.peers.list[0].Key)
424         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
425                 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
426         }
427
428         s1.StopPeerForError(s1.peers.list[0], "stop for test")
429         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
430                 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))
431         }
432 }