4 "github.com/tendermint/go-crypto"
5 dbm "github.com/tendermint/tmlibs/db"
11 cfg "github.com/bytom/config"
12 "github.com/bytom/errors"
13 conn "github.com/bytom/p2p/connection"
21 testCfg = cfg.DefaultConfig()
25 Each peer has one `MConnection` (multiplex connection) instance.
27 __multiplex__ *noun* a system or signal involving simultaneous transmission of
28 several messages along a single channel of communication.
30 Each `MConnection` handles message transmission on multiple abstract communication
31 `Channel`s. Each channel has a globally unique byte id.
32 The byte id and the relative priorities of each `Channel` are configured upon
33 initialization of the connection.
35 There are two methods for sending messages:
36 func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
37 func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
39 `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
40 successfully queued for the channel with the given id byte `chID`, or until the
41 request times out. The message `msg` is serialized using Go-Amino.
43 `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
44 channel's queue is full.
46 Inbound message bytes are handled with an onReceive callback function.
48 type PeerMessage struct {
54 type TestReactor struct {
58 channels []*conn.ChannelDescriptor
61 msgsReceived map[byte][]PeerMessage
64 func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
67 logMessages: logMessages,
68 msgsReceived: make(map[byte][]PeerMessage),
70 tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
75 // GetChannels implements Reactor
76 func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
80 // OnStart implements BaseService
81 func (tr *TestReactor) OnStart() error {
82 tr.BaseReactor.OnStart()
86 // OnStop implements BaseService
87 func (tr *TestReactor) OnStop() {
88 tr.BaseReactor.OnStop()
91 // AddPeer implements Reactor by sending our state to peer.
92 func (tr *TestReactor) AddPeer(peer *Peer) error {
96 // RemovePeer implements Reactor by removing peer from the pool.
97 func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
100 // Receive implements Reactor by handling 4 types of messages (look below).
101 func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
104 defer tr.mtx.Unlock()
105 tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
110 func initSwitchFunc(sw *Switch) *Switch {
111 // Make two reactors of two channels each
112 sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
113 {ID: byte(0x00), Priority: 10},
114 {ID: byte(0x01), Priority: 10},
116 sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
117 {ID: byte(0x02), Priority: 10},
118 {ID: byte(0x03), Priority: 10},
125 func TestFiltersOutItself(t *testing.T) {
126 dirPath, err := ioutil.TempDir(".", "")
130 defer os.RemoveAll(dirPath)
132 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
134 s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
137 // simulate s1 having a public key and creating a remote peer with the same key
138 rp := &remotePeer{PrivKey: s1.nodePrivKey, Config: testCfg}
141 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectSelf {
145 //S1 dialing itself ip address
146 addr, err := NewNetAddressString("0.0.0.0:46656")
151 if err := s1.DialPeerWithAddress(addr); errors.Root(err) != ErrConnectSelf {
156 func TestDialBannedPeer(t *testing.T) {
157 dirPath, err := ioutil.TempDir(".", "")
161 defer os.RemoveAll(dirPath)
163 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
164 s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
167 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
170 s1.AddBannedPeer(rp.addr.IP.String())
171 if err := s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrConnectBannedPeer {
175 s1.delBannedPeer(rp.addr.IP.String())
176 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
181 func TestDuplicateOutBoundPeer(t *testing.T) {
182 dirPath, err := ioutil.TempDir(".", "")
186 defer os.RemoveAll(dirPath)
188 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
189 s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
192 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
195 if err = s1.DialPeerWithAddress(rp.addr); err != nil {
199 if err = s1.DialPeerWithAddress(rp.addr); errors.Root(err) != ErrDuplicatePeer {
204 func TestDuplicateInBoundPeer(t *testing.T) {
205 dirPath, err := ioutil.TempDir(".", "")
209 defer os.RemoveAll(dirPath)
211 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
212 s1 := MakeSwitch(testCfg, testDB, initSwitchFunc)
216 inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
217 addr, err := NewNetAddressString(s1.nodeInfo.ListenAddr)
222 if err = inp.dial(addr); err != nil {
226 inp1 := &inboundPeer{PrivKey: inp.PrivKey, config: testCfg}
228 if err = inp1.dial(addr); err != nil {
232 if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
233 t.Fatal("TestDuplicateInBoundPeer peer size error", outbound, inbound, dialing)
237 func TestAddInboundPeer(t *testing.T) {
238 dirPath, err := ioutil.TempDir(".", "")
242 defer os.RemoveAll(dirPath)
244 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
246 cfg.P2P.MaxNumPeers = 2
247 s1 := MakeSwitch(&cfg, testDB, initSwitchFunc)
251 inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
252 addr, err := NewNetAddressString(s1.nodeInfo.ListenAddr)
257 if err := inp.dial(addr); err != nil {
261 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
264 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
268 if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
269 t.Fatal("TestAddInboundPeer peer size error")
271 inp2 := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
273 if err := inp2.dial(addr); err == nil {
274 t.Fatal("TestAddInboundPeer MaxNumPeers limit error")
278 func TestStopPeer(t *testing.T) {
279 dirPath, err := ioutil.TempDir(".", "")
283 defer os.RemoveAll(dirPath)
285 testDB := dbm.NewDB("testdb", "leveldb", dirPath)
287 cfg.P2P.MaxNumPeers = 2
288 s1 := MakeSwitch(&cfg, testDB, initSwitchFunc)
292 inp := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
293 addr, err := NewNetAddressString("127.0.0.1:46656")
298 if err := inp.dial(addr); err != nil {
302 rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: testCfg}
305 if err := s1.DialPeerWithAddress(rp.addr); err != nil {
309 if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
310 t.Fatal("TestStopPeer peer size error")
313 s1.StopPeerGracefully(s1.peers.list[0].Key)
314 if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
315 t.Fatal("TestStopPeer peer size error")
318 s1.StopPeerForError(s1.peers.list[0], "stop for test")
319 if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
320 t.Fatal("TestStopPeer peer size error")