From: yahtoo Date: Fri, 2 Aug 2019 06:58:17 +0000 (+0800) Subject: improve net sync test cases (#375) X-Git-Tag: v1.0.5~61 X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=commitdiff_plain;h=4adf3b94c3cf38f3ff3d77e95f6a4830f516d2d9 improve net sync test cases (#375) * Add netsync test case * Add netsync test case * Opz code format --- diff --git a/netsync/chainmgr/fast_sync.go b/netsync/chainmgr/fast_sync.go index 550d2dff..c2e7bc09 100644 --- a/netsync/chainmgr/fast_sync.go +++ b/netsync/chainmgr/fast_sync.go @@ -20,8 +20,10 @@ var ( fastSyncPivotGap = uint64(64) minGapStartFastSync = uint64(128) - errNoSyncPeer = errors.New("can't find sync peer") - errSkeletonSize = errors.New("fast sync skeleton size wrong") + errNoSyncPeer = errors.New("can't find sync peer") + errSkeletonSize = errors.New("fast sync skeleton size wrong") + errNoMainSkeleton = errors.New("No main skeleton found") + errNoSkeletonFound = errors.New("No skeleton found") ) type fastSync struct { @@ -83,12 +85,12 @@ func (fs *fastSync) createFetchBlocksTasks(stopBlock *types.Block) ([]*fetchBloc stopHash := stopBlock.Hash() skeletonMap := fs.msgFetcher.parallelFetchHeaders(peers, fs.blockLocator(), &stopHash, numOfBlocksSkeletonGap-1) if len(skeletonMap) == 0 { - return nil, errors.New("No skeleton found") + return nil, errNoSkeletonFound } mainSkeleton, ok := skeletonMap[fs.mainSyncPeer.ID()] if !ok { - return nil, errors.New("No main skeleton found") + return nil, errNoMainSkeleton } if len(mainSkeleton) < minSizeOfSyncSkeleton || len(mainSkeleton) > maxSizeOfSyncSkeleton { diff --git a/netsync/chainmgr/fast_sync_test.go b/netsync/chainmgr/fast_sync_test.go index ffacb56a..dcb71869 100644 --- a/netsync/chainmgr/fast_sync_test.go +++ b/netsync/chainmgr/fast_sync_test.go @@ -3,12 +3,15 @@ package chainmgr import ( "io/ioutil" "os" + "reflect" + "sync" "testing" "time" "github.com/vapor/consensus" dbm "github.com/vapor/database/leveldb" "github.com/vapor/errors" + "github.com/vapor/netsync/peers" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" "github.com/vapor/test/mock" @@ -181,3 +184,175 @@ func TestFastBlockSync(t *testing.T) { } } } + +type mockFetcher struct { + baseChain []*types.Block + peerStatus map[string][]*types.Block + peers []string + testType int +} + +func (mf *mockFetcher) resetParameter() { + return +} + +func (mf *mockFetcher) addSyncPeer(peerID string) { + return +} + +func (mf *mockFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) { + return nil, nil +} + +func (mf *mockFetcher) parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) { + return +} + +func (mf *mockFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader { + result := make(map[string][]*types.BlockHeader) + switch mf.testType { + case 1: + result["peer1"] = []*types.BlockHeader{&mf.peerStatus["peer1"][1000].BlockHeader, &mf.peerStatus["peer1"][1100].BlockHeader, &mf.peerStatus["peer1"][1200].BlockHeader, + &mf.peerStatus["peer1"][1300].BlockHeader, &mf.peerStatus["peer1"][1400].BlockHeader, &mf.peerStatus["peer1"][1500].BlockHeader, + &mf.peerStatus["peer1"][1600].BlockHeader, &mf.peerStatus["peer1"][1700].BlockHeader, &mf.peerStatus["peer1"][1800].BlockHeader, + } + result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader, + &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader, + &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader, &mf.peerStatus["peer2"][1800].BlockHeader, + } + + case 2: + result["peer1"] = []*types.BlockHeader{} + case 3: + case 4: + result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader, + &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader, + &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader, &mf.peerStatus["peer2"][1800].BlockHeader, + } + case 5: + result["peer1"] = []*types.BlockHeader{&mf.peerStatus["peer1"][1000].BlockHeader, &mf.peerStatus["peer1"][1100].BlockHeader, &mf.peerStatus["peer1"][1200].BlockHeader, + &mf.peerStatus["peer1"][1300].BlockHeader, &mf.peerStatus["peer1"][1400].BlockHeader, &mf.peerStatus["peer1"][1500].BlockHeader, + &mf.peerStatus["peer1"][1600].BlockHeader, &mf.peerStatus["peer1"][1700].BlockHeader, &mf.peerStatus["peer1"][1800].BlockHeader, + } + result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader, + &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader, + &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader, + } + } + return result +} + +func TestCreateFetchBlocksTasks(t *testing.T) { + baseChain := mockBlocks(nil, 1000) + chainX := append(baseChain, mockBlocks(baseChain[1000], 2000)...) + chainY := append(baseChain, mockBlocks(baseChain[1000], 1900)...) + peerStatus := make(map[string][]*types.Block) + peerStatus["peer1"] = chainX + peerStatus["peer2"] = chainY + type syncPeer struct { + peer *P2PPeer + bestHeight uint64 + irreversibleHeight uint64 + } + + cases := []struct { + peers []*syncPeer + mainSyncPeer string + testType int + wantTasks []*fetchBlocksWork + wantErr error + }{ + // normal test + { + peers: []*syncPeer{ + {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000}, + {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800}, + }, + mainSyncPeer: "peer1", + testType: 1, + wantTasks: []*fetchBlocksWork{ + {&chainX[1000].BlockHeader, &chainX[1100].BlockHeader}, {&chainX[1100].BlockHeader, &chainX[1200].BlockHeader}, + {&chainX[1200].BlockHeader, &chainX[1300].BlockHeader}, {&chainX[1300].BlockHeader, &chainX[1400].BlockHeader}, + {&chainX[1400].BlockHeader, &chainX[1500].BlockHeader}, {&chainX[1500].BlockHeader, &chainX[1600].BlockHeader}, + {&chainX[1600].BlockHeader, &chainX[1700].BlockHeader}, {&chainX[1700].BlockHeader, &chainX[1800].BlockHeader}, + }, + wantErr: nil, + }, + // test no sync peer + { + peers: []*syncPeer{}, + mainSyncPeer: "peer1", + testType: 0, + wantTasks: nil, + wantErr: errNoSyncPeer, + }, + // primary sync peer skeleton size error + { + peers: []*syncPeer{ + {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000}, + {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800}, + }, + mainSyncPeer: "peer1", + testType: 2, + wantTasks: nil, + wantErr: errSkeletonSize, + }, + // no skeleton return + { + peers: []*syncPeer{ + {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000}, + {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800}, + }, + mainSyncPeer: "peer1", + testType: 3, + wantTasks: nil, + wantErr: errNoSkeletonFound, + }, + // no main skeleton found + { + peers: []*syncPeer{ + {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000}, + {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800}, + }, + mainSyncPeer: "peer1", + testType: 4, + wantTasks: nil, + wantErr: errNoMainSkeleton, + }, + // skeleton length mismatch + { + peers: []*syncPeer{ + {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000}, + {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800}, + }, + mainSyncPeer: "peer1", + testType: 5, + wantTasks: []*fetchBlocksWork{ + {&chainX[1000].BlockHeader, &chainX[1100].BlockHeader}, {&chainX[1100].BlockHeader, &chainX[1200].BlockHeader}, + {&chainX[1200].BlockHeader, &chainX[1300].BlockHeader}, {&chainX[1300].BlockHeader, &chainX[1400].BlockHeader}, + {&chainX[1400].BlockHeader, &chainX[1500].BlockHeader}, {&chainX[1500].BlockHeader, &chainX[1600].BlockHeader}, + {&chainX[1600].BlockHeader, &chainX[1700].BlockHeader}, {&chainX[1700].BlockHeader, &chainX[1800].BlockHeader}, + }, + wantErr: nil, + }, + } + + for i, c := range cases { + peers := peers.NewPeerSet(NewPeerSet()) + for _, syncPeer := range c.peers { + peers.AddPeer(syncPeer.peer) + peers.SetStatus(syncPeer.peer.id, syncPeer.bestHeight, nil) + peers.SetIrreversibleStatus(syncPeer.peer.id, syncPeer.irreversibleHeight, nil) + } + mockChain := mock.NewChain(nil) + fs := newFastSync(mockChain, &mockFetcher{baseChain: baseChain, peerStatus: peerStatus, testType: c.testType}, nil, peers) + fs.mainSyncPeer = fs.peers.GetPeer(c.mainSyncPeer) + tasks, err := fs.createFetchBlocksTasks(baseChain[700]) + if err != c.wantErr { + t.Errorf("case %d: got %v want %v", i, err, c.wantErr) + } + if !reflect.DeepEqual(tasks, c.wantTasks) { + t.Errorf("case %d: got %v want %v", i, tasks, c.wantTasks) + } + } +} diff --git a/netsync/chainmgr/peers_test.go b/netsync/chainmgr/peers_test.go new file mode 100644 index 00000000..cc159fa4 --- /dev/null +++ b/netsync/chainmgr/peers_test.go @@ -0,0 +1,116 @@ +package chainmgr + +import ( + "testing" +) + +func TestAddDel(t *testing.T) { + syncPeers := newFastSyncPeers() + peers := make(map[string]bool) + peers["Peer1"] = true + peers["Peer2"] = true + for k := range peers { + syncPeers.add(k) + syncPeers.add(k) + } + if syncPeers.size() != len(peers) { + t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), len(peers)) + } + + syncPeers.delete("Peer1") + if syncPeers.size() != 1 { + t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), 1) + } + + syncPeers.delete("Peer1") + if syncPeers.size() != 1 { + t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), 1) + } +} + +func TestIdlePeers(t *testing.T) { + syncPeers := newFastSyncPeers() + peers := make(map[string]bool) + peers["Peer1"] = true + peers["Peer2"] = true + for k := range peers { + syncPeers.add(k) + syncPeers.add(k) + } + + idlePeers := syncPeers.selectIdlePeers() + if len(idlePeers) != len(peers) { + t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), len(peers)) + } + + for _, peer := range idlePeers { + if ok := peers[peer]; !ok { + t.Errorf("selcet idle peers test err: want peers %v got %v", peers, idlePeers) + } + } + + idlePeers = syncPeers.selectIdlePeers() + if len(idlePeers) != 0 { + t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), 0) + } + +} + +func TestIdlePeer(t *testing.T) { + syncPeers := newFastSyncPeers() + peers := make(map[string]bool) + peers["Peer1"] = true + peers["Peer2"] = true + for k := range peers { + syncPeers.add(k) + syncPeers.add(k) + } + idlePeer, err := syncPeers.selectIdlePeer() + if err != nil { + t.Errorf("selcet idle peers test err: got %v\nwant %v", err, nil) + } + + if ok := peers[idlePeer]; !ok { + t.Error("selcet idle peers test err.") + } + idlePeer, err = syncPeers.selectIdlePeer() + if err != nil { + t.Errorf("selcet idle peers test err: got %v\nwant %v", err, nil) + } + + if ok := peers[idlePeer]; !ok { + t.Error("selcet idle peers test err.") + } + idlePeer, err = syncPeers.selectIdlePeer() + if err != errNoValidFastSyncPeer { + t.Errorf("selcet idle peers test err: got %v\nwant %v", err, errNoValidFastSyncPeer) + } +} + +func TestSetIdle(t *testing.T) { + syncPeers := newFastSyncPeers() + peers := make(map[string]bool) + peers["Peer2"] = true + for k := range peers { + syncPeers.add(k) + } + if syncPeers.size() != len(peers) { + t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), len(peers)) + } + idlePeers := syncPeers.selectIdlePeers() + if len(idlePeers) != len(peers) { + t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), len(peers)) + } + + syncPeers.setIdle("Peer1") + idlePeers = syncPeers.selectIdlePeers() + if len(idlePeers) != 0 { + t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), 0) + } + + syncPeers.setIdle("Peer2") + idlePeers = syncPeers.selectIdlePeers() + if len(idlePeers) != len(peers) { + t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), len(peers)) + } +} diff --git a/netsync/chainmgr/tool_test.go b/netsync/chainmgr/tool_test.go index 4a3badd9..1905e11e 100644 --- a/netsync/chainmgr/tool_test.go +++ b/netsync/chainmgr/tool_test.go @@ -11,6 +11,7 @@ import ( dbm "github.com/vapor/database/leveldb" "github.com/vapor/consensus" + "github.com/vapor/event" "github.com/vapor/netsync/peers" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" @@ -197,11 +198,12 @@ func mockSync(blocks []*types.Block, mempool *mock.Mempool, fastSyncDB dbm.DB) * } return &Manager{ - chain: chain, - blockKeeper: newBlockKeeper(chain, peers, fastSyncDB), - peers: peers, - mempool: mempool, - txSyncCh: make(chan *txSyncMsg), + chain: chain, + blockKeeper: newBlockKeeper(chain, peers, fastSyncDB), + peers: peers, + mempool: mempool, + txSyncCh: make(chan *txSyncMsg), + eventDispatcher: event.NewDispatcher(), } } diff --git a/netsync/chainmgr/tx_keeper_test.go b/netsync/chainmgr/tx_keeper_test.go index dd269fde..b5ac9916 100644 --- a/netsync/chainmgr/tx_keeper_test.go +++ b/netsync/chainmgr/tx_keeper_test.go @@ -12,6 +12,7 @@ import ( "github.com/vapor/consensus" dbm "github.com/vapor/database/leveldb" "github.com/vapor/protocol" + core "github.com/vapor/protocol" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" "github.com/vapor/test/mock" @@ -19,8 +20,11 @@ import ( const txsNumber = 2000 -func getTransactions() []*types.Tx { - txs := []*types.Tx{} +type mempool struct { +} + +func (m *mempool) GetTransactions() []*core.TxDesc { + txs := []*core.TxDesc{} for i := 0; i < txsNumber; i++ { txInput := types.NewSpendInput(nil, bc.NewHash([32]byte{0x01}), *consensus.BTMAssetID, uint64(i), 1, []byte{0x51}) txInput.CommitmentSuffix = []byte{0, 1, 2} @@ -36,12 +40,13 @@ func getTransactions() []*types.Tx { Outputs: []*types.TxOutput{ types.NewIntraChainOutput(*consensus.BTMAssetID, uint64(i), []byte{0x6a}), }, + SerializedSize: 1000, }, Tx: &bc.Tx{ ID: bc.Hash{V0: uint64(i), V1: uint64(i), V2: uint64(i), V3: uint64(i)}, }, } - txs = append(txs, tx) + txs = append(txs, &core.TxDesc{Tx: tx}) } return txs } @@ -58,7 +63,7 @@ func TestSyncMempool(t *testing.T) { blocks := mockBlocks(nil, 5) a := mockSync(blocks, &mock.Mempool{}, testDBA) b := mockSync(blocks, &mock.Mempool{}, testDBB) - + a.mempool = &mempool{} netWork := NewNetWork() netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode) netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode) @@ -71,8 +76,7 @@ func TestSyncMempool(t *testing.T) { go a.syncMempoolLoop() a.syncMempool("test node B") - wantTxs := getTransactions() - a.txSyncCh <- &txSyncMsg{"test node B", wantTxs} + wantTxs := a.mempool.GetTransactions() timeout := time.NewTimer(2 * time.Second) defer timeout.Stop() @@ -99,14 +103,82 @@ out: for i, gotTx := range gotTxs { index := gotTx.Tx.Inputs[0].Amount() - if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Inputs[0].Amount()) { - t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Inputs)) + if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) { + t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs)) + } + + if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) { + t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs)) + } + } +} + +func TestBroadcastTxsLoop(t *testing.T) { + tmpDir, err := ioutil.TempDir(".", "") + if err != nil { + t.Fatalf("failed to create temporary data folder: %v", err) + } + defer os.RemoveAll(tmpDir) + testDBA := dbm.NewDB("testdba", "leveldb", tmpDir) + testDBB := dbm.NewDB("testdbb", "leveldb", tmpDir) + + blocks := mockBlocks(nil, 5) + a := mockSync(blocks, &mock.Mempool{}, testDBA) + b := mockSync(blocks, &mock.Mempool{}, testDBB) + a.mempool = &mempool{} + netWork := NewNetWork() + netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode) + netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode) + if B2A, A2B, err := netWork.HandsShake(a, b); err != nil { + t.Errorf("fail on peer hands shake %v", err) + } else { + go B2A.postMan() + go A2B.postMan() + } + a.txMsgSub, err = a.eventDispatcher.Subscribe(core.TxMsgEvent{}) + if err != nil { + t.Fatal("txMsgSub subscribe err", err) + } + go a.broadcastTxsLoop() + wantTxs := a.mempool.GetTransactions() + txsNum := 50 + for i, txD := range wantTxs { + if i >= txsNum { + break } + a.eventDispatcher.Post(core.TxMsgEvent{TxMsg: &core.TxPoolMsg{TxDesc: txD, MsgType: core.MsgNewTx}}) + } + timeout := time.NewTimer(2 * time.Second) + defer timeout.Stop() + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() - if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Outputs[0].AssetAmount()) { - t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Outputs)) + gotTxs := []*protocol.TxDesc{} + for { + select { + case <-ticker.C: + gotTxs = b.mempool.GetTransactions() + if len(gotTxs) >= txsNum { + goto out + } + case <-timeout.C: + t.Fatalf("mempool sync timeout") } + } +out: + if len(gotTxs) != txsNum { + t.Fatalf("mempool sync txs num err. got:%d want:%d", len(gotTxs), txsNumber) } + for i, gotTx := range gotTxs { + index := gotTx.Tx.Inputs[0].Amount() + if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) { + t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs)) + } + + if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) { + t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs)) + } + } } diff --git a/netsync/consensusmgr/block_fetcher.go b/netsync/consensusmgr/block_fetcher.go index 3c92b38b..8c28ff90 100644 --- a/netsync/consensusmgr/block_fetcher.go +++ b/netsync/consensusmgr/block_fetcher.go @@ -4,7 +4,6 @@ import ( "github.com/sirupsen/logrus" "gopkg.in/karalabe/cookiejar.v2/collections/prque" - "github.com/vapor/netsync/peers" "github.com/vapor/p2p/security" "github.com/vapor/protocol/bc" ) @@ -19,7 +18,7 @@ const ( // and scheduling them for retrieval. type blockFetcher struct { chain Chain - peers *peers.PeerSet + peers Peers newBlockCh chan *blockMsg queue *prque.Prque @@ -27,7 +26,7 @@ type blockFetcher struct { } //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose. -func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher { +func newBlockFetcher(chain Chain, peers Peers) *blockFetcher { f := &blockFetcher{ chain: chain, peers: peers, diff --git a/netsync/consensusmgr/consensus_msg.go b/netsync/consensusmgr/consensus_msg.go index 33fee3d1..f47cf81a 100644 --- a/netsync/consensusmgr/consensus_msg.go +++ b/netsync/consensusmgr/consensus_msg.go @@ -69,7 +69,7 @@ func (bs *BlockSignatureMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers [] // BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages. func (bs *BlockSignatureMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string { - return ps.PeersWithoutSign(bs.Signature) + return ps.PeersWithoutSignature(bs.Signature) } // BlockProposeMsg block propose message transferred between nodes. diff --git a/netsync/consensusmgr/handle.go b/netsync/consensusmgr/handle.go index 65fb9f29..37f7ee82 100644 --- a/netsync/consensusmgr/handle.go +++ b/netsync/consensusmgr/handle.go @@ -26,6 +26,17 @@ type Chain interface { ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error } +type Peers interface { + AddPeer(peer peers.BasePeer) + BroadcastMsg(bm peers.BroadcastMsg) error + GetPeer(id string) *peers.Peer + MarkBlock(peerID string, hash *bc.Hash) + MarkBlockSignature(peerID string, signature []byte) + ProcessIllegal(peerID string, level byte, reason string) + RemovePeer(peerID string) + SetStatus(peerID string, height uint64, hash *bc.Hash) +} + type blockMsg struct { block *types.Block peerID string @@ -35,7 +46,7 @@ type blockMsg struct { type Manager struct { sw Switch chain Chain - peers *peers.PeerSet + peers Peers blockFetcher *blockFetcher eventDispatcher *event.Dispatcher @@ -43,7 +54,7 @@ type Manager struct { } // NewManager create new manager. -func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager { +func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager { manager := &Manager{ sw: sw, chain: chain, diff --git a/netsync/consensusmgr/handle_test.go b/netsync/consensusmgr/handle_test.go new file mode 100644 index 00000000..d5e15692 --- /dev/null +++ b/netsync/consensusmgr/handle_test.go @@ -0,0 +1,231 @@ +package consensusmgr + +import ( + "math/rand" + "net" + "reflect" + "testing" + "time" + + "github.com/tendermint/tmlibs/flowrate" + + "github.com/vapor/consensus" + "github.com/vapor/event" + "github.com/vapor/netsync/peers" + "github.com/vapor/p2p" + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" +) + +type p2peer struct { +} + +func (p *p2peer) Addr() net.Addr { + return nil +} + +func (p *p2peer) ID() string { + return "" +} + +func (p *p2peer) RemoteAddrHost() string { + return "" +} +func (p *p2peer) ServiceFlag() consensus.ServiceFlag { + return 0 +} +func (p *p2peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) { + return nil, nil +} +func (p *p2peer) TrySend(byte, interface{}) bool { + return true +} +func (p *p2peer) IsLAN() bool { + return false +} + +func mockBlocks(startBlock *types.Block, height uint64) []*types.Block { + blocks := []*types.Block{} + indexBlock := &types.Block{} + if startBlock == nil { + indexBlock = &types.Block{BlockHeader: types.BlockHeader{Version: uint64(rand.Uint32())}} + blocks = append(blocks, indexBlock) + } else { + indexBlock = startBlock + } + + for indexBlock.Height < height { + block := &types.Block{ + BlockHeader: types.BlockHeader{ + Height: indexBlock.Height + 1, + PreviousBlockHash: indexBlock.Hash(), + Version: uint64(rand.Uint32()), + }, + } + blocks = append(blocks, block) + indexBlock = block + } + return blocks +} + +type mockSW struct { +} + +func (s *mockSW) AddReactor(name string, reactor p2p.Reactor) p2p.Reactor { + return nil +} + +type mockChain struct { +} + +func (c *mockChain) BestBlockHeight() uint64 { + return 0 +} + +func (c *mockChain) GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error) { + return nil, nil +} + +func (c *mockChain) ProcessBlock(*types.Block) (bool, error) { + return false, nil +} + +func (c *mockChain) ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error { + return nil +} + +type mockPeers struct { + msgCount *int + knownBlock *bc.Hash + blockHeight *uint64 + knownSignature *[]byte +} + +func newMockPeers(msgCount *int, knownBlock *bc.Hash, blockHeight *uint64, signature *[]byte) *mockPeers { + return &mockPeers{ + msgCount: msgCount, + knownBlock: knownBlock, + blockHeight: blockHeight, + knownSignature: signature, + } +} + +func (ps *mockPeers) AddPeer(peer peers.BasePeer) { + +} + +func (ps *mockPeers) BroadcastMsg(bm peers.BroadcastMsg) error { + *ps.msgCount++ + return nil +} +func (ps *mockPeers) GetPeer(id string) *peers.Peer { + return &peers.Peer{BasePeer: &p2peer{}} +} +func (ps *mockPeers) MarkBlock(peerID string, hash *bc.Hash) { + *ps.knownBlock = *hash +} + +func (ps *mockPeers) MarkBlockSignature(peerID string, signature []byte) { + *ps.knownSignature = append(*ps.knownSignature, signature...) +} + +func (ps *mockPeers) ProcessIllegal(peerID string, level byte, reason string) { + +} +func (p *mockPeers) RemovePeer(peerID string) { + +} +func (ps *mockPeers) SetStatus(peerID string, height uint64, hash *bc.Hash) { + *ps.blockHeight = height +} + +func TestBlockProposeMsgBroadcastLoop(t *testing.T) { + dispatcher := event.NewDispatcher() + msgCount := 0 + blockHeight := 100 + mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, nil), dispatcher) + blocks := mockBlocks(nil, uint64(blockHeight)) + + mgr.Start() + defer mgr.Stop() + time.Sleep(10 * time.Millisecond) + for _, block := range blocks { + mgr.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}) + } + time.Sleep(10 * time.Millisecond) + if msgCount != blockHeight+1 { + t.Fatalf("broad propose block msg err. got:%d\n want:%d", msgCount, blockHeight+1) + } +} + +func TestBlockSignatureMsgBroadcastLoop(t *testing.T) { + dispatcher := event.NewDispatcher() + msgCount := 0 + blockHeight := 100 + mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, nil), dispatcher) + blocks := mockBlocks(nil, uint64(blockHeight)) + + mgr.Start() + defer mgr.Stop() + time.Sleep(10 * time.Millisecond) + for _, block := range blocks { + mgr.eventDispatcher.Post(event.BlockSignatureEvent{BlockHash: block.Hash(), Signature: []byte{0x1, 0x2}, XPub: []byte{0x011, 0x022}}) + } + time.Sleep(10 * time.Millisecond) + if msgCount != blockHeight+1 { + t.Fatalf("broad propose block msg err. got:%d\n want:%d", msgCount, blockHeight+1) + } +} + +func TestProcessBlockProposeMsg(t *testing.T) { + dispatcher := event.NewDispatcher() + msgCount := 0 + var knownBlock bc.Hash + blockHeight := uint64(0) + peerID := "Peer1" + mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, &knownBlock, &blockHeight, nil), dispatcher) + block := &types.Block{ + BlockHeader: types.BlockHeader{ + Height: 100, + PreviousBlockHash: bc.NewHash([32]byte{0x1}), + Version: uint64(rand.Uint32()), + }, + } + msg, err := NewBlockProposeMsg(block) + if err != nil { + t.Fatal("create new block propose msg err", err) + } + + mgr.processMsg(peerID, 0, msg) + if knownBlock != block.Hash() { + t.Fatalf("mark propose block msg err. got:%d\n want:%d", knownBlock, block.Hash()) + } + + if blockHeight != block.Height { + t.Fatalf("set peer status err. got:%d\n want:%d", blockHeight, block.Height) + } +} + +func TestProcessBlockSignatureMsg(t *testing.T) { + dispatcher := event.NewDispatcher() + msgCount := 0 + knownSignature := []byte{} + peerID := "Peer1" + mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, &knownSignature), dispatcher) + block := &types.Block{ + BlockHeader: types.BlockHeader{ + Height: 100, + PreviousBlockHash: bc.NewHash([32]byte{0x1}), + Version: uint64(rand.Uint32()), + }, + } + + signature := []byte{0x01, 0x02} + msg := NewBlockSignatureMsg(block.Hash(), signature, []byte{0x03, 0x04}) + + mgr.processMsg(peerID, 0, msg) + + if !reflect.DeepEqual(knownSignature, signature) { + t.Fatalf("set peer status err. got:%d\n want:%d", knownSignature, signature) + } +} diff --git a/netsync/peers/peer.go b/netsync/peers/peer.go index ef90812f..e0c52f53 100644 --- a/netsync/peers/peer.go +++ b/netsync/peers/peer.go @@ -252,32 +252,6 @@ func (p *Peer) markTransaction(hash *bc.Hash) { p.knownTxs.Add(hash.String()) } -func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string { - ps.mtx.RLock() - defer ps.mtx.RUnlock() - - var peers []string - for _, peer := range ps.peers { - if !peer.knownBlocks.Has(hash.String()) { - peers = append(peers, peer.ID()) - } - } - return peers -} - -func (ps *PeerSet) PeersWithoutSign(signature []byte) []string { - ps.mtx.RLock() - defer ps.mtx.RUnlock() - - var peers []string - for _, peer := range ps.peers { - if !peer.knownSignatures.Has(hex.EncodeToString(signature)) { - peers = append(peers, peer.ID()) - } - } - return peers -} - func (p *Peer) SendBlock(block *types.Block) (bool, error) { msg, err := msgs.NewBlockMessage(block) if err != nil { @@ -544,14 +518,6 @@ func (ps *PeerSet) BroadcastTx(tx *types.Tx) error { return nil } -func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) { - if errors.Root(err) == ErrPeerMisbehave { - ps.ProcessIllegal(peerID, level, err.Error()) - } else { - ps.RemovePeer(peerID) - } -} - // Peer retrieves the registered peer with the given id. func (ps *PeerSet) GetPeer(id string) *Peer { ps.mtx.RLock() @@ -618,14 +584,27 @@ func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) { peer.markTransaction(&txHash) } -func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer { +func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string { ps.mtx.RLock() defer ps.mtx.RUnlock() - peers := []*Peer{} + var peers []string for _, peer := range ps.peers { if !peer.knownBlocks.Has(hash.String()) { - peers = append(peers, peer) + peers = append(peers, peer.ID()) + } + } + return peers +} + +func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + var peers []string + for _, peer := range ps.peers { + if !peer.knownSignatures.Has(hex.EncodeToString(signature)) { + peers = append(peers, peer.ID()) } } return peers @@ -681,10 +660,3 @@ func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc. peer.SetIrreversibleStatus(height, hash) } - -func (ps *PeerSet) Size() int { - ps.mtx.RLock() - defer ps.mtx.RUnlock() - - return len(ps.peers) -} diff --git a/netsync/peers/peer_test.go b/netsync/peers/peer_test.go new file mode 100644 index 00000000..27a1625e --- /dev/null +++ b/netsync/peers/peer_test.go @@ -0,0 +1,417 @@ +package peers + +import ( + "net" + "reflect" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/tendermint/tmlibs/flowrate" + "github.com/vapor/consensus" + "github.com/vapor/p2p/security" + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" +) + +var ( + peer1ID = "PEER1" + peer2ID = "PEER2" + peer3ID = "PEER3" + peer4ID = "PEER4" + + block1000Hash = bc.NewHash([32]byte{0x01, 0x02}) + block2000Hash = bc.NewHash([32]byte{0x02, 0x03}) + block3000Hash = bc.NewHash([32]byte{0x03, 0x04}) +) + +type basePeer struct { + id string + serviceFlag consensus.ServiceFlag + isLan bool +} + +func (bp *basePeer) Addr() net.Addr { + return nil +} + +func (bp *basePeer) ID() string { + return bp.id +} + +func (bp *basePeer) RemoteAddrHost() string { + switch bp.ID() { + case peer1ID: + return peer1ID + case peer2ID: + return peer2ID + case peer3ID: + return peer3ID + case peer4ID: + return peer4ID + } + return "" +} + +func (bp *basePeer) ServiceFlag() consensus.ServiceFlag { + return bp.serviceFlag +} + +func (bp *basePeer) TrafficStatus() (*flowrate.Status, *flowrate.Status) { + return nil, nil +} + +func (bp *basePeer) TrySend(byte, interface{}) bool { + return true +} + +func (bp *basePeer) IsLAN() bool { + return bp.isLan +} + +func TestSetPeerStatus(t *testing.T) { + peer := newPeer(&basePeer{}) + height := uint64(100) + hash := bc.NewHash([32]byte{0x1, 0x2}) + peer.SetBestStatus(height, &hash) + if peer.Height() != height { + t.Fatalf("test set best status err. got %d want %d", peer.Height(), height) + } +} + +func TestSetIrreversibleStatus(t *testing.T) { + peer := newPeer(&basePeer{}) + height := uint64(100) + hash := bc.NewHash([32]byte{0x1, 0x2}) + peer.SetIrreversibleStatus(height, &hash) + if peer.IrreversibleHeight() != height { + t.Fatalf("test set Irreversible status err. got %d want %d", peer.Height(), height) + } +} + +func TestAddFilterAddresses(t *testing.T) { + peer := newPeer(&basePeer{}) + tx := types.NewTx(types.TxData{ + Inputs: []*types.TxInput{ + types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram")), + }, + Outputs: []*types.TxOutput{ + types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram")), + }, + }) + + peer.AddFilterAddresses([][]byte{[]byte("spendProgram")}) + if !peer.isRelatedTx(tx) { + t.Fatal("test filter addresses error.") + } + + peer.AddFilterAddresses([][]byte{[]byte("testProgram")}) + if peer.isRelatedTx(tx) { + t.Fatal("test filter addresses error.") + } +} + +func TestFilterClear(t *testing.T) { + peer := newPeer(&basePeer{}) + tx := types.NewTx(types.TxData{ + Inputs: []*types.TxInput{ + types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram")), + }, + Outputs: []*types.TxOutput{ + types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram")), + }, + }) + + peer.AddFilterAddresses([][]byte{[]byte("spendProgram")}) + if !peer.isRelatedTx(tx) { + t.Fatal("test filter addresses error.") + } + + peer.FilterClear() + if peer.isRelatedTx(tx) { + t.Fatal("test filter addresses error.") + } +} + +func TestGetRelatedTxAndStatus(t *testing.T) { + peer := newPeer(&basePeer{}) + txs := []*types.Tx{ + types.NewTx(types.TxData{ + Inputs: []*types.TxInput{ + types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram1")), + }, + Outputs: []*types.TxOutput{ + types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram1")), + }, + }), + types.NewTx(types.TxData{ + Inputs: []*types.TxInput{ + types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram2")), + }, + Outputs: []*types.TxOutput{ + types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram2")), + }, + }), + types.NewTx(types.TxData{ + Inputs: []*types.TxInput{ + types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram3")), + }, + Outputs: []*types.TxOutput{ + types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram3")), + }, + }), + } + txStatuses := &bc.TransactionStatus{ + VerifyStatus: []*bc.TxVerifyResult{{StatusFail: true}, {StatusFail: false}, {StatusFail: false}}, + } + peer.AddFilterAddresses([][]byte{[]byte("spendProgram1"), []byte("outProgram3")}) + gotTxs, gotStatus := peer.getRelatedTxAndStatus(txs, txStatuses) + if len(gotTxs) != 2 { + t.Error("TestGetRelatedTxAndStatus txs size error") + } + + if !reflect.DeepEqual(*gotTxs[0].Tx, *txs[0].Tx) { + t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(gotTxs[0].Tx), spew.Sdump(txs[0].Tx)) + } + + if !reflect.DeepEqual(*gotTxs[1].Tx, *txs[2].Tx) { + t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(gotTxs[1].Tx), spew.Sdump(txs[2].Tx)) + } + + if gotStatus[0].StatusFail != true || gotStatus[1].StatusFail != false { + t.Error("TestGetRelatedTxAndStatus txs status error") + } +} + +type basePeerSet struct { +} + +func (bp *basePeerSet) StopPeerGracefully(string) { + +} + +func (bp *basePeerSet) IsBanned(ip string, level byte, reason string) bool { + switch ip { + case peer1ID: + return true + case peer2ID: + return false + case peer3ID: + return true + case peer4ID: + return false + } + return false +} + +func TestMarkBlock(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID}) + ps.AddPeer(&basePeer{id: peer2ID}) + ps.AddPeer(&basePeer{id: peer3ID}) + + blockHash := bc.NewHash([32]byte{0x01, 0x02}) + ps.MarkBlock(peer1ID, &blockHash) + targetPeers := []string{peer2ID, peer3ID} + + peers := ps.PeersWithoutBlock(blockHash) + if len(peers) != len(targetPeers) { + t.Fatalf("test mark block err. Number of target peers %d got %d", 1, len(peers)) + } + + for _, targetPeer := range targetPeers { + flag := false + for _, gotPeer := range peers { + if gotPeer == targetPeer { + flag = true + break + } + } + if !flag { + t.Errorf("test mark block err. can't found target peer %s ", targetPeer) + } + } +} + +func TestMarkStatus(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID}) + ps.AddPeer(&basePeer{id: peer2ID}) + ps.AddPeer(&basePeer{id: peer3ID}) + + height := uint64(1024) + ps.MarkStatus(peer1ID, height) + targetPeers := []string{peer2ID, peer3ID} + + peers := ps.peersWithoutNewStatus(height) + if len(peers) != len(targetPeers) { + t.Fatalf("test mark status err. Number of target peers %d got %d", 1, len(peers)) + } + + for _, targetPeer := range targetPeers { + flag := false + for _, gotPeer := range peers { + if gotPeer.ID() == targetPeer { + flag = true + break + } + } + if !flag { + t.Errorf("test mark status err. can't found target peer %s ", targetPeer) + } + } +} + +func TestMarkBlockSignature(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID}) + ps.AddPeer(&basePeer{id: peer2ID}) + ps.AddPeer(&basePeer{id: peer3ID}) + + signature := []byte{0x01, 0x02} + ps.MarkBlockSignature(peer1ID, signature) + targetPeers := []string{peer2ID, peer3ID} + + peers := ps.PeersWithoutSignature(signature) + if len(peers) != len(targetPeers) { + t.Fatalf("test mark block signature err. Number of target peers %d got %d", 1, len(peers)) + } + + for _, targetPeer := range targetPeers { + flag := false + for _, gotPeer := range peers { + if gotPeer == targetPeer { + flag = true + break + } + } + if !flag { + t.Errorf("test mark block signature err. can't found target peer %s ", targetPeer) + } + } +} + +func TestMarkTx(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID}) + ps.AddPeer(&basePeer{id: peer2ID}) + ps.AddPeer(&basePeer{id: peer3ID}) + + txHash := bc.NewHash([32]byte{0x01, 0x02}) + ps.MarkTx(peer1ID, txHash) + peers := ps.peersWithoutTx(&txHash) + targetPeers := []string{peer2ID, peer3ID} + if len(peers) != len(targetPeers) { + t.Fatalf("test mark tx err. Number of target peers %d got %d", 1, len(peers)) + } + + for _, targetPeer := range targetPeers { + flag := false + for _, gotPeer := range peers { + if gotPeer.ID() == targetPeer { + flag = true + break + } + } + if !flag { + t.Errorf("test mark tx err. can't found target peer %s ", targetPeer) + } + } +} + +func TestSetStatus(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode}) + ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode}) + ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync}) + ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFullNode, isLan: true}) + ps.SetStatus(peer1ID, 1000, &block1000Hash) + ps.SetStatus(peer2ID, 2000, &block2000Hash) + ps.SetStatus(peer3ID, 3000, &block3000Hash) + ps.SetStatus(peer4ID, 2000, &block2000Hash) + targetPeer := peer4ID + + peer := ps.BestPeer(consensus.SFFullNode) + + if peer.ID() != targetPeer { + t.Fatalf("test set status err. Name of target peer %s got %s", peer4ID, peer.ID()) + } +} + +func TestIrreversibleStatus(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode}) + ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode}) + ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync}) + ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFastSync, isLan: true}) + ps.SetIrreversibleStatus(peer1ID, 1000, &block1000Hash) + ps.SetIrreversibleStatus(peer2ID, 2000, &block2000Hash) + ps.SetIrreversibleStatus(peer3ID, 3000, &block3000Hash) + ps.SetIrreversibleStatus(peer4ID, 3000, &block3000Hash) + targetPeer := peer4ID + peer := ps.BestIrreversiblePeer(consensus.SFFastSync) + + if peer.ID() != targetPeer { + t.Fatalf("test set status err. Name of target peer %s got %s", peer4ID, peer.ID()) + } +} + +func TestGetPeersByHeight(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode}) + ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode}) + ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync}) + ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFullNode, isLan: true}) + ps.SetStatus(peer1ID, 1000, &block1000Hash) + ps.SetStatus(peer2ID, 2000, &block2000Hash) + ps.SetStatus(peer3ID, 3000, &block3000Hash) + ps.SetStatus(peer4ID, 2000, &block2000Hash) + peers := ps.GetPeersByHeight(2000) + targetPeers := []string{peer2ID, peer3ID, peer4ID} + if len(peers) != len(targetPeers) { + t.Fatalf("test get peers by height err. Number of target peers %d got %d", 3, len(peers)) + } + + for _, targetPeer := range targetPeers { + flag := false + for _, gotPeer := range peers { + if gotPeer.ID() == targetPeer { + flag = true + break + } + } + if !flag { + t.Errorf("test get peers by height err. can't found target peer %s ", targetPeer) + } + } +} + +func TestRemovePeer(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID}) + ps.AddPeer(&basePeer{id: peer2ID}) + + ps.RemovePeer(peer1ID) + if peer := ps.GetPeer(peer1ID); peer != nil { + t.Fatalf("remove peer %s err", peer1ID) + } + + if peer := ps.GetPeer(peer2ID); peer == nil { + t.Fatalf("Error remove peer %s err", peer2ID) + } +} + +func TestProcessIllegal(t *testing.T) { + ps := NewPeerSet(&basePeerSet{}) + ps.AddPeer(&basePeer{id: peer1ID}) + ps.AddPeer(&basePeer{id: peer2ID}) + + ps.ProcessIllegal(peer1ID, security.LevelMsgIllegal, "test") + if peer := ps.GetPeer(peer1ID); peer != nil { + t.Fatalf("remove peer %s err", peer1ID) + } + + ps.ProcessIllegal(peer2ID, security.LevelMsgIllegal, "test") + if peer := ps.GetPeer(peer2ID); peer == nil { + t.Fatalf("Error remove peer %s err", peer2ID) + } +} diff --git a/netsync/sync_manager.go b/netsync/sync_manager.go index 95ffbb48..88cf9d41 100644 --- a/netsync/sync_manager.go +++ b/netsync/sync_manager.go @@ -67,7 +67,7 @@ func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protoc if err != nil { return nil, err } - consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers) + consensusMgr := consensusmgr.NewManager(sw, chain, peers, dispatcher) return &SyncManager{ config: config, sw: sw,