}
for i, c := range cases {
- mockChain := mock.NewChain()
+ mockChain := mock.NewChain(nil)
bk := &blockKeeper{chain: mockChain}
mockChain.SetBestBlockHeader(&blocks[c.bestHeight].BlockHeader)
for i := uint64(0); i <= c.bestHeight; i++ {
for i, c := range cases {
syncTimeout = c.syncTimeout
- a := mockSync(c.aBlocks)
- b := mockSync(c.bBlocks)
+ a := mockSync(c.aBlocks, nil)
+ b := mockSync(c.bBlocks, nil)
netWork := NewNetWork()
netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
},
}
- mockChain := mock.NewChain()
+ mockChain := mock.NewChain(nil)
bk := &blockKeeper{chain: mockChain}
for _, block := range blocks {
mockChain.SetBlockByHeight(block.Height, block)
}
for i, c := range cases {
- mockChain := mock.NewChain()
+ mockChain := mock.NewChain(nil)
bk := &blockKeeper{chain: mockChain}
for i := uint64(0); i <= c.chainHeight; i++ {
mockChain.SetBlockByHeight(i, blocks[i])
},
}
- mockChain := mock.NewChain()
+ mockChain := mock.NewChain(nil)
for i, c := range cases {
consensus.ActiveNetParams.Checkpoints = c.checkPoints
mockChain.SetBestBlockHeader(&types.BlockHeader{Height: c.bestHeight})
for i, c := range cases {
syncTimeout = c.syncTimeout
- a := mockSync(c.aBlocks)
- b := mockSync(c.bBlocks)
+ a := mockSync(c.aBlocks, nil)
+ b := mockSync(c.bBlocks, nil)
netWork := NewNetWork()
netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
func TestRequireBlock(t *testing.T) {
blocks := mockBlocks(nil, 5)
- a := mockSync(blocks[:1])
- b := mockSync(blocks[:5])
+ a := mockSync(blocks[:1], nil)
+ b := mockSync(blocks[:5], nil)
netWork := NewNetWork()
netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
t.Fatal(err)
}
- spvNode := mockSync(blocks)
+ spvNode := mockSync(blocks, nil)
blockHash := targetBlock.Hash()
var statusResult *bc.TransactionStatus
if statusResult, err = spvNode.chain.GetTransactionStatus(&blockHash); err != nil {
t.Fatal(err)
}
- fullNode := mockSync(blocks)
+ fullNode := mockSync(blocks, nil)
netWork := NewNetWork()
netWork.Register(spvNode, "192.168.0.1", "spv_node", consensus.SFFastSync)
netWork.Register(fullNode, "192.168.0.2", "full_node", consensus.DefaultServices)
Peers() *p2p.PeerSet
}
+// Mempool is the interface for Bytom mempool
+type Mempool interface {
+ GetTransactions() []*core.TxDesc
+}
+
//Manager is responsible for the business layer information synchronization
type Manager struct {
sw Switch
chain Chain
- txPool *core.TxPool
+ mempool Mempool
blockKeeper *blockKeeper
peers *peers.PeerSet
txSyncCh chan *txSyncMsg
- quitSync chan struct{}
+ quit chan struct{}
config *cfg.Config
eventDispatcher *event.Dispatcher
}
//NewChainManager create a chain sync manager.
-func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
manager := &Manager{
sw: sw,
- txPool: txPool,
+ mempool: mempool,
chain: chain,
blockKeeper: newBlockKeeper(chain, peers),
peers: peers,
txSyncCh: make(chan *txSyncMsg),
- quitSync: make(chan struct{}),
+ quit: make(chan struct{}),
config: config,
eventDispatcher: dispatcher,
}
return err
}
- // broadcast transactions
- go m.txBroadcastLoop()
- go m.txSyncLoop()
+ go m.broadcastTxsLoop()
+ go m.syncMempoolLoop()
return nil
}
//Stop stop sync manager
func (m *Manager) Stop() {
- close(m.quitSync)
+ close(m.quit)
}
if err := pr.manager.SendStatus(peer); err != nil {
return err
}
- pr.manager.syncTransactions(peer.Key)
+ pr.manager.syncMempool(peer.Key)
return nil
}
return blocks
}
-func mockSync(blocks []*types.Block) *Manager {
- chain := mock.NewChain()
+func mockSync(blocks []*types.Block, mempool *mock.Mempool) *Manager {
+ chain := mock.NewChain(mempool)
peers := peers.NewPeerSet(NewPeerSet())
chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
for _, block := range blocks {
chain: chain,
blockKeeper: newBlockKeeper(chain, peers),
peers: peers,
+ mempool: mempool,
+ txSyncCh: make(chan *txSyncMsg),
}
}
txs []*types.Tx
}
-func (m *Manager) syncTransactions(peerID string) {
- pending := m.txPool.GetTransactions()
+func (m *Manager) syncMempool(peerID string) {
+ pending := m.mempool.GetTransactions()
if len(pending) == 0 {
return
}
m.txSyncCh <- &txSyncMsg{peerID, txs}
}
-func (m *Manager) txBroadcastLoop() {
+func (m *Manager) broadcastTxsLoop() {
for {
select {
case obj, ok := <-m.txMsgSub.Chan():
continue
}
}
- case <-m.quitSync:
+ case <-m.quit:
return
}
}
}
-// txSyncLoop takes care of the initial transaction sync for each new
+// syncMempoolLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
-func (m *Manager) txSyncLoop() {
+func (m *Manager) syncMempoolLoop() {
pending := make(map[string]*txSyncMsg)
sending := false // whether a send is active
done := make(chan error, 1) // result of the send
if !sending {
send(msg)
}
-
case err := <-done:
sending = false
if err != nil {
if s := pick(); s != nil {
send(s)
}
+ case <-m.quit:
+ return
}
}
}
--- /dev/null
+package chainmgr
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/davecgh/go-spew/spew"
+
+ "github.com/vapor/consensus"
+ "github.com/vapor/protocol"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+ "github.com/vapor/test/mock"
+)
+
+const txsNumber = 2000
+
+func getTransactions() []*types.Tx {
+ txs := []*types.Tx{}
+ for i := 0; i < txsNumber; i++ {
+ txInput := types.NewSpendInput(nil, bc.NewHash([32]byte{0x01}), *consensus.BTMAssetID, uint64(i), 1, []byte{0x51})
+ txInput.CommitmentSuffix = []byte{0, 1, 2}
+ txInput.WitnessSuffix = []byte{0, 1, 2}
+
+ tx := &types.Tx{
+
+ TxData: types.TxData{
+ //SerializedSize: uint64(i * 10),
+ Inputs: []*types.TxInput{
+ txInput,
+ },
+ Outputs: []*types.TxOutput{
+ types.NewIntraChainOutput(*consensus.BTMAssetID, uint64(i), []byte{0x6a}),
+ },
+ },
+ Tx: &bc.Tx{
+ ID: bc.Hash{V0: uint64(i), V1: uint64(i), V2: uint64(i), V3: uint64(i)},
+ },
+ }
+ txs = append(txs, tx)
+ }
+ return txs
+}
+
+func TestSyncMempool(t *testing.T) {
+ blocks := mockBlocks(nil, 5)
+ a := mockSync(blocks, &mock.Mempool{})
+ b := mockSync(blocks, &mock.Mempool{})
+
+ netWork := NewNetWork()
+ netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
+ netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
+ if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
+ t.Errorf("fail on peer hands shake %v", err)
+ } else {
+ go B2A.postMan()
+ go A2B.postMan()
+ }
+
+ go a.syncMempoolLoop()
+ a.syncMempool("test node B")
+ wantTxs := getTransactions()
+ a.txSyncCh <- &txSyncMsg{"test node B", wantTxs}
+
+ timeout := time.NewTimer(2 * time.Second)
+ defer timeout.Stop()
+ ticker := time.NewTicker(500 * time.Millisecond)
+ defer ticker.Stop()
+
+ gotTxs := []*protocol.TxDesc{}
+ for {
+ select {
+ case <-ticker.C:
+ gotTxs = b.mempool.GetTransactions()
+ if len(gotTxs) >= txsNumber {
+ goto out
+ }
+ case <-timeout.C:
+ t.Fatalf("mempool sync timeout")
+ }
+ }
+
+out:
+ if len(gotTxs) != txsNumber {
+ t.Fatalf("mempool sync txs num err. got:%d want:%d", len(gotTxs), txsNumber)
+ }
+
+ for i, gotTx := range gotTxs {
+ index := gotTx.Tx.Inputs[0].Amount()
+ if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Inputs[0].Amount()) {
+ t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Inputs))
+ }
+
+ if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Outputs[0].AssetAmount()) {
+ t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Outputs))
+ }
+
+ }
+
+}
"github.com/vapor/protocol/bc/types"
)
+type mempool interface {
+ AddTx(tx *types.Tx)
+}
+
type Chain struct {
bestBlockHeader *types.BlockHeader
heightMap map[uint64]*types.Block
blockMap map[bc.Hash]*types.Block
-
- prevOrphans map[bc.Hash]*types.Block
+ prevOrphans map[bc.Hash]*types.Block
+ mempool mempool
}
-func NewChain() *Chain {
+func NewChain(mempool *Mempool) *Chain {
return &Chain{
heightMap: map[uint64]*types.Block{},
blockMap: map[bc.Hash]*types.Block{},
prevOrphans: make(map[bc.Hash]*types.Block),
+ mempool: mempool,
}
}
c.blockMap[block.Hash()] = block
}
-func (c *Chain) ValidateTx(*types.Tx) (bool, error) {
+func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) {
+ c.mempool.AddTx(tx)
return false, nil
}
--- /dev/null
+package mock
+
+import (
+ "github.com/vapor/protocol"
+ "github.com/vapor/protocol/bc/types"
+)
+
+type Mempool struct {
+ txs []*protocol.TxDesc
+}
+
+func newMempool() *Mempool {
+ return &Mempool{
+ txs: []*protocol.TxDesc{},
+ }
+}
+
+func (m *Mempool) AddTx(tx *types.Tx) {
+ m.txs = append(m.txs, &protocol.TxDesc{Tx: tx})
+}
+
+func (m *Mempool) GetTransactions() []*protocol.TxDesc {
+ return m.txs
+}