log "github.com/sirupsen/logrus"
- core "github.com/vapor/protocol"
- "github.com/vapor/protocol/bc/types"
+ core "github.com/bytom/vapor/protocol"
+ "github.com/bytom/vapor/protocol/bc/types"
)
const (
txs []*types.Tx
}
-func (cm *ChainManager) syncTransactions(peerID string) {
- pending := cm.txPool.GetTransactions()
+func (m *Manager) syncMempool(peerID string) {
+ pending := m.mempool.GetTransactions()
if len(pending) == 0 {
return
}
for i, batch := range pending {
txs[i] = batch.Tx
}
- cm.txSyncCh <- &txSyncMsg{peerID, txs}
+ m.txSyncCh <- &txSyncMsg{peerID, txs}
}
-func (cm *ChainManager) txBroadcastLoop() {
+func (m *Manager) broadcastTxsLoop() {
for {
select {
- case obj, ok := <-cm.txMsgSub.Chan():
+ case obj, ok := <-m.txMsgSub.Chan():
if !ok {
log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
return
}
if ev.TxMsg.MsgType == core.MsgNewTx {
- if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
+ if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
continue
}
}
- case <-cm.quitSync:
+ case <-m.quit:
return
}
}
}
-// txSyncLoop takes care of the initial transaction sync for each new
+// syncMempoolLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
-func (cm *ChainManager) txSyncLoop() {
+func (m *Manager) syncMempoolLoop() {
pending := make(map[string]*txSyncMsg)
sending := false // whether a send is active
done := make(chan error, 1) // result of the send
// send starts a sending a pack of transactions from the sync.
send := func(msg *txSyncMsg) {
- peer := cm.peers.GetPeer(msg.peerID)
+ peer := m.peers.GetPeer(msg.peerID)
if peer == nil {
delete(pending, msg.peerID)
return
go func() {
err := peer.SendTransactions(sendTxs)
if err != nil {
- cm.peers.RemovePeer(msg.peerID)
+ m.peers.RemovePeer(msg.peerID)
}
done <- err
}()
for {
select {
- case msg := <-cm.txSyncCh:
+ case msg := <-m.txSyncCh:
pending[msg.peerID] = msg
if !sending {
send(msg)
}
-
case err := <-done:
sending = false
if err != nil {
if s := pick(); s != nil {
send(s)
}
+ case <-m.quit:
+ return
}
}
}