OSDN Git Service

Add consensus messages transfer (#90)
[bytom/vapor.git] / netsync / chainmgr / tx_keeper.go
index 9f1a7cc..5071403 100644 (file)
@@ -21,8 +21,8 @@ type txSyncMsg struct {
        txs    []*types.Tx
 }
 
-func (cm *ChainManager) syncTransactions(peerID string) {
-       pending := cm.txPool.GetTransactions()
+func (m *Manager) syncTransactions(peerID string) {
+       pending := m.txPool.GetTransactions()
        if len(pending) == 0 {
                return
        }
@@ -31,13 +31,13 @@ func (cm *ChainManager) syncTransactions(peerID string) {
        for i, batch := range pending {
                txs[i] = batch.Tx
        }
-       cm.txSyncCh <- &txSyncMsg{peerID, txs}
+       m.txSyncCh <- &txSyncMsg{peerID, txs}
 }
 
-func (cm *ChainManager) txBroadcastLoop() {
+func (m *Manager) txBroadcastLoop() {
        for {
                select {
-               case obj, ok := <-cm.txMsgSub.Chan():
+               case obj, ok := <-m.txMsgSub.Chan():
                        if !ok {
                                log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
                                return
@@ -50,12 +50,12 @@ func (cm *ChainManager) txBroadcastLoop() {
                        }
 
                        if ev.TxMsg.MsgType == core.MsgNewTx {
-                               if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
+                               if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
                                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
                                        continue
                                }
                        }
-               case <-cm.quitSync:
+               case <-m.quitSync:
                        return
                }
        }
@@ -65,14 +65,14 @@ func (cm *ChainManager) txBroadcastLoop() {
 // connection. When a new peer appears, we relay all currently pending
 // transactions. In order to minimise egress bandwidth usage, we send
 // the transactions in small packs to one peer at a time.
-func (cm *ChainManager) txSyncLoop() {
+func (m *Manager) txSyncLoop() {
        pending := make(map[string]*txSyncMsg)
        sending := false            // whether a send is active
        done := make(chan error, 1) // result of the send
 
        // send starts a sending a pack of transactions from the sync.
        send := func(msg *txSyncMsg) {
-               peer := cm.peers.GetPeer(msg.peerID)
+               peer := m.peers.GetPeer(msg.peerID)
                if peer == nil {
                        delete(pending, msg.peerID)
                        return
@@ -102,7 +102,7 @@ func (cm *ChainManager) txSyncLoop() {
                go func() {
                        err := peer.SendTransactions(sendTxs)
                        if err != nil {
-                               cm.peers.RemovePeer(msg.peerID)
+                               m.peers.RemovePeer(msg.peerID)
                        }
                        done <- err
                }()
@@ -125,7 +125,7 @@ func (cm *ChainManager) txSyncLoop() {
 
        for {
                select {
-               case msg := <-cm.txSyncCh:
+               case msg := <-m.txSyncCh:
                        pending[msg.peerID] = msg
                        if !sending {
                                send(msg)