OSDN Git Service

Validate tx unit test (#1587)
[bytom/bytom.git] / p2p / switch_test.go
1 package p2p
2
3 import (
4         "github.com/tendermint/go-crypto"
5         dbm "github.com/tendermint/tmlibs/db"
6         "io/ioutil"
7         "os"
8         "sync"
9         "testing"
10
11         cfg "github.com/bytom/config"
12         "github.com/bytom/errors"
13         conn "github.com/bytom/p2p/connection"
14 )
15
16 var (
17         testCfg *cfg.Config
18 )
19
20 func init() {
21         testCfg = cfg.DefaultConfig()
22 }
23
24 /*
25 Each peer has one `MConnection` (multiplex connection) instance.
26
27 __multiplex__ *noun* a system or signal involving simultaneous transmission of
28 several messages along a single channel of communication.
29
30 Each `MConnection` handles message transmission on multiple abstract communication
31 `Channel`s.  Each channel has a globally unique byte id.
32 The byte id and the relative priorities of each `Channel` are configured upon
33 initialization of the connection.
34
35 There are two methods for sending messages:
36         func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
37         func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
38
39 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
40 successfully queued for the channel with the given id byte `chID`, or until the
41 request times out.  The message `msg` is serialized using Go-Amino.
42
43 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
44 channel's queue is full.
45
46 Inbound message bytes are handled with an onReceive callback function.
47 */
48 type PeerMessage struct {
49         PeerID  string
50         Bytes   []byte
51         Counter int
52 }
53
54 type TestReactor struct {
55         BaseReactor
56
57         mtx          sync.Mutex
58         channels     []*conn.ChannelDescriptor
59         logMessages  bool
60         msgsCounter  int
61         msgsReceived map[byte][]PeerMessage
62 }
63
64 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
65         tr := &TestReactor{
66                 channels:     channels,
67                 logMessages:  logMessages,
68                 msgsReceived: make(map[byte][]PeerMessage),
69         }
70         tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
71
72         return tr
73 }
74
75 // GetChannels implements Reactor
76 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
77         return tr.channels
78 }
79
80 // OnStart implements BaseService
81 func (tr *TestReactor) OnStart() error {
82         tr.BaseReactor.OnStart()
83         return nil
84 }
85
86 // OnStop implements BaseService
87 func (tr *TestReactor) OnStop() {
88         tr.BaseReactor.OnStop()
89 }
90
91 // AddPeer implements Reactor by sending our state to peer.
92 func (tr *TestReactor) AddPeer(peer *Peer) error {
93         return nil
94 }
95
96 // RemovePeer implements Reactor by removing peer from the pool.
97 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
98 }
99
100 // Receive implements Reactor by handling 4 types of messages (look below).
101 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
102         if tr.logMessages {
103                 tr.mtx.Lock()
104                 defer tr.mtx.Unlock()
105                 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
106                 tr.msgsCounter++
107         }
108 }
109
110 func initSwitchFunc(sw *Switch) *Switch {
111         // Make two reactors of two channels each
112         sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
113                 {ID: byte(0x00), Priority: 10},
114                 {ID: byte(0x01), Priority: 10},
115         }, true))
116         sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
117                 {ID: byte(0x02), Priority: 10},
118                 {ID: byte(0x03), Priority: 10},
119         }, true))
120
121         return sw
122 }
123
124 //Test connect self.
125 func TestFiltersOutItself(t *testing.T) {
126         dirPath, err := ioutil.TempDir(".", "")
127         if err != nil {
128                 t.Fatal(err)
129         }
130         defer os.RemoveAll(dirPath)
131
132         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
133
134         s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
135         s1.Start()
136         defer s1.Stop()
137         // simulate s1 having a public key and creating a remote peer with the same key
138         rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: testCfg}
139         rp.Start()
140         defer rp.Stop()
141         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
142                 t.Fatal(err)
143         }
144
145         //S1 dialing itself ip address
146         addr, err := NewNetAddressString("0.0.0.0:46656")
147         if err != nil {
148                 t.Fatal(err)
149         }
150
151         if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
152                 t.Fatal(err)
153         }
154 }
155
156 func TestDialBannedPeer(t *testing.T) {
157         dirPath, err := ioutil.TempDir(".", "")
158         if err != nil {
159                 t.Fatal(err)
160         }
161         defer os.RemoveAll(dirPath)
162
163         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
164         s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
165         s1.Start()
166         defer s1.Stop()
167         rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
168         rp.Start()
169         defer rp.Stop()
170         s1.AddBannedPeer(rp.addr.IP.String())
171         if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectBannedPeer {
172                 t.Fatal(err)
173         }
174
175         s1.delBannedPeer(rp.addr.IP.String())
176         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
177                 t.Fatal(err)
178         }
179 }
180
181 func TestDuplicateOutBoundPeer(t *testing.T) {
182         dirPath, err := ioutil.TempDir(".", "")
183         if err != nil {
184                 t.Fatal(err)
185         }
186         defer os.RemoveAll(dirPath)
187
188         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
189         s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
190         s1.Start()
191         defer s1.Stop()
192         rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
193         rp.Start()
194         defer rp.Stop()
195         if err = s1.DialPeerWithAddress(rp.addr); err != nil {
196                 t.Fatal(err)
197         }
198
199         if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
200                 t.Fatal(err)
201         }
202 }
203
204 func TestDuplicateInBoundPeer(t *testing.T) {
205         dirPath, err := ioutil.TempDir(".", "")
206         if err != nil {
207                 t.Fatal(err)
208         }
209         defer os.RemoveAll(dirPath)
210
211         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
212         s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
213         s1.Start()
214         defer s1.Stop()
215
216         inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
217         addr, err := NewNetAddressString(s1.nodeInfo.ListenAddr)
218         if err != nil {
219                 t.Fatal(err)
220         }
221
222         if err = inp.dial(addr); err != nil {
223                 t.Fatal(err)
224         }
225
226         inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: testCfg}
227
228         if err = inp1.dial(addr); err != nil {
229                 t.Fatal(err)
230         }
231
232         if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
233                 t.Fatal("TestDuplicateInBoundPeer peer size error", outbound, inbound, dialing)
234         }
235 }
236
237 func TestAddInboundPeer(t *testing.T) {
238         dirPath, err := ioutil.TempDir(".", "")
239         if err != nil {
240                 t.Fatal(err)
241         }
242         defer os.RemoveAll(dirPath)
243
244         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
245         cfg := *testCfg
246         cfg.P2P.MaxNumPeers = 2
247         s1 := MakeSwitch(&cfg, testDB, initSwitchFunc)
248         s1.Start()
249         defer s1.Stop()
250
251         inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
252         addr, err := NewNetAddressString(s1.nodeInfo.ListenAddr)
253         if err != nil {
254                 t.Fatal(err)
255         }
256
257         if err := inp.dial(addr); err != nil {
258                 t.Fatal(err)
259         }
260
261         rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
262         rp.Start()
263         defer rp.Stop()
264         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
265                 t.Fatal(err)
266         }
267
268         if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
269                 t.Fatal("TestAddInboundPeer peer size error")
270         }
271         inp2 := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
272
273         if err := inp2.dial(addr); err == nil {
274                 t.Fatal("TestAddInboundPeer MaxNumPeers limit error")
275         }
276 }
277
278 func TestStopPeer(t *testing.T) {
279         dirPath, err := ioutil.TempDir(".", "")
280         if err != nil {
281                 t.Fatal(err)
282         }
283         defer os.RemoveAll(dirPath)
284
285         testDB := dbm.NewDB("testdb", "leveldb", dirPath)
286         cfg := *testCfg
287         cfg.P2P.MaxNumPeers = 2
288         s1 := MakeSwitch(&cfg, testDB, initSwitchFunc)
289         s1.Start()
290         defer s1.Stop()
291
292         inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
293         addr, err := NewNetAddressString("127.0.0.1:46656")
294         if err != nil {
295                 t.Fatal(err)
296         }
297
298         if err := inp.dial(addr); err != nil {
299                 t.Fatal(err)
300         }
301
302         rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
303         rp.Start()
304         defer rp.Stop()
305         if err := s1.DialPeerWithAddress(rp.addr); err != nil {
306                 t.Fatal(err)
307         }
308
309         if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
310                 t.Fatal("TestStopPeer peer size error")
311         }
312
313         s1.StopPeerGracefully(s1.peers.list[0].Key)
314         if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
315                 t.Fatal("TestStopPeer peer size error")
316         }
317
318         s1.StopPeerForError(s1.peers.list[0], "stop for test")
319         if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
320                 t.Fatal("TestStopPeer peer size error")
321         }
322 }