OSDN Git Service

add init consensus node as fed node (#97)
[bytom/vapor.git] / p2p / switch_test.go
1 package p2p
2
3 import (
4         "io/ioutil"
5         "os"
6         "sync"
7         "testing"
8         "time"
9
10         "github.com/davecgh/go-spew/spew"
11         "github.com/tendermint/go-crypto"
12
13         cfg "github.com/vapor/config"
14         dbm "github.com/vapor/database/leveldb"
15         "github.com/vapor/errors"
16         conn "github.com/vapor/p2p/connection"
17 )
18
19 var (
20         testCfg *cfg.Config
21 )
22
23 func init() {
24         testCfg = cfg.DefaultConfig()
25 }
26
27 /*
28 Each peer has one `MConnection` (multiplex connection) instance.
29
30 __multiplex__ *noun* a system or signal involving simultaneous transmission of
31 several messages along a single channel of communication.
32
33 Each `MConnection` handles message transmission on multiple abstract communication
34 `Channel`s.  Each channel has a globally unique byte id.
35 The byte id and the relative priorities of each `Channel` are configured upon
36 initialization of the connection.
37
38 There are two methods for sending messages:
39         func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
40         func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
41
42 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
43 successfully queued for the channel with the given id byte `chID`, or until the
44 request times out.  The message `msg` is serialized using Go-Amino.
45
46 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
47 channel's queue is full.
48
49 Inbound message bytes are handled with an onReceive callback function.
50 */
51 type PeerMessage struct {
52         PeerID  string
53         Bytes   []byte
54         Counter int
55 }
56
57 type TestReactor struct {
58         BaseReactor
59
60         mtx          sync.Mutex
61         channels     []*conn.ChannelDescriptor
62         logMessages  bool
63         msgsCounter  int
64         msgsReceived map[byte][]PeerMessage
65 }
66
67 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
68         tr := &TestReactor{
69                 channels:     channels,
70                 logMessages:  logMessages,
71                 msgsReceived: make(map[byte][]PeerMessage),
72         }
73         tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
74
75         return tr
76 }
77
78 // GetChannels implements Reactor
79 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
80         return tr.channels
81 }
82
83 // OnStart implements BaseService
84 func (tr *TestReactor) OnStart() error {
85         tr.BaseReactor.OnStart()
86         return nil
87 }
88
89 // OnStop implements BaseService
90 func (tr *TestReactor) OnStop() {
91         tr.BaseReactor.OnStop()
92 }
93
94 // AddPeer implements Reactor by sending our state to peer.
95 func (tr *TestReactor) AddPeer(peer *Peer) error {
96         return nil
97 }
98
99 // RemovePeer implements Reactor by removing peer from the pool.
100 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
101 }
102
103 // Receive implements Reactor by handling 4 types of messages (look below).
104 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
105         if tr.logMessages {
106                 tr.mtx.Lock()
107                 defer tr.mtx.Unlock()
108                 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
109                 tr.msgsCounter++
110         }
111 }
112
113 func initSwitchFunc(sw *Switch) *Switch {
114         // Make two reactors of two channels each
115         sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
116                 {ID: byte(0x00), Priority: 10},
117                 {ID: byte(0x01), Priority: 10},
118         }, true))
119         sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
120                 {ID: byte(0x02), Priority: 10},
121                 {ID: byte(0x03), Priority: 10},
122         }, true))
123
124         return sw
125 }
126
127 //Test connect self.
128 func TestFiltersOutItself(t *testing.T) {
129         t.Skip("skipping test")
130         dirPath, err := ioutil.TempDir(".", "")
131         if err != nil {
132                 t.Fatal(err)
133         }
134         defer os.RemoveAll(dirPath)
135
136         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
137         cfg := *testCfg
138         cfg.P2P.ListenAddress = "127.0.1.1:0"
139         swPrivKey := crypto.GenPrivKeyEd25519()
140         //cfg.P2P.PrivateKey = swPrivKey.String()
141         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
142         s1.Start()
143         defer s1.Stop()
144
145         // simulate s1 having a public key and creating a remote peer with the same key
146         rpCfg := *testCfg
147         rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
148         rp.Start()
149         defer rp.Stop()
150         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
151                 t.Fatal(err)
152         }
153
154         //S1 dialing itself ip address
155         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
156
157         if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
158                 t.Fatal(err)
159         }
160 }
161
162 func TestDialBannedPeer(t *testing.T) {
163         t.Skip("skipping test")
164         dirPath, err := ioutil.TempDir(".", "")
165         if err != nil {
166                 t.Fatal(err)
167         }
168         defer os.RemoveAll(dirPath)
169
170         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
171         cfg := *testCfg
172         cfg.P2P.ListenAddress = "127.0.1.1:0"
173         swPrivKey := crypto.GenPrivKeyEd25519()
174         //cfg.P2P.PrivateKey = swPrivKey.String()
175         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
176         s1.Start()
177         defer s1.Stop()
178
179         rpCfg := *testCfg
180         rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: &rpCfg}
181         rp.Start()
182         defer rp.Stop()
183         s1.AddBannedPeer(rp.addr.IP.String())
184         if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectBannedPeer {
185                 t.Fatal(err)
186         }
187
188         s1.delBannedPeer(rp.addr.IP.String())
189         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
190                 t.Fatal(err)
191         }
192 }
193
194 func TestDuplicateOutBoundPeer(t *testing.T) {
195         t.Skip("skipping test")
196         dirPath, err := ioutil.TempDir(".", "")
197         if err != nil {
198                 t.Fatal(err)
199         }
200         defer os.RemoveAll(dirPath)
201
202         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
203         cfg := *testCfg
204         cfg.P2P.ListenAddress = "127.0.1.1:0"
205         swPrivKey := crypto.GenPrivKeyEd25519()
206         //cfg.P2P.PrivateKey = swPrivKey.String()
207         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
208         s1.Start()
209         defer s1.Stop()
210
211         rpCfg := *testCfg
212         rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: &rpCfg}
213         rp.Start()
214         defer rp.Stop()
215
216         if err = s1.DialPeerWithAddress(rp.addr); err != nil {
217                 t.Fatal(err)
218         }
219
220         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
221                 t.Fatal(err)
222         }
223 }
224
225 func TestDuplicateInBoundPeer(t *testing.T) {
226         t.Skip("skipping test")
227         dirPath, err := ioutil.TempDir(".", "")
228         if err != nil {
229                 t.Fatal(err)
230         }
231         defer os.RemoveAll(dirPath)
232
233         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
234         cfg := *testCfg
235         cfg.P2P.ListenAddress = "127.0.1.1:0"
236         swPrivKey := crypto.GenPrivKeyEd25519()
237         //cfg.P2P.PrivateKey = swPrivKey.String()
238         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
239         s1.Start()
240         defer s1.Stop()
241
242         inpCfg := *testCfg
243         inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: &inpCfg}
244         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
245         if err != nil {
246                 t.Fatal(err)
247         }
248         go inp.dial(addr)
249
250         inp1Cfg := *testCfg
251         inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
252         go inp1.dial(addr)
253
254         time.Sleep(1 * time.Second)
255         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
256                 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
257         }
258 }
259
260 func TestAddInboundPeer(t *testing.T) {
261         t.Skip("skipping test")
262         dirPath, err := ioutil.TempDir(".", "")
263         if err != nil {
264                 t.Fatal(err)
265         }
266         defer os.RemoveAll(dirPath)
267
268         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
269         cfg := *testCfg
270         cfg.P2P.MaxNumPeers = 2
271         cfg.P2P.ListenAddress = "127.0.1.1:0"
272         swPrivKey := crypto.GenPrivKeyEd25519()
273         //cfg.P2P.PrivateKey = swPrivKey.String()
274         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
275         s1.Start()
276         defer s1.Stop()
277
278         inpCfg := *testCfg
279         inpPrivKey := crypto.GenPrivKeyEd25519()
280         //inpCfg.P2P.PrivateKey = inpPrivKey.String()
281         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
282         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
283         if err != nil {
284                 t.Fatal(err)
285         }
286         go inp.dial(addr)
287
288         rpCfg := *testCfg
289         rpPrivKey := crypto.GenPrivKeyEd25519()
290         //rpCfg.P2P.PrivateKey = rpPrivKey.String()
291         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
292         rp.Start()
293         defer rp.Stop()
294
295         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
296                 t.Fatal(err)
297         }
298
299         inp2Cfg := *testCfg
300         inp2PrivKey := crypto.GenPrivKeyEd25519()
301         //inp2Cfg.P2P.PrivateKey = inp2PrivKey.String()
302         inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
303
304         go inp2.dial(addr)
305
306         time.Sleep(1 * time.Second)
307         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
308                 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
309         }
310 }
311
312 func TestStopPeer(t *testing.T) {
313         t.Skip("skipping test")
314         dirPath, err := ioutil.TempDir(".", "")
315         if err != nil {
316                 t.Fatal(err)
317         }
318         defer os.RemoveAll(dirPath)
319
320         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
321         cfg := *testCfg
322         cfg.P2P.MaxNumPeers = 2
323         cfg.P2P.ListenAddress = "127.0.1.1:0"
324         swPrivKey := crypto.GenPrivKeyEd25519()
325         //cfg.P2P.PrivateKey = swPrivKey.String()
326         s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
327         s1.Start()
328         defer s1.Stop()
329
330         inpCfg := *testCfg
331         inpPrivKey := crypto.GenPrivKeyEd25519()
332         //inpCfg.P2P.PrivateKey = inpPrivKey.String()
333         inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
334         addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
335         if err != nil {
336                 t.Fatal(err)
337         }
338         go inp.dial(addr)
339
340         rpCfg := *testCfg
341         rpPrivKey := crypto.GenPrivKeyEd25519()
342         //rpCfg.P2P.PrivateKey = rpPrivKey.String()
343         rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
344         rp.Start()
345         defer rp.Stop()
346
347         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
348                 t.Fatal(err)
349         }
350         time.Sleep(1 * time.Second)
351         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
352                 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
353         }
354
355         s1.StopPeerGracefully(s1.peers.list[0].Key)
356         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
357                 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
358         }
359
360         s1.StopPeerForError(s1.peers.list[0], "stop for test")
361         if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
362                 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))
363         }
364 }