OSDN Git Service

improve net sync test cases (#375)
[bytom/vapor.git] / netsync / chainmgr / tx_keeper_test.go
1 package chainmgr
2
3 import (
4         "io/ioutil"
5         "os"
6         "reflect"
7         "testing"
8         "time"
9
10         "github.com/davecgh/go-spew/spew"
11
12         "github.com/vapor/consensus"
13         dbm "github.com/vapor/database/leveldb"
14         "github.com/vapor/protocol"
15         core "github.com/vapor/protocol"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18         "github.com/vapor/test/mock"
19 )
20
21 const txsNumber = 2000
22
23 type mempool struct {
24 }
25
26 func (m *mempool) GetTransactions() []*core.TxDesc {
27         txs := []*core.TxDesc{}
28         for i := 0; i < txsNumber; i++ {
29                 txInput := types.NewSpendInput(nil, bc.NewHash([32]byte{0x01}), *consensus.BTMAssetID, uint64(i), 1, []byte{0x51})
30                 txInput.CommitmentSuffix = []byte{0, 1, 2}
31                 txInput.WitnessSuffix = []byte{0, 1, 2}
32
33                 tx := &types.Tx{
34
35                         TxData: types.TxData{
36                                 //SerializedSize: uint64(i * 10),
37                                 Inputs: []*types.TxInput{
38                                         txInput,
39                                 },
40                                 Outputs: []*types.TxOutput{
41                                         types.NewIntraChainOutput(*consensus.BTMAssetID, uint64(i), []byte{0x6a}),
42                                 },
43                                 SerializedSize: 1000,
44                         },
45                         Tx: &bc.Tx{
46                                 ID: bc.Hash{V0: uint64(i), V1: uint64(i), V2: uint64(i), V3: uint64(i)},
47                         },
48                 }
49                 txs = append(txs, &core.TxDesc{Tx: tx})
50         }
51         return txs
52 }
53
54 func TestSyncMempool(t *testing.T) {
55         tmpDir, err := ioutil.TempDir(".", "")
56         if err != nil {
57                 t.Fatalf("failed to create temporary data folder: %v", err)
58         }
59         defer os.RemoveAll(tmpDir)
60         testDBA := dbm.NewDB("testdba", "leveldb", tmpDir)
61         testDBB := dbm.NewDB("testdbb", "leveldb", tmpDir)
62
63         blocks := mockBlocks(nil, 5)
64         a := mockSync(blocks, &mock.Mempool{}, testDBA)
65         b := mockSync(blocks, &mock.Mempool{}, testDBB)
66         a.mempool = &mempool{}
67         netWork := NewNetWork()
68         netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
69         netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
70         if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
71                 t.Errorf("fail on peer hands shake %v", err)
72         } else {
73                 go B2A.postMan()
74                 go A2B.postMan()
75         }
76
77         go a.syncMempoolLoop()
78         a.syncMempool("test node B")
79         wantTxs := a.mempool.GetTransactions()
80
81         timeout := time.NewTimer(2 * time.Second)
82         defer timeout.Stop()
83         ticker := time.NewTicker(500 * time.Millisecond)
84         defer ticker.Stop()
85
86         gotTxs := []*protocol.TxDesc{}
87         for {
88                 select {
89                 case <-ticker.C:
90                         gotTxs = b.mempool.GetTransactions()
91                         if len(gotTxs) >= txsNumber {
92                                 goto out
93                         }
94                 case <-timeout.C:
95                         t.Fatalf("mempool sync timeout")
96                 }
97         }
98
99 out:
100         if len(gotTxs) != txsNumber {
101                 t.Fatalf("mempool sync txs num err. got:%d want:%d", len(gotTxs), txsNumber)
102         }
103
104         for i, gotTx := range gotTxs {
105                 index := gotTx.Tx.Inputs[0].Amount()
106                 if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) {
107                         t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs))
108                 }
109
110                 if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) {
111                         t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs))
112                 }
113         }
114 }
115
116 func TestBroadcastTxsLoop(t *testing.T) {
117         tmpDir, err := ioutil.TempDir(".", "")
118         if err != nil {
119                 t.Fatalf("failed to create temporary data folder: %v", err)
120         }
121         defer os.RemoveAll(tmpDir)
122         testDBA := dbm.NewDB("testdba", "leveldb", tmpDir)
123         testDBB := dbm.NewDB("testdbb", "leveldb", tmpDir)
124
125         blocks := mockBlocks(nil, 5)
126         a := mockSync(blocks, &mock.Mempool{}, testDBA)
127         b := mockSync(blocks, &mock.Mempool{}, testDBB)
128         a.mempool = &mempool{}
129         netWork := NewNetWork()
130         netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
131         netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
132         if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
133                 t.Errorf("fail on peer hands shake %v", err)
134         } else {
135                 go B2A.postMan()
136                 go A2B.postMan()
137         }
138         a.txMsgSub, err = a.eventDispatcher.Subscribe(core.TxMsgEvent{})
139         if err != nil {
140                 t.Fatal("txMsgSub subscribe err", err)
141         }
142         go a.broadcastTxsLoop()
143         wantTxs := a.mempool.GetTransactions()
144         txsNum := 50
145         for i, txD := range wantTxs {
146                 if i >= txsNum {
147                         break
148                 }
149                 a.eventDispatcher.Post(core.TxMsgEvent{TxMsg: &core.TxPoolMsg{TxDesc: txD, MsgType: core.MsgNewTx}})
150         }
151         timeout := time.NewTimer(2 * time.Second)
152         defer timeout.Stop()
153         ticker := time.NewTicker(500 * time.Millisecond)
154         defer ticker.Stop()
155
156         gotTxs := []*protocol.TxDesc{}
157         for {
158                 select {
159                 case <-ticker.C:
160                         gotTxs = b.mempool.GetTransactions()
161                         if len(gotTxs) >= txsNum {
162                                 goto out
163                         }
164                 case <-timeout.C:
165                         t.Fatalf("mempool sync timeout")
166                 }
167         }
168
169 out:
170         if len(gotTxs) != txsNum {
171                 t.Fatalf("mempool sync txs num err. got:%d want:%d", len(gotTxs), txsNumber)
172         }
173
174         for i, gotTx := range gotTxs {
175                 index := gotTx.Tx.Inputs[0].Amount()
176                 if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) {
177                         t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs))
178                 }
179
180                 if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) {
181                         t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs))
182                 }
183         }
184 }