X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=netsync%2Fchainmgr%2Ftx_keeper.go;h=6c6f5f9e3861519c084d8f411198a629a36635e5;hp=9f1a7cc3591989d76ae70c5793719bcdf3d36468;hb=8643695a1ff4d574d231ef7ada1cde3bbc76404b;hpb=7e01ede3ce5d3688fa29f30bc766593beb9508e4 diff --git a/netsync/chainmgr/tx_keeper.go b/netsync/chainmgr/tx_keeper.go index 9f1a7cc3..6c6f5f9e 100644 --- a/netsync/chainmgr/tx_keeper.go +++ b/netsync/chainmgr/tx_keeper.go @@ -21,8 +21,8 @@ type txSyncMsg struct { txs []*types.Tx } -func (cm *ChainManager) syncTransactions(peerID string) { - pending := cm.txPool.GetTransactions() +func (m *Manager) syncMempool(peerID string) { + pending := m.mempool.GetTransactions() if len(pending) == 0 { return } @@ -31,13 +31,13 @@ func (cm *ChainManager) syncTransactions(peerID string) { for i, batch := range pending { txs[i] = batch.Tx } - cm.txSyncCh <- &txSyncMsg{peerID, txs} + m.txSyncCh <- &txSyncMsg{peerID, txs} } -func (cm *ChainManager) txBroadcastLoop() { +func (m *Manager) broadcastTxsLoop() { for { select { - case obj, ok := <-cm.txMsgSub.Chan(): + case obj, ok := <-m.txMsgSub.Chan(): if !ok { log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed") return @@ -50,29 +50,29 @@ func (cm *ChainManager) txBroadcastLoop() { } if ev.TxMsg.MsgType == core.MsgNewTx { - if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil { + if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.") continue } } - case <-cm.quitSync: + case <-m.quit: return } } } -// txSyncLoop takes care of the initial transaction sync for each new +// syncMempoolLoop takes care of the initial transaction sync for each new // connection. When a new peer appears, we relay all currently pending // transactions. In order to minimise egress bandwidth usage, we send // the transactions in small packs to one peer at a time. -func (cm *ChainManager) txSyncLoop() { +func (m *Manager) syncMempoolLoop() { pending := make(map[string]*txSyncMsg) sending := false // whether a send is active done := make(chan error, 1) // result of the send // send starts a sending a pack of transactions from the sync. send := func(msg *txSyncMsg) { - peer := cm.peers.GetPeer(msg.peerID) + peer := m.peers.GetPeer(msg.peerID) if peer == nil { delete(pending, msg.peerID) return @@ -102,7 +102,7 @@ func (cm *ChainManager) txSyncLoop() { go func() { err := peer.SendTransactions(sendTxs) if err != nil { - cm.peers.RemovePeer(msg.peerID) + m.peers.RemovePeer(msg.peerID) } done <- err }() @@ -125,12 +125,11 @@ func (cm *ChainManager) txSyncLoop() { for { select { - case msg := <-cm.txSyncCh: + case msg := <-m.txSyncCh: pending[msg.peerID] = msg if !sending { send(msg) } - case err := <-done: sending = false if err != nil { @@ -140,6 +139,8 @@ func (cm *ChainManager) txSyncLoop() { if s := pick(); s != nil { send(s) } + case <-m.quit: + return } } }