10 "github.com/davecgh/go-spew/spew"
12 cfg "github.com/bytom/bytom/config"
13 "github.com/bytom/bytom/crypto/ed25519/chainkd"
14 dbm "github.com/bytom/bytom/database/leveldb"
15 "github.com/bytom/bytom/errors"
16 conn "github.com/bytom/bytom/p2p/connection"
17 "github.com/bytom/bytom/p2p/security"
25 testCfg = cfg.DefaultConfig()
29 Each peer has one `MConnection` (multiplex connection) instance.
31 __multiplex__ *noun* a system or signal involving simultaneous transmission of
32 several messages along a single channel of communication.
34 Each `MConnection` handles message transmission on multiple abstract communication
35 `Channel`s. Each channel has a globally unique byte id.
36 The byte id and the relative priorities of each `Channel` are configured upon
37 initialization of the connection.
39 There are two methods for sending messages:
40 func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
41 func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
43 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
44 successfully queued for the channel with the given id byte `chID`, or until the
45 request times out. The message `msg` is serialized using Go-Amino.
47 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
48 channel's queue is full.
50 Inbound message bytes are handled with an onReceive callback function.
52 type PeerMessage struct {
58 type TestReactor struct {
62 channels []*conn.ChannelDescriptor
65 msgsReceived map[byte][]PeerMessage
68 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
71 logMessages: logMessages,
72 msgsReceived: make(map[byte][]PeerMessage),
74 tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
79 // GetChannels implements Reactor
80 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
84 // OnStart implements BaseService
85 func (tr *TestReactor) OnStart() error {
86 tr.BaseReactor.OnStart()
90 // OnStop implements BaseService
91 func (tr *TestReactor) OnStop() {
92 tr.BaseReactor.OnStop()
95 // AddPeer implements Reactor by sending our state to peer.
96 func (tr *TestReactor) AddPeer(peer *Peer) error {
100 // RemovePeer implements Reactor by removing peer from the pool.
101 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
104 // Receive implements Reactor by handling 4 types of messages (look below).
105 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
108 defer tr.mtx.Unlock()
109 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
114 func initSwitchFunc(sw *Switch) *Switch {
115 // Make two reactors of two channels each
116 sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
117 {ID: byte(0x00), Priority: 10},
118 {ID: byte(0x01), Priority: 10},
120 sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
121 {ID: byte(0x02), Priority: 10},
122 {ID: byte(0x03), Priority: 10},
129 func TestFiltersOutItself(t *testing.T) {
130 t.Skip("due to fail on mac")
131 dirPath, err := ioutil.TempDir(".", "")
135 defer os.RemoveAll(dirPath)
137 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
140 cfg.P2P.ListenAddress = "127.0.1.1:0"
141 swPrivKey, _ := chainkd.NewXPrv(nil)
142 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
146 rmdirPath, err := ioutil.TempDir(".", "")
150 defer os.RemoveAll(rmdirPath)
152 // simulate s1 having a public key and creating a remote peer with the same key
154 rpCfg.DBPath = rmdirPath
155 rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: &rpCfg}
158 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
162 //S1 dialing itself ip address
163 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
165 if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
170 func TestDialBannedPeer(t *testing.T) {
171 t.Skip("due to fail on mac")
172 dirPath, err := ioutil.TempDir(".", "")
176 defer os.RemoveAll(dirPath)
178 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
181 cfg.P2P.ListenAddress = "127.0.1.1:0"
182 swPrivKey, _ := chainkd.NewXPrv(nil)
183 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
187 rmdirPath, err := ioutil.TempDir(".", "")
191 defer os.RemoveAll(rmdirPath)
194 rpCfg.DBPath = rmdirPath
195 prvKey, _ := chainkd.NewXPrv(nil)
196 rp := &remotePeer{PrivKey: prvKey, Config: &rpCfg}
200 if ok := s1.security.IsBanned(rp.addr.IP.String(), security.LevelMsgIllegal, "test"); ok {
204 if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != security.ErrConnectBannedPeer {
209 func TestDuplicateOutBoundPeer(t *testing.T) {
210 t.Skip("due to fail on mac")
211 dirPath, err := ioutil.TempDir(".", "")
215 defer os.RemoveAll(dirPath)
217 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
220 cfg.P2P.ListenAddress = "127.0.1.1:0"
221 swPrivKey, _ := chainkd.NewXPrv(nil)
222 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
226 rmdirPath, err := ioutil.TempDir(".", "")
230 defer os.RemoveAll(rmdirPath)
233 prvKey, _ := chainkd.NewXPrv(nil)
234 rp := &remotePeer{PrivKey: prvKey, Config: &rpCfg}
238 if err = s1.DialPeerWithAddress(rp.addr); err != nil {
242 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
247 func TestDuplicateInBoundPeer(t *testing.T) {
248 t.Skip("due to fail on mac")
249 dirPath, err := ioutil.TempDir(".", "")
253 defer os.RemoveAll(dirPath)
255 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
258 cfg.P2P.ListenAddress = "127.0.1.1:0"
259 swPrivKey, _ := chainkd.NewXPrv(nil)
260 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
265 inp := &inboundPeer{PrivKey: swPrivKey, config: &inpCfg}
266 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
273 inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: &inp1Cfg}
276 time.Sleep(1 * time.Second)
277 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
278 t.Fatal("TestDuplicateInBoundPeer peer size error want 1, got:", outbound, inbound, dialing, spew.Sdump(s1.peers.lookup))
282 func TestAddInboundPeer(t *testing.T) {
283 t.Skip("due to fail on mac")
284 dirPath, err := ioutil.TempDir(".", "")
288 defer os.RemoveAll(dirPath)
290 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
293 cfg.P2P.MaxNumPeers = 2
294 cfg.P2P.ListenAddress = "127.0.1.1:0"
295 swPrivKey, _ := chainkd.NewXPrv(nil)
296 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
301 inpPrivKey, _ := chainkd.NewXPrv(nil)
302 inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
303 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
310 rpPrivKey, _ := chainkd.NewXPrv(nil)
311 rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
315 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
320 inp2PrivKey, _ := chainkd.NewXPrv(nil)
321 inp2 := &inboundPeer{PrivKey: inp2PrivKey, config: &inp2Cfg}
325 time.Sleep(1 * time.Second)
326 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
327 t.Fatal("TestAddInboundPeer peer size error want 2 got:", spew.Sdump(s1.peers.lookup))
331 func TestStopPeer(t *testing.T) {
332 t.Skip("due to fail on mac")
333 dirPath, err := ioutil.TempDir(".", "")
337 defer os.RemoveAll(dirPath)
339 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
342 cfg.P2P.MaxNumPeers = 2
343 cfg.P2P.ListenAddress = "127.0.1.1:0"
344 swPrivKey, _ := chainkd.NewXPrv(nil)
345 s1 := MakeSwitch(&cfg, testDB, swPrivKey, initSwitchFunc)
350 inpPrivKey, _ := chainkd.NewXPrv(nil)
351 inp := &inboundPeer{PrivKey: inpPrivKey, config: &inpCfg}
352 addr := NewNetAddress(s1.listeners[0].(*DefaultListener).NetListener().Addr())
359 rpPrivKey, _ := chainkd.NewXPrv(nil)
360 rp := &remotePeer{PrivKey: rpPrivKey, Config: &rpCfg}
364 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
367 time.Sleep(1 * time.Second)
368 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
369 t.Fatal("TestStopPeer peer size error want 2,got:", spew.Sdump(s1.peers.lookup))
372 s1.StopPeerGracefully(s1.peers.list[0].Key)
373 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
374 t.Fatal("TestStopPeer peer size error,want 1,got:", spew.Sdump(s1.peers.lookup))
377 s1.StopPeerForError(s1.peers.list[0], "stop for test")
378 if _, outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
379 t.Fatal("TestStopPeer peer size error,want 0, got:", spew.Sdump(s1.peers.lookup))