OSDN Git Service

Fix fast sync bug when the chain has fork (#282)
[bytom/vapor.git] / netsync / chainmgr / tx_keeper.go
index 9f1a7cc..6c6f5f9 100644 (file)
@@ -21,8 +21,8 @@ type txSyncMsg struct {
        txs    []*types.Tx
 }
 
-func (cm *ChainManager) syncTransactions(peerID string) {
-       pending := cm.txPool.GetTransactions()
+func (m *Manager) syncMempool(peerID string) {
+       pending := m.mempool.GetTransactions()
        if len(pending) == 0 {
                return
        }
@@ -31,13 +31,13 @@ func (cm *ChainManager) syncTransactions(peerID string) {
        for i, batch := range pending {
                txs[i] = batch.Tx
        }
-       cm.txSyncCh <- &txSyncMsg{peerID, txs}
+       m.txSyncCh <- &txSyncMsg{peerID, txs}
 }
 
-func (cm *ChainManager) txBroadcastLoop() {
+func (m *Manager) broadcastTxsLoop() {
        for {
                select {
-               case obj, ok := <-cm.txMsgSub.Chan():
+               case obj, ok := <-m.txMsgSub.Chan():
                        if !ok {
                                log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
                                return
@@ -50,29 +50,29 @@ func (cm *ChainManager) txBroadcastLoop() {
                        }
 
                        if ev.TxMsg.MsgType == core.MsgNewTx {
-                               if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
+                               if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
                                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
                                        continue
                                }
                        }
-               case <-cm.quitSync:
+               case <-m.quit:
                        return
                }
        }
 }
 
-// txSyncLoop takes care of the initial transaction sync for each new
+// syncMempoolLoop takes care of the initial transaction sync for each new
 // connection. When a new peer appears, we relay all currently pending
 // transactions. In order to minimise egress bandwidth usage, we send
 // the transactions in small packs to one peer at a time.
-func (cm *ChainManager) txSyncLoop() {
+func (m *Manager) syncMempoolLoop() {
        pending := make(map[string]*txSyncMsg)
        sending := false            // whether a send is active
        done := make(chan error, 1) // result of the send
 
        // send starts a sending a pack of transactions from the sync.
        send := func(msg *txSyncMsg) {
-               peer := cm.peers.GetPeer(msg.peerID)
+               peer := m.peers.GetPeer(msg.peerID)
                if peer == nil {
                        delete(pending, msg.peerID)
                        return
@@ -102,7 +102,7 @@ func (cm *ChainManager) txSyncLoop() {
                go func() {
                        err := peer.SendTransactions(sendTxs)
                        if err != nil {
-                               cm.peers.RemovePeer(msg.peerID)
+                               m.peers.RemovePeer(msg.peerID)
                        }
                        done <- err
                }()
@@ -125,12 +125,11 @@ func (cm *ChainManager) txSyncLoop() {
 
        for {
                select {
-               case msg := <-cm.txSyncCh:
+               case msg := <-m.txSyncCh:
                        pending[msg.peerID] = msg
                        if !sending {
                                send(msg)
                        }
-
                case err := <-done:
                        sending = false
                        if err != nil {
@@ -140,6 +139,8 @@ func (cm *ChainManager) txSyncLoop() {
                        if s := pick(); s != nil {
                                send(s)
                        }
+               case <-m.quit:
+                       return
                }
        }
 }