package chainmgr import ( "math/rand" "time" log "github.com/sirupsen/logrus" core "github.com/vapor/protocol" "github.com/vapor/protocol/bc/types" ) const ( // This is the target size for the packs of transactions sent by txSyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txSyncPackSize = 100 * 1024 ) type txSyncMsg struct { peerID string txs []*types.Tx } func (m *Manager) syncMempool(peerID string) { pending := m.mempool.GetTransactions() if len(pending) == 0 { return } txs := make([]*types.Tx, len(pending)) for i, batch := range pending { txs[i] = batch.Tx } m.txSyncCh <- &txSyncMsg{peerID, txs} } func (m *Manager) broadcastTxsLoop() { for { select { case obj, ok := <-m.txMsgSub.Chan(): if !ok { log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed") return } ev, ok := obj.Data.(core.TxMsgEvent) if !ok { log.WithFields(log.Fields{"module": logModule}).Error("event type error") continue } if ev.TxMsg.MsgType == core.MsgNewTx { if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.") continue } } case <-m.quit: return } } } // syncMempoolLoop takes care of the initial transaction sync for each new // connection. When a new peer appears, we relay all currently pending // transactions. In order to minimise egress bandwidth usage, we send // the transactions in small packs to one peer at a time. func (m *Manager) syncMempoolLoop() { pending := make(map[string]*txSyncMsg) sending := false // whether a send is active done := make(chan error, 1) // result of the send // send starts a sending a pack of transactions from the sync. send := func(msg *txSyncMsg) { peer := m.peers.GetPeer(msg.peerID) if peer == nil { delete(pending, msg.peerID) return } totalSize := uint64(0) sendTxs := []*types.Tx{} for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ { sendTxs = append(sendTxs, msg.txs[i]) totalSize += msg.txs[i].SerializedSize } if len(msg.txs) == len(sendTxs) { delete(pending, msg.peerID) } else { msg.txs = msg.txs[len(sendTxs):] } // Send the pack in the background. log.WithFields(log.Fields{ "module": logModule, "count": len(sendTxs), "bytes": totalSize, "peer": msg.peerID, }).Debug("txSyncLoop sending transactions") sending = true go func() { err := peer.SendTransactions(sendTxs) if err != nil { m.peers.RemovePeer(msg.peerID) } done <- err }() } // pick chooses the next pending sync. pick := func() *txSyncMsg { if len(pending) == 0 { return nil } n := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(pending)) + 1 for _, s := range pending { if n--; n == 0 { return s } } return nil } for { select { case msg := <-m.txSyncCh: pending[msg.peerID] = msg if !sending { send(msg) } case err := <-done: sending = false if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending") } if s := pick(); s != nil { send(s) } case <-m.quit: return } } }