OSDN Git Service

Add p2p security module (#143)
[bytom/vapor.git] / p2p / switch_test.go
1 package p2p
2
3 import (
4         "io/ioutil"
5         "os"
6         "sync"
7         "testing"
8         "time"
9
10         "github.com/davecgh/go-spew/spew"
11         cfg "github.com/vapor/config"
12         dbm "github.com/vapor/database/leveldb"
13         "github.com/vapor/errors"
14         conn "github.com/vapor/p2p/connection"
15         "github.com/vapor/p2p/security"
16         "github.com/vapor/p2p/signlib"
17 )
18
19 var (
20         testCfg *cfg.Config
21 )
22
23 func init() {
24         testCfg = cfg.DefaultConfig()
25 }
26
27 /*
28 Each peer has one `MConnection` (multiplex connection) instance.
29
30 __multiplex__ *noun* a system or signal involving simultaneous transmission of
31 several messages along a single channel of communication.
32
33 Each `MConnection` handles message transmission on multiple abstract communication
34 `Channel`s.  Each channel has a globally unique byte id.
35 The byte id and the relative priorities of each `Channel` are configured upon
36 initialization of the connection.
37
38 There are two methods for sending messages:
39         func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
40         func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
41
42 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
43 successfully queued for the channel with the given id byte `chID`, or until the
44 request times out.  The message `msg` is serialized using Go-Amino.
45
46 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
47 channel's queue is full.
48
49 Inbound message bytes are handled with an onReceive callback function.
50 */
51 type PeerMessage struct {
52         PeerID  string
53         Bytes   []byte
54         Counter int
55 }
56
57 type TestReactor struct {
58         BaseReactor
59
60         mtx          sync.Mutex
61         channels     []*conn.ChannelDescriptor
62         logMessages  bool
63         msgsCounter  int
64         msgsReceived map[byte][]PeerMessage
65 }
66
67 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
68         tr := &TestReactor{
69                 channels:     channels,
70                 logMessages:  logMessages,
71                 msgsReceived: make(map[byte][]PeerMessage),
72         }
73         tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
74
75         return tr
76 }
77
78 // GetChannels implements Reactor
79 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
80         return tr.channels
81 }
82
83 // OnStart implements BaseService
84 func (tr *TestReactor) OnStart() error {
85         tr.BaseReactor.OnStart()
86         return nil
87 }
88
89 // OnStop implements BaseService
90 func (tr *TestReactor) OnStop() {
91         tr.BaseReactor.OnStop()
92 }
93
94 // AddPeer implements Reactor by sending our state to peer.
95 func (tr *TestReactor) AddPeer(peer *Peer) error {
96         return nil
97 }
98
99 // RemovePeer implements Reactor by removing peer from the pool.
100 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
101 }
102
103 // Receive implements Reactor by handling 4 types of messages (look below).
104 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
105         if tr.logMessages {
106                 tr.mtx.Lock()
107                 defer tr.mtx.Unlock()
108                 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
109                 tr.msgsCounter++
110         }
111 }
112
113 func initSwitchFunc(sw *Switch) *Switch {
114         // Make two reactors of two channels each
115         sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
116                 {ID: byte(0x00), Priority: 10},
117                 {ID: byte(0x01), Priority: 10},
118         }, true))
119         sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
120                 {ID: byte(0x02), Priority: 10},
121                 {ID: byte(0x03), Priority: 10},
122         }, true))
123
124         return sw
125 }
126
127 //Test connect self.
128 func TestFiltersOutItself(t *testing.T) {
129         dirPath, err := ioutil.TempDir(".", "")
130         if err != nil {
131                 t.Fatal(err)
132         }
133         defer os.RemoveAll(dirPath)
134
135         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
136         cfg := *testCfg
137         cfg.DBPath = dirPath
138         cfg.P2P.ListenAddress = "127.0.1.1:0"
139         swPrivKey, err := signlib.NewPrivKey()
140         if err != nil {
141                 t.Fatal(err)
142         }
143
144         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
145         s1.Start()
146         defer s1.Stop()
147
148         rmdirPath, err := ioutil.TempDir(".", "")
149         if err != nil {
150                 t.Fatal(err)
151         }
152         defer os.RemoveAll(rmdirPath)
153
154         // simulate s1 having a public key and creating a remote peer with the same key
155         rpCfg := *testCfg
156         rpCfg.DBPath = rmdirPath
157         rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
158         rp.Start()
159         defer rp.Stop()
160         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
161                 t.Fatal(err)
162         }
163
164         //S1 dialing itself ip address
165         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
166
167         if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
168                 t.Fatal(err)
169         }
170 }
171
172 func TestDialBannedPeer(t *testing.T) {
173         dirPath, err := ioutil.TempDir(".", "")
174         if err != nil {
175                 t.Fatal(err)
176         }
177         defer os.RemoveAll(dirPath)
178
179         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
180         cfg := *testCfg
181         cfg.DBPath = dirPath
182         cfg.P2P.ListenAddress = "127.0.1.1:0"
183         swPrivKey, err := signlib.NewPrivKey()
184         if err != nil {
185                 t.Fatal(err)
186         }
187         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
188         s1.Start()
189         defer s1.Stop()
190
191         rmdirPath, err := ioutil.TempDir(".", "")
192         if err != nil {
193                 t.Fatal(err)
194         }
195         defer os.RemoveAll(rmdirPath)
196
197         rpCfg := *testCfg
198         rpCfg.DBPath = rmdirPath
199         remotePrivKey, err := signlib.NewPrivKey()
200         if err != nil {
201                 t.Fatal(err)
202         }
203
204         rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
205         rp.Start()
206         defer rp.Stop()
207         for {
208                 if ok := s1.security.IsBanned(rp.addr.IP.String(), security.LevelMsgIllegal, "test"); ok {
209                         break
210                 }
211         }
212         if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != security.ErrConnectBannedPeer {
213                 t.Fatal(err)
214         }
215 }
216
217 func TestDuplicateOutBoundPeer(t *testing.T) {
218         dirPath, err := ioutil.TempDir(".", "")
219         if err != nil {
220                 t.Fatal(err)
221         }
222         defer os.RemoveAll(dirPath)
223
224         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
225         cfg := *testCfg
226         cfg.DBPath = dirPath
227         cfg.P2P.ListenAddress = "127.0.1.1:0"
228         swPrivKey, err := signlib.NewPrivKey()
229         if err != nil {
230                 t.Fatal(err)
231         }
232
233         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
234         s1.Start()
235         defer s1.Stop()
236
237         rmdirPath, err := ioutil.TempDir(".", "")
238         if err != nil {
239                 t.Fatal(err)
240         }
241         defer os.RemoveAll(rmdirPath)
242
243         rpCfg := *testCfg
244         rpCfg.DBPath = rmdirPath
245         remotePrivKey, err := signlib.NewPrivKey()
246         if err != nil {
247                 t.Fatal(err)
248         }
249
250         rp := &remotePeer{PrivKey: remotePrivKey, Config: &rpCfg}
251         rp.Start()
252         defer rp.Stop()
253
254         if err = s1.DialPeerWithAddress(rp.addr); err != nil {
255                 t.Fatal(err)
256         }
257
258         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
259                 t.Fatal(err)
260         }
261 }
262
263 func TestDuplicateInBoundPeer(t *testing.T) {
264         dirPath, err := ioutil.TempDir(".", "")
265         if err != nil {
266                 t.Fatal(err)
267         }
268         defer os.RemoveAll(dirPath)
269
270         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
271         cfg := *testCfg
272         cfg.DBPath = dirPath
273         cfg.P2P.ListenAddress = "127.0.1.1:0"
274         swPrivKey, err := signlib.NewPrivKey()
275         if err != nil {
276                 t.Fatal(err)
277         }
278         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
279         s1.Start()
280         defer s1.Stop()
281
282         inpCfg := *testCfg
283         inpPrivKey, err := signlib.NewPrivKey()
284         if err != nil {
285                 t.Fatal(err)
286         }
287
288         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
289         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
290         if err != nil {
291                 t.Fatal(err)
292         }
293         go inp.dial(addr)
294
295         inp1Cfg := *testCfg
296         inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
297         go inp1.dial(addr)
298
299         time.Sleep(1 * time.Second)
300         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
301                 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
302         }
303 }
304
305 func TestAddInboundPeer(t *testing.T) {
306         dirPath, err := ioutil.TempDir(".", "")
307         if err != nil {
308                 t.Fatal(err)
309         }
310         defer os.RemoveAll(dirPath)
311
312         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
313         cfg := *testCfg
314         cfg.DBPath = dirPath
315         cfg.P2P.MaxNumPeers = 2
316         cfg.P2P.ListenAddress = "127.0.1.1:0"
317         swPrivKey, err := signlib.NewPrivKey()
318         if err != nil {
319                 t.Fatal(err)
320         }
321         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
322         s1.Start()
323         defer s1.Stop()
324
325         inpCfg := *testCfg
326         inpPrivKey, err := signlib.NewPrivKey()
327         if err != nil {
328                 t.Fatal(err)
329         }
330
331         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
332         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
333         if err != nil {
334                 t.Fatal(err)
335         }
336         go inp.dial(addr)
337
338         rpCfg := *testCfg
339         rpPrivKey, err := signlib.NewPrivKey()
340         if err != nil {
341                 t.Fatal(err)
342         }
343         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
344         rp.Start()
345         defer rp.Stop()
346
347         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
348                 t.Fatal(err)
349         }
350
351         inp2Cfg := *testCfg
352
353         inp2PrivKey, err := signlib.NewPrivKey()
354         if err != nil {
355                 t.Fatal(err)
356         }
357         inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
358
359         go inp2.dial(addr)
360
361         time.Sleep(1 * time.Second)
362         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
363                 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
364         }
365 }
366
367 func TestStopPeer(t *testing.T) {
368         dirPath, err := ioutil.TempDir(".", "")
369         if err != nil {
370                 t.Fatal(err)
371         }
372         defer os.RemoveAll(dirPath)
373
374         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
375         cfg := *testCfg
376         cfg.DBPath = dirPath
377         cfg.P2P.MaxNumPeers = 2
378         cfg.P2P.ListenAddress = "127.0.1.1:0"
379         swPrivKey, err := signlib.NewPrivKey()
380         if err != nil {
381                 t.Fatal(err)
382         }
383         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
384         s1.Start()
385         defer s1.Stop()
386
387         inpCfg := *testCfg
388         inp2PrivKey, err := signlib.NewPrivKey()
389         if err != nil {
390                 t.Fatal(err)
391         }
392         inp := &inboundPeer{PrivKey: inp2PrivKey, config: &inpCfg}
393         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
394         if err != nil {
395                 t.Fatal(err)
396         }
397         go inp.dial(addr)
398
399         rpCfg := *testCfg
400         rpPrivKey, err := signlib.NewPrivKey()
401         if err != nil {
402                 t.Fatal(err)
403         }
404
405         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
406         rp.Start()
407         defer rp.Stop()
408
409         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
410                 t.Fatal(err)
411         }
412         time.Sleep(1 * time.Second)
413         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
414                 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
415         }
416
417         s1.StopPeerGracefully(s1.peers.list[0].Key)
418         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
419                 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
420         }
421
422         s1.StopPeerForError(s1.peers.list[0], "stop for test")
423         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
424                 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))
425         }
426 }