10 "github.com/vapor/consensus"
11 dbm "github.com/vapor/database/leveldb"
12 "github.com/vapor/errors"
13 msgs "github.com/vapor/netsync/messages"
14 "github.com/vapor/netsync/peers"
15 "github.com/vapor/protocol"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
18 "github.com/vapor/test/mock"
19 "github.com/vapor/testutil"
// TestCheckSyncType exercises blockKeeper.checkSyncType: with the local mock
// chain at height 50, peers advertise various best/irreversible heights and
// the test asserts whether fast-sync or regular-sync is chosen.
22 func TestCheckSyncType(t *testing.T) {
// Temporary on-disk leveldb used as the fast-sync backing store.
23 tmp, err := ioutil.TempDir(".", "")
25 t.Fatalf("failed to create temporary data folder: %v", err)
27 fastSyncDB := dbm.NewDB("testdb", "leveldb", tmp)
// Local chain: 50 mock blocks, best header pinned to the last one.
33 blocks := mockBlocks(nil, 50)
34 chain := mock.NewChain(nil)
35 chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
36 for _, block := range blocks {
37 chain.SetBlockByHeight(block.Height, block)
// syncPeer describes one peer's advertised state for a test case.
40 type syncPeer struct {
43 irreversibleHeight uint64
// Cases pair a peer set with the expected sync type. From the visible
// expectations: a peer with SFFastSync and an irreversible height well above
// the local 50 selects fast sync; otherwise regular sync is expected
// (including the SFFullNode-only peer at line 82 despite its height 1000).
56 {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 500},
57 {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 50, irreversibleHeight: 50},
59 syncType: fastSyncType,
63 {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 100},
64 {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 500, irreversibleHeight: 50},
66 syncType: regularSyncType,
70 {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 51, irreversibleHeight: 50},
72 syncType: regularSyncType,
76 {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 30, irreversibleHeight: 30},
82 {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode}, bestHeight: 1000, irreversibleHeight: 1000},
84 syncType: regularSyncType,
88 {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 50},
89 {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
91 syncType: fastSyncType,
// Each case gets a fresh peer set and blockKeeper; every peer registers its
// best and irreversible heights before checkSyncType is consulted.
95 for i, c := range cases {
96 peers := peers.NewPeerSet(NewPeerSet())
97 blockKeeper := newBlockKeeper(chain, peers, fastSyncDB)
98 for _, syncPeer := range c.peers {
99 blockKeeper.peers.AddPeer(syncPeer.peer)
100 blockKeeper.peers.SetStatus(syncPeer.peer.id, syncPeer.bestHeight, nil)
101 blockKeeper.peers.SetIrreversibleStatus(syncPeer.peer.id, syncPeer.irreversibleHeight, nil)
103 gotType := blockKeeper.checkSyncType()
104 if c.syncType != gotType {
105 t.Errorf("case %d: got %d want %d", i, gotType, c.syncType)
// TestRegularBlockSync drives blockKeeper.regularBlockSync between two mocked
// nodes (A syncs from B) across straight extension, competing forks, a request
// timeout, and a chain containing a deliberately bad block, then compares A's
// resulting chain against the expected block list.
110 func TestRegularBlockSync(t *testing.T) {
// A 50-block base chain plus several forks built on top of block 50; chainE
// embeds a broken block via mockErrorBlocks (expected to fail validation).
111 baseChain := mockBlocks(nil, 50)
112 chainX := append(baseChain, mockBlocks(baseChain[50], 60)...)
113 chainY := append(baseChain, mockBlocks(baseChain[50], 70)...)
114 chainZ := append(baseChain, mockBlocks(baseChain[50], 200)...)
115 chainE := append(baseChain, mockErrorBlocks(baseChain[50], 200, 60)...)
116 // NOTE(review): chainZ/chainE are referenced by case lines not visible in
// this sampled view — confirm against the full file.
118 syncTimeout time.Duration
119 aBlocks []*types.Block
120 bBlocks []*types.Block
// Simple catch-up: A at height 19 syncs to B's height 49.
125 syncTimeout: 30 * time.Second,
126 aBlocks: baseChain[:20],
127 bBlocks: baseChain[:50],
128 want: baseChain[:50],
132 syncTimeout: 30 * time.Second,
// Fork case: A on chainX, B on a longer chainY.
139 syncTimeout: 30 * time.Second,
140 aBlocks: chainX[:52],
141 bBlocks: chainY[:53],
146 syncTimeout: 30 * time.Second,
147 aBlocks: chainX[:52],
// A zero timeout forces the block request to fail with errRequestTimeout.
153 syncTimeout: 0 * time.Second,
154 aBlocks: chainX[:52],
157 err: errRequestTimeout,
// Syncing a chain with a bad block should surface ErrBadStateRoot.
160 syncTimeout: 30 * time.Second,
161 aBlocks: chainX[:52],
164 err: protocol.ErrBadStateRoot,
167 tmp, err := ioutil.TempDir(".", "")
169 t.Fatalf("failed to create temporary data folder: %v", err)
171 testDBA := dbm.NewDB("testdba", "leveldb", tmp)
172 testDBB := dbm.NewDB("testdbb", "leveldb", tmp)
// For each case: build both nodes, register them on the mock network,
// hand-shake, then let A regular-sync from B and diff the resulting chain.
179 for i, c := range cases {
180 a := mockSync(c.aBlocks, nil, testDBA)
181 b := mockSync(c.bBlocks, nil, testDBB)
182 netWork := NewNetWork()
183 netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
184 netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
185 if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
186 t.Errorf("fail on peer hands shake %v", err)
// requireBlockTimeout is a package-level knob; each case overrides it.
192 requireBlockTimeout = c.syncTimeout
193 a.blockKeeper.syncPeer = a.peers.GetPeer("test node B")
194 if err := a.blockKeeper.regularBlockSync(); errors.Root(err) != c.err {
195 t.Errorf("case %d: got %v want %v", i, err, c.err)
// Collect A's chain from genesis to its best height and compare with want.
198 got := []*types.Block{}
199 for i := uint64(0); i <= a.chain.BestBlockHeight(); i++ {
200 block, err := a.chain.GetBlockByHeight(i)
202 t.Errorf("case %d got err %v", i, err)
204 got = append(got, block)
207 if !testutil.DeepEqual(got, c.want) {
208 t.Errorf("case %d: got %v want %v", i, got, c.want)
// TestRequireBlock checks msgFetcher.requireBlock between two mocked nodes:
// node A holds 1 block, node B holds 5. A generous timeout should return the
// requested block; a 1ms timeout should yield errRequestTimeout.
213 func TestRequireBlock(t *testing.T) {
214 tmp, err := ioutil.TempDir(".", "")
216 t.Fatalf("failed to create temporary data folder: %v", err)
218 testDBA := dbm.NewDB("testdba", "leveldb", tmp)
219 testDBB := dbm.NewDB("testdbb", "leveldb", tmp)
226 blocks := mockBlocks(nil, 5)
227 a := mockSync(blocks[:1], nil, testDBA)
228 b := mockSync(blocks[:5], nil, testDBB)
229 netWork := NewNetWork()
230 netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
231 netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
232 if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
233 t.Errorf("fail on peer hands shake %v", err)
// Each side treats the other as its sync peer so either can request blocks.
239 a.blockKeeper.syncPeer = a.peers.GetPeer("test node B")
240 b.blockKeeper.syncPeer = b.peers.GetPeer("test node A")
242 syncTimeout time.Duration
249 syncTimeout: 30 * time.Second,
// 1ms is short enough to force the request to time out.
256 syncTimeout: 1 * time.Millisecond,
260 err: errRequestTimeout,
// Reset the package-level timeout to a sane default; each case overrides it.
265 requireBlockTimeout = 20 * time.Second
268 for i, c := range cases {
269 requireBlockTimeout = c.syncTimeout
270 got, err := c.testNode.blockKeeper.msgFetcher.requireBlock(c.testNode.blockKeeper.syncPeer.ID(), c.requireHeight)
271 if !testutil.DeepEqual(got, c.want) {
272 t.Errorf("case %d: got %v want %v", i, got, c.want)
274 if errors.Root(err) != c.err {
275 t.Errorf("case %d: got %v want %v", i, err, c.err)
// TestSendMerkleBlock has a full node answer a GetMerkleBlockMessage from an
// SPV peer, then validates the returned transaction and status Merkle proofs
// against the target block's TransactionsMerkleRoot / TransactionStatusHash.
280 func TestSendMerkleBlock(t *testing.T) {
281 tmp, err := ioutil.TempDir(".", "")
283 t.Fatalf("failed to create temporary data folder: %v", err)
286 testDBA := dbm.NewDB("testdba", "leveldb", tmp)
287 testDBB := dbm.NewDB("testdbb", "leveldb", tmp)
// Cases vary the number of txs in the block and which indexes the SPV peer
// has filter addresses for (including none and out-of-order indexes).
300 relatedTxIndex: []int{0, 2, 5},
304 relatedTxIndex: []int{},
308 relatedTxIndex: []int{},
312 relatedTxIndex: []int{0, 1, 2, 3, 4},
316 relatedTxIndex: []int{1, 6, 3, 9, 10, 19},
320 for _, c := range cases {
// Build a 2-block chain and graft c.txCount mock transactions onto block 1,
// recomputing its transaction Merkle root so the proof can be validated.
321 blocks := mockBlocks(nil, 2)
322 targetBlock := blocks[1]
323 txs, bcTxs := mockTxs(c.txCount)
326 targetBlock.Transactions = txs
327 if targetBlock.TransactionsMerkleRoot, err = types.TxMerkleRoot(bcTxs); err != nil {
331 spvNode := mockSync(blocks, nil, testDBA)
332 blockHash := targetBlock.Hash()
// Refresh the status Merkle root from the chain's transaction statuses.
333 var statusResult *bc.TransactionStatus
334 if statusResult, err = spvNode.chain.GetTransactionStatus(&blockHash); err != nil {
338 if targetBlock.TransactionStatusHash, err = types.TxStatusMerkleRoot(statusResult.VerifyStatus); err != nil {
342 fullNode := mockSync(blocks, nil, testDBB)
343 netWork := NewNetWork()
344 netWork.Register(spvNode, "192.168.0.1", "spv_node", consensus.SFFastSync)
345 netWork.Register(fullNode, "192.168.0.2", "full_node", consensus.DefaultServices)
348 if F2S, _, err = netWork.HandsShake(spvNode, fullNode); err != nil {
349 t.Errorf("fail on peer hands shake %v", err)
// Reader side: consume the full node's reply off the F2S channel, decode the
// MerkleBlockMessage, and verify both Merkle proofs; the outcome is reported
// through the completed channel.
352 completed := make(chan error)
354 msgBytes := <-F2S.msgCh
355 _, msg, _ := decodeMessage(msgBytes)
356 switch m := msg.(type) {
357 case *msgs.MerkleBlockMessage:
// Recover the IDs of the related (matched) transactions from raw tx data.
358 var relatedTxIDs []*bc.Hash
359 for _, rawTx := range m.RawTxDatas {
361 if err := tx.UnmarshalText(rawTx); err != nil {
365 relatedTxIDs = append(relatedTxIDs, &tx.ID)
367 var txHashes []*bc.Hash
368 for _, hashByte := range m.TxHashes {
369 hash := bc.NewHash(hashByte)
370 txHashes = append(txHashes, &hash)
// Proof 1: the partial-tree tx proof must reproduce TransactionsMerkleRoot.
372 if ok := types.ValidateTxMerkleTreeProof(txHashes, m.Flags, relatedTxIDs, targetBlock.TransactionsMerkleRoot); !ok {
373 completed <- errors.New("validate tx fail")
376 var statusHashes []*bc.Hash
377 for _, statusByte := range m.StatusHashes {
378 hash := bc.NewHash(statusByte)
379 statusHashes = append(statusHashes, &hash)
381 var relatedStatuses []*bc.TxVerifyResult
382 for _, statusByte := range m.RawTxStatuses {
383 status := &bc.TxVerifyResult{}
384 err := json.Unmarshal(statusByte, status)
388 relatedStatuses = append(relatedStatuses, status)
// Proof 2: the status proof must reproduce TransactionStatusHash.
390 if ok := types.ValidateStatusMerkleTreeProof(statusHashes, m.Flags, relatedStatuses, targetBlock.TransactionStatusHash); !ok {
391 completed <- errors.New("validate status fail")
// Register the SPV peer's filter addresses (output control programs of the
// related txs), then ask the full node for the merkle block and wait for the
// reader goroutine's verdict.
398 spvPeer := fullNode.peers.GetPeer("spv_node")
399 for i := 0; i < len(c.relatedTxIndex); i++ {
400 spvPeer.AddFilterAddress(txs[c.relatedTxIndex[i]].Outputs[0].ControlProgram())
402 msg := &msgs.GetMerkleBlockMessage{RawHash: targetBlock.Hash().Byte32()}
403 fullNode.handleGetMerkleBlockMsg(spvPeer, msg)
404 if err := <-completed; err != nil {
// TestLocateBlocks checks blockKeeper.locateBlocks: starting from a locator
// hash it should return at most maxNumOfBlocksPerMsg (set to 5) blocks, and
// an unknown stop hash should produce mock.ErrFoundHeaderByHash.
410 func TestLocateBlocks(t *testing.T) {
// Shrink the package-level page size so paging is observable with 100 blocks.
411 maxNumOfBlocksPerMsg = 5
412 blocks := mockBlocks(nil, 100)
// Locator at height 20: expect exactly one page, heights 20..24.
420 locator: []uint64{20},
421 stopHash: blocks[100].Hash(),
422 wantHeight: []uint64{20, 21, 22, 23, 24},
// Unknown stop hash: no blocks and ErrFoundHeaderByHash.
426 locator: []uint64{20},
427 stopHash: bc.NewHash([32]byte{0x01, 0x02}),
428 wantHeight: []uint64{},
429 wantErr: mock.ErrFoundHeaderByHash,
// One shared mock chain holding all 101 blocks serves every case.
433 mockChain := mock.NewChain(nil)
434 bk := &blockKeeper{chain: mockChain}
435 for _, block := range blocks {
436 mockChain.SetBlockByHeight(block.Height, block)
439 for i, c := range cases {
// Translate locator heights into block hashes.
440 locator := []*bc.Hash{}
441 for _, i := range c.locator {
442 hash := blocks[i].Hash()
443 locator = append(locator, &hash)
446 want := []*types.Block{}
447 for _, i := range c.wantHeight {
448 want = append(want, blocks[i])
451 got, err := bk.locateBlocks(locator, &c.stopHash)
452 if err != c.wantErr {
453 t.Errorf("case %d: got %v want err = %v", i, err, c.wantErr)
456 if !testutil.DeepEqual(got, want) {
457 t.Errorf("case %d: got %v want %v", i, got, want)
// TestLocateHeaders checks blockKeeper.locateHeaders across locator/stop-hash
// combinations: plain paging, stop hash inside the page, stop == start, stop
// hash beyond the chain tip, an unknown locator falling back to a later one,
// skip-interval sampling, and a locator at genesis.
462 func TestLocateHeaders(t *testing.T) {
// NOTE(review): line 464 presumably sits inside a defer restoring the cap to
// its original 1000 — confirm against the full file; the test runs with 10.
464 maxNumOfHeadersPerMsg = 1000
466 maxNumOfHeadersPerMsg = 10
467 blocks := mockBlocks(nil, 150)
// Precompute every block hash so cases can reference stop hashes by height.
468 blocksHash := []bc.Hash{}
469 for _, block := range blocks {
470 blocksHash = append(blocksHash, block.Hash())
// Paging: from 90 toward 100, capped at 10 headers (90..99).
483 locator: []uint64{90},
484 stopHash: &blocksHash[100],
486 wantHeight: []uint64{90, 91, 92, 93, 94, 95, 96, 97, 98, 99},
// Stop hash within the page: inclusive up to height 24.
491 locator: []uint64{20},
492 stopHash: &blocksHash[24],
494 wantHeight: []uint64{20, 21, 22, 23, 24},
// Stop equals the locator: a single header.
499 locator: []uint64{20},
500 stopHash: &blocksHash[20],
501 wantHeight: []uint64{20},
// Stop hash above the mocked chain height: ErrFoundHeaderByHash.
506 locator: []uint64{20},
507 stopHash: &blocksHash[120],
508 wantHeight: []uint64{},
509 err: mock.ErrFoundHeaderByHash,
// First locator (120) unknown to a shorter chain; falls back to 70.
513 locator: []uint64{120, 70},
514 stopHash: &blocksHash[78],
515 wantHeight: []uint64{70, 71, 72, 73, 74, 75, 76, 77, 78},
// Stop hash below the locator height: empty result.
520 locator: []uint64{15},
521 stopHash: &blocksHash[10],
523 wantHeight: []uint64{},
// Skip-interval sampling between 15 and 80 (stop height always included).
528 locator: []uint64{15},
529 stopHash: &blocksHash[80],
531 wantHeight: []uint64{15, 26, 37, 48, 59, 70, 80},
// From genesis with a skip: sampled every 10, capped at 10 headers.
536 locator: []uint64{0},
537 stopHash: &blocksHash[100],
539 wantHeight: []uint64{0, 10, 20, 30, 40, 50, 60, 70, 80, 90},
// Each case builds a chain only up to c.chainHeight, so stop hashes above it
// exercise the not-found path.
544 for i, c := range cases {
545 mockChain := mock.NewChain(nil)
546 bk := &blockKeeper{chain: mockChain}
547 for i := uint64(0); i <= c.chainHeight; i++ {
548 mockChain.SetBlockByHeight(i, blocks[i])
551 locator := []*bc.Hash{}
552 for _, i := range c.locator {
553 hash := blocks[i].Hash()
554 locator = append(locator, &hash)
557 want := []*types.BlockHeader{}
558 for _, i := range c.wantHeight {
559 want = append(want, &blocks[i].BlockHeader)
562 got, err := bk.locateHeaders(locator, c.stopHash, c.skip, maxNumOfHeadersPerMsg)
564 t.Errorf("case %d: got %v want err = %v", i, err, c.err)
566 if !testutil.DeepEqual(got, want) {
567 t.Errorf("case %d: got %v want %v", i, got, want)