OSDN Git Service

Bump golang.org/x/crypto in /lib/golang.org/x/net
[bytom/bytom.git] / p2p / switch_test.go
1 package p2p
2
3 import (
4         "io/ioutil"
5         "os"
6         "sync"
7         "testing"
8         "time"
9
10         "github.com/davecgh/go-spew/spew"
11
12         cfg "github.com/bytom/bytom/config"
13         "github.com/bytom/bytom/crypto/ed25519/chainkd"
14         dbm "github.com/bytom/bytom/database/leveldb"
15         "github.com/bytom/bytom/errors"
16         conn "github.com/bytom/bytom/p2p/connection"
17         "github.com/bytom/bytom/p2p/security"
18 )
19
20 var (
21         testCfg *cfg.Config
22 )
23
24 func init() {
25         testCfg = cfg.DefaultConfig()
26 }
27
28 /*
29 Each peer has one `MConnection` (multiplex connection) instance.
30
31 __multiplex__ *noun* a system or signal involving simultaneous transmission of
32 several messages along a single channel of communication.
33
34 Each `MConnection` handles message transmission on multiple abstract communication
35 `Channel`s.  Each channel has a globally unique byte id.
36 The byte id and the relative priorities of each `Channel` are configured upon
37 initialization of the connection.
38
39 There are two methods for sending messages:
40         func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
41         func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
42
43 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
44 successfully queued for the channel with the given id byte `chID`, or until the
45 request times out.  The message `msg` is serialized using Go-Amino.
46
47 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
48 channel's queue is full.
49
50 Inbound message bytes are handled with an onReceive callback function.
51 */
52 type PeerMessage struct {
53         PeerID  string
54         Bytes   []byte
55         Counter int
56 }
57
58 type TestReactor struct {
59         BaseReactor
60
61         mtx          sync.Mutex
62         channels     []*conn.ChannelDescriptor
63         logMessages  bool
64         msgsCounter  int
65         msgsReceived map[byte][]PeerMessage
66 }
67
68 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
69         tr := &TestReactor{
70                 channels:     channels,
71                 logMessages:  logMessages,
72                 msgsReceived: make(map[byte][]PeerMessage),
73         }
74         tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
75
76         return tr
77 }
78
79 // GetChannels implements Reactor
80 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
81         return tr.channels
82 }
83
84 // OnStart implements BaseService
85 func (tr *TestReactor) OnStart() error {
86         tr.BaseReactor.OnStart()
87         return nil
88 }
89
90 // OnStop implements BaseService
91 func (tr *TestReactor) OnStop() {
92         tr.BaseReactor.OnStop()
93 }
94
95 // AddPeer implements Reactor by sending our state to peer.
96 func (tr *TestReactor) AddPeer(peer *Peer) error {
97         return nil
98 }
99
100 // RemovePeer implements Reactor by removing peer from the pool.
101 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
102 }
103
104 // Receive implements Reactor by handling 4 types of messages (look below).
105 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
106         if tr.logMessages {
107                 tr.mtx.Lock()
108                 defer tr.mtx.Unlock()
109                 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
110                 tr.msgsCounter++
111         }
112 }
113
114 func initSwitchFunc(sw *Switch) *Switch {
115         // Make two reactors of two channels each
116         sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
117                 {ID: byte(0x00), Priority: 10},
118                 {ID: byte(0x01), Priority: 10},
119         }, true))
120         sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
121                 {ID: byte(0x02), Priority: 10},
122                 {ID: byte(0x03), Priority: 10},
123         }, true))
124
125         return sw
126 }
127
128 //Test connect self.
129 func TestFiltersOutItself(t *testing.T) {
130         t.Skip("due to fail on mac")
131         dirPath, err := ioutil.TempDir(".", "")
132         if err != nil {
133                 t.Fatal(err)
134         }
135         defer os.RemoveAll(dirPath)
136
137         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
138         cfg := *testCfg
139         cfg.DBPath = dirPath
140         cfg.P2P.ListenAddress = "127.0.1.1:0"
141         swPrivKey, _ := chainkd.NewXPrv(nil)
142         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
143         s1.Start()
144         defer s1.Stop()
145
146         rmdirPath, err := ioutil.TempDir(".", "")
147         if err != nil {
148                 t.Fatal(err)
149         }
150         defer os.RemoveAll(rmdirPath)
151
152         // simulate s1 having a public key and creating a remote peer with the same key
153         rpCfg := *testCfg
154         rpCfg.DBPath = rmdirPath
155         rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
156         rp.Start()
157         defer rp.Stop()
158         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
159                 t.Fatal(err)
160         }
161
162         //S1 dialing itself ip address
163         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
164
165         if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
166                 t.Fatal(err)
167         }
168 }
169
170 func TestDialBannedPeer(t *testing.T) {
171         t.Skip("due to fail on mac")
172         dirPath, err := ioutil.TempDir(".", "")
173         if err != nil {
174                 t.Fatal(err)
175         }
176         defer os.RemoveAll(dirPath)
177
178         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
179         cfg := *testCfg
180         cfg.DBPath = dirPath
181         cfg.P2P.ListenAddress = "127.0.1.1:0"
182         swPrivKey, _ := chainkd.NewXPrv(nil)
183         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
184         s1.Start()
185         defer s1.Stop()
186
187         rmdirPath, err := ioutil.TempDir(".", "")
188         if err != nil {
189                 t.Fatal(err)
190         }
191         defer os.RemoveAll(rmdirPath)
192
193         rpCfg := *testCfg
194         rpCfg.DBPath = rmdirPath
195         prvKey, _ := chainkd.NewXPrv(nil)
196         rp := &remotePeer{PrivKey: prvKey, Config: &rpCfg}
197         rp.Start()
198         defer rp.Stop()
199         for {
200                 if ok := s1.security.IsBanned(rp.addr.IP.String(), security.LevelMsgIllegal, "test"); ok {
201                         break
202                 }
203         }
204         if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != security.ErrConnectBannedPeer {
205                 t.Fatal(err)
206         }
207 }
208
209 func TestDuplicateOutBoundPeer(t *testing.T) {
210         t.Skip("due to fail on mac")
211         dirPath, err := ioutil.TempDir(".", "")
212         if err != nil {
213                 t.Fatal(err)
214         }
215         defer os.RemoveAll(dirPath)
216
217         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
218         cfg := *testCfg
219         cfg.DBPath = dirPath
220         cfg.P2P.ListenAddress = "127.0.1.1:0"
221         swPrivKey, _ := chainkd.NewXPrv(nil)
222         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
223         s1.Start()
224         defer s1.Stop()
225
226         rmdirPath, err := ioutil.TempDir(".", "")
227         if err != nil {
228                 t.Fatal(err)
229         }
230         defer os.RemoveAll(rmdirPath)
231
232         rpCfg := *testCfg
233         prvKey, _ := chainkd.NewXPrv(nil)
234         rp := &remotePeer{PrivKey: prvKey, Config: &rpCfg}
235         rp.Start()
236         defer rp.Stop()
237
238         if err = s1.DialPeerWithAddress(rp.addr); err != nil {
239                 t.Fatal(err)
240         }
241
242         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
243                 t.Fatal(err)
244         }
245 }
246
247 func TestDuplicateInBoundPeer(t *testing.T) {
248         t.Skip("due to fail on mac")
249         dirPath, err := ioutil.TempDir(".", "")
250         if err != nil {
251                 t.Fatal(err)
252         }
253         defer os.RemoveAll(dirPath)
254
255         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
256         cfg := *testCfg
257         cfg.DBPath = dirPath
258         cfg.P2P.ListenAddress = "127.0.1.1:0"
259         swPrivKey, _ := chainkd.NewXPrv(nil)
260         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
261         s1.Start()
262         defer s1.Stop()
263
264         inpCfg := *testCfg
265         inp := &inboundPeer{PrivKey: swPrivKey, config: &inpCfg}
266         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
267         if err != nil {
268                 t.Fatal(err)
269         }
270         go inp.dial(addr)
271
272         inp1Cfg := *testCfg
273         inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
274         go inp1.dial(addr)
275
276         time.Sleep(1 * time.Second)
277         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
278                 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
279         }
280 }
281
282 func TestAddInboundPeer(t *testing.T) {
283         t.Skip("due to fail on mac")
284         dirPath, err := ioutil.TempDir(".", "")
285         if err != nil {
286                 t.Fatal(err)
287         }
288         defer os.RemoveAll(dirPath)
289
290         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
291         cfg := *testCfg
292         cfg.DBPath = dirPath
293         cfg.P2P.MaxNumPeers = 2
294         cfg.P2P.ListenAddress = "127.0.1.1:0"
295         swPrivKey, _ := chainkd.NewXPrv(nil)
296         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
297         s1.Start()
298         defer s1.Stop()
299
300         inpCfg := *testCfg
301         inpPrivKey, _ := chainkd.NewXPrv(nil)
302         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
303         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
304         if err != nil {
305                 t.Fatal(err)
306         }
307         go inp.dial(addr)
308
309         rpCfg := *testCfg
310         rpPrivKey, _ := chainkd.NewXPrv(nil)
311         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
312         rp.Start()
313         defer rp.Stop()
314
315         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
316                 t.Fatal(err)
317         }
318
319         inp2Cfg := *testCfg
320         inp2PrivKey, _ := chainkd.NewXPrv(nil)
321         inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
322
323         go inp2.dial(addr)
324
325         time.Sleep(1 * time.Second)
326         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
327                 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
328         }
329 }
330
331 func TestStopPeer(t *testing.T) {
332         t.Skip("due to fail on mac")
333         dirPath, err := ioutil.TempDir(".", "")
334         if err != nil {
335                 t.Fatal(err)
336         }
337         defer os.RemoveAll(dirPath)
338
339         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
340         cfg := *testCfg
341         cfg.DBPath = dirPath
342         cfg.P2P.MaxNumPeers = 2
343         cfg.P2P.ListenAddress = "127.0.1.1:0"
344         swPrivKey, _ := chainkd.NewXPrv(nil)
345         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
346         s1.Start()
347         defer s1.Stop()
348
349         inpCfg := *testCfg
350         inpPrivKey, _ := chainkd.NewXPrv(nil)
351         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
352         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
353         if err != nil {
354                 t.Fatal(err)
355         }
356         go inp.dial(addr)
357
358         rpCfg := *testCfg
359         rpPrivKey, _ := chainkd.NewXPrv(nil)
360         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
361         rp.Start()
362         defer rp.Stop()
363
364         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
365                 t.Fatal(err)
366         }
367         time.Sleep(1 * time.Second)
368         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
369                 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
370         }
371
372         s1.StopPeerGracefully(s1.peers.list[0].Key)
373         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
374                 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
375         }
376
377         s1.StopPeerForError(s1.peers.list[0], "stop for test")
378         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
379                 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))
380         }
381 }