OSDN Git Service

Add mempool sync test (#114)
authoryahtoo <yahtoo.ma@gmail.com>
Mon, 3 Jun 2019 02:45:54 +0000 (10:45 +0800)
committerPaladz <yzhu101@uottawa.ca>
Mon, 3 Jun 2019 02:45:54 +0000 (10:45 +0800)
netsync/chainmgr/block_keeper_test.go
netsync/chainmgr/handle.go
netsync/chainmgr/protocol_reactor.go
netsync/chainmgr/tool_test.go
netsync/chainmgr/tx_keeper.go
netsync/chainmgr/tx_keeper_test.go [new file with mode: 0644]
test/mock/chain.go
test/mock/mempool.go [new file with mode: 0644]

index 2b1cc69..43e6ec7 100644 (file)
@@ -109,7 +109,7 @@ func TestBlockLocator(t *testing.T) {
        }
 
        for i, c := range cases {
-               mockChain := mock.NewChain()
+               mockChain := mock.NewChain(nil)
                bk := &blockKeeper{chain: mockChain}
                mockChain.SetBestBlockHeader(&blocks[c.bestHeight].BlockHeader)
                for i := uint64(0); i <= c.bestHeight; i++ {
@@ -178,8 +178,8 @@ func TestFastBlockSync(t *testing.T) {
 
        for i, c := range cases {
                syncTimeout = c.syncTimeout
-               a := mockSync(c.aBlocks)
-               b := mockSync(c.bBlocks)
+               a := mockSync(c.aBlocks, nil)
+               b := mockSync(c.bBlocks, nil)
                netWork := NewNetWork()
                netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
                netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
@@ -225,7 +225,7 @@ func TestLocateBlocks(t *testing.T) {
                },
        }
 
-       mockChain := mock.NewChain()
+       mockChain := mock.NewChain(nil)
        bk := &blockKeeper{chain: mockChain}
        for _, block := range blocks {
                mockChain.SetBlockByHeight(block.Height, block)
@@ -305,7 +305,7 @@ func TestLocateHeaders(t *testing.T) {
        }
 
        for i, c := range cases {
-               mockChain := mock.NewChain()
+               mockChain := mock.NewChain(nil)
                bk := &blockKeeper{chain: mockChain}
                for i := uint64(0); i <= c.chainHeight; i++ {
                        mockChain.SetBlockByHeight(i, blocks[i])
@@ -379,7 +379,7 @@ func TestNextCheckpoint(t *testing.T) {
                },
        }
 
-       mockChain := mock.NewChain()
+       mockChain := mock.NewChain(nil)
        for i, c := range cases {
                consensus.ActiveNetParams.Checkpoints = c.checkPoints
                mockChain.SetBestBlockHeader(&types.BlockHeader{Height: c.bestHeight})
@@ -439,8 +439,8 @@ func TestRegularBlockSync(t *testing.T) {
 
        for i, c := range cases {
                syncTimeout = c.syncTimeout
-               a := mockSync(c.aBlocks)
-               b := mockSync(c.bBlocks)
+               a := mockSync(c.aBlocks, nil)
+               b := mockSync(c.bBlocks, nil)
                netWork := NewNetWork()
                netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
                netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
@@ -473,8 +473,8 @@ func TestRegularBlockSync(t *testing.T) {
 
 func TestRequireBlock(t *testing.T) {
        blocks := mockBlocks(nil, 5)
-       a := mockSync(blocks[:1])
-       b := mockSync(blocks[:5])
+       a := mockSync(blocks[:1], nil)
+       b := mockSync(blocks[:5], nil)
        netWork := NewNetWork()
        netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
        netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
@@ -560,7 +560,7 @@ func TestSendMerkleBlock(t *testing.T) {
                        t.Fatal(err)
                }
 
-               spvNode := mockSync(blocks)
+               spvNode := mockSync(blocks, nil)
                blockHash := targetBlock.Hash()
                var statusResult *bc.TransactionStatus
                if statusResult, err = spvNode.chain.GetTransactionStatus(&blockHash); err != nil {
@@ -571,7 +571,7 @@ func TestSendMerkleBlock(t *testing.T) {
                        t.Fatal(err)
                }
 
-               fullNode := mockSync(blocks)
+               fullNode := mockSync(blocks, nil)
                netWork := NewNetWork()
                netWork.Register(spvNode, "192.168.0.1", "spv_node", consensus.SFFastSync)
                netWork.Register(fullNode, "192.168.0.2", "full_node", consensus.DefaultServices)
index 9dea086..51dee4c 100644 (file)
@@ -45,16 +45,21 @@ type Switch interface {
        Peers() *p2p.PeerSet
 }
 
+// Mempool is the interface for Bytom mempool
+type Mempool interface {
+       GetTransactions() []*core.TxDesc
+}
+
 //Manager is responsible for the business layer information synchronization
 type Manager struct {
        sw          Switch
        chain       Chain
-       txPool      *core.TxPool
+       mempool     Mempool
        blockKeeper *blockKeeper
        peers       *peers.PeerSet
 
        txSyncCh chan *txSyncMsg
-       quitSync chan struct{}
+       quit     chan struct{}
        config   *cfg.Config
 
        eventDispatcher *event.Dispatcher
@@ -62,15 +67,15 @@ type Manager struct {
 }
 
 //NewChainManager create a chain sync manager.
-func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
        manager := &Manager{
                sw:              sw,
-               txPool:          txPool,
+               mempool:         mempool,
                chain:           chain,
                blockKeeper:     newBlockKeeper(chain, peers),
                peers:           peers,
                txSyncCh:        make(chan *txSyncMsg),
-               quitSync:        make(chan struct{}),
+               quit:            make(chan struct{}),
                config:          config,
                eventDispatcher: dispatcher,
        }
@@ -359,14 +364,13 @@ func (m *Manager) Start() error {
                return err
        }
 
-       // broadcast transactions
-       go m.txBroadcastLoop()
-       go m.txSyncLoop()
+       go m.broadcastTxsLoop()
+       go m.syncMempoolLoop()
 
        return nil
 }
 
 //Stop stop sync manager
 func (m *Manager) Stop() {
-       close(m.quitSync)
+       close(m.quit)
 }
index 85a5c25..7cf0909 100644 (file)
@@ -56,7 +56,7 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
        if err := pr.manager.SendStatus(peer); err != nil {
                return err
        }
-       pr.manager.syncTransactions(peer.Key)
+       pr.manager.syncMempool(peer.Key)
        return nil
 }
 
index dba3c89..e354984 100644 (file)
@@ -150,8 +150,8 @@ func mockBlocks(startBlock *types.Block, height uint64) []*types.Block {
        return blocks
 }
 
-func mockSync(blocks []*types.Block) *Manager {
-       chain := mock.NewChain()
+func mockSync(blocks []*types.Block, mempool *mock.Mempool) *Manager {
+       chain := mock.NewChain(mempool)
        peers := peers.NewPeerSet(NewPeerSet())
        chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
        for _, block := range blocks {
@@ -162,6 +162,8 @@ func mockSync(blocks []*types.Block) *Manager {
                chain:       chain,
                blockKeeper: newBlockKeeper(chain, peers),
                peers:       peers,
+               mempool:     mempool,
+               txSyncCh:    make(chan *txSyncMsg),
        }
 }
 
index 5071403..6c6f5f9 100644 (file)
@@ -21,8 +21,8 @@ type txSyncMsg struct {
        txs    []*types.Tx
 }
 
-func (m *Manager) syncTransactions(peerID string) {
-       pending := m.txPool.GetTransactions()
+func (m *Manager) syncMempool(peerID string) {
+       pending := m.mempool.GetTransactions()
        if len(pending) == 0 {
                return
        }
@@ -34,7 +34,7 @@ func (m *Manager) syncTransactions(peerID string) {
        m.txSyncCh <- &txSyncMsg{peerID, txs}
 }
 
-func (m *Manager) txBroadcastLoop() {
+func (m *Manager) broadcastTxsLoop() {
        for {
                select {
                case obj, ok := <-m.txMsgSub.Chan():
@@ -55,17 +55,17 @@ func (m *Manager) txBroadcastLoop() {
                                        continue
                                }
                        }
-               case <-m.quitSync:
+               case <-m.quit:
                        return
                }
        }
 }
 
-// txSyncLoop takes care of the initial transaction sync for each new
+// syncMempoolLoop takes care of the initial transaction sync for each new
 // connection. When a new peer appears, we relay all currently pending
 // transactions. In order to minimise egress bandwidth usage, we send
 // the transactions in small packs to one peer at a time.
-func (m *Manager) txSyncLoop() {
+func (m *Manager) syncMempoolLoop() {
        pending := make(map[string]*txSyncMsg)
        sending := false            // whether a send is active
        done := make(chan error, 1) // result of the send
@@ -130,7 +130,6 @@ func (m *Manager) txSyncLoop() {
                        if !sending {
                                send(msg)
                        }
-
                case err := <-done:
                        sending = false
                        if err != nil {
@@ -140,6 +139,8 @@ func (m *Manager) txSyncLoop() {
                        if s := pick(); s != nil {
                                send(s)
                        }
+               case <-m.quit:
+                       return
                }
        }
 }
diff --git a/netsync/chainmgr/tx_keeper_test.go b/netsync/chainmgr/tx_keeper_test.go
new file mode 100644 (file)
index 0000000..7401af2
--- /dev/null
@@ -0,0 +1,101 @@
+package chainmgr
+
+import (
+       "reflect"
+       "testing"
+       "time"
+
+       "github.com/davecgh/go-spew/spew"
+
+       "github.com/vapor/consensus"
+       "github.com/vapor/protocol"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+       "github.com/vapor/test/mock"
+)
+
+const txsNumber = 2000
+
+func getTransactions() []*types.Tx {
+       txs := []*types.Tx{}
+       for i := 0; i < txsNumber; i++ {
+               txInput := types.NewSpendInput(nil, bc.NewHash([32]byte{0x01}), *consensus.BTMAssetID, uint64(i), 1, []byte{0x51})
+               txInput.CommitmentSuffix = []byte{0, 1, 2}
+               txInput.WitnessSuffix = []byte{0, 1, 2}
+
+               tx := &types.Tx{
+
+                       TxData: types.TxData{
+                               //SerializedSize: uint64(i * 10),
+                               Inputs: []*types.TxInput{
+                                       txInput,
+                               },
+                               Outputs: []*types.TxOutput{
+                                       types.NewIntraChainOutput(*consensus.BTMAssetID, uint64(i), []byte{0x6a}),
+                               },
+                       },
+                       Tx: &bc.Tx{
+                               ID: bc.Hash{V0: uint64(i), V1: uint64(i), V2: uint64(i), V3: uint64(i)},
+                       },
+               }
+               txs = append(txs, tx)
+       }
+       return txs
+}
+
+func TestSyncMempool(t *testing.T) {
+       blocks := mockBlocks(nil, 5)
+       a := mockSync(blocks, &mock.Mempool{})
+       b := mockSync(blocks, &mock.Mempool{})
+
+       netWork := NewNetWork()
+       netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
+       netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
+       if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
+               t.Errorf("fail on peer hands shake %v", err)
+       } else {
+               go B2A.postMan()
+               go A2B.postMan()
+       }
+
+       go a.syncMempoolLoop()
+       a.syncMempool("test node B")
+       wantTxs := getTransactions()
+       a.txSyncCh <- &txSyncMsg{"test node B", wantTxs}
+
+       timeout := time.NewTimer(2 * time.Second)
+       defer timeout.Stop()
+       ticker := time.NewTicker(500 * time.Millisecond)
+       defer ticker.Stop()
+
+       gotTxs := []*protocol.TxDesc{}
+       for {
+               select {
+               case <-ticker.C:
+                       gotTxs = b.mempool.GetTransactions()
+                       if len(gotTxs) >= txsNumber {
+                               goto out
+                       }
+               case <-timeout.C:
+                       t.Fatalf("mempool sync timeout")
+               }
+       }
+
+out:
+       if len(gotTxs) != txsNumber {
+               t.Fatalf("mempool sync txs num err. got:%d want:%d", len(gotTxs), txsNumber)
+       }
+
+       for i, gotTx := range gotTxs {
+               index := gotTx.Tx.Inputs[0].Amount()
+               if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Inputs[0].Amount()) {
+                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Inputs))
+               }
+
+               if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Outputs[0].AssetAmount()) {
+                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Outputs))
+               }
+
+       }
+
+}
index b1601b1..0b93ce9 100644 (file)
@@ -8,19 +8,24 @@ import (
        "github.com/vapor/protocol/bc/types"
 )
 
+type mempool interface {
+       AddTx(tx *types.Tx)
+}
+
 type Chain struct {
        bestBlockHeader *types.BlockHeader
        heightMap       map[uint64]*types.Block
        blockMap        map[bc.Hash]*types.Block
-
-       prevOrphans map[bc.Hash]*types.Block
+       prevOrphans     map[bc.Hash]*types.Block
+       mempool         mempool
 }
 
-func NewChain() *Chain {
+func NewChain(mempool *Mempool) *Chain {
        return &Chain{
                heightMap:   map[uint64]*types.Block{},
                blockMap:    map[bc.Hash]*types.Block{},
                prevOrphans: make(map[bc.Hash]*types.Block),
+               mempool:     mempool,
        }
 }
 
@@ -137,6 +142,7 @@ func (c *Chain) SetBlockByHeight(height uint64, block *types.Block) {
        c.blockMap[block.Hash()] = block
 }
 
-func (c *Chain) ValidateTx(*types.Tx) (bool, error) {
+func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) {
+       c.mempool.AddTx(tx)
        return false, nil
 }
diff --git a/test/mock/mempool.go b/test/mock/mempool.go
new file mode 100644 (file)
index 0000000..767fb7b
--- /dev/null
@@ -0,0 +1,24 @@
+package mock
+
+import (
+       "github.com/vapor/protocol"
+       "github.com/vapor/protocol/bc/types"
+)
+
+type Mempool struct {
+       txs []*protocol.TxDesc
+}
+
+func newMempool() *Mempool {
+       return &Mempool{
+               txs: []*protocol.TxDesc{},
+       }
+}
+
+func (m *Mempool) AddTx(tx *types.Tx) {
+       m.txs = append(m.txs, &protocol.TxDesc{Tx: tx})
+}
+
+func (m *Mempool) GetTransactions() []*protocol.TxDesc {
+       return m.txs
+}