7 log "github.com/sirupsen/logrus"
9 core "github.com/vapor/protocol"
10 "github.com/vapor/protocol/bc/types"
14 // This is the target size for the packs of transactions sent by txSyncLoop.
15 // A pack can get larger than this if a single transaction exceeds this size.
16 txSyncPackSize = 100 * 1024
19 type txSyncMsg struct {
24 func (m *Manager) syncTransactions(peerID string) {
25 pending := m.txPool.GetTransactions()
26 if len(pending) == 0 {
30 txs := make([]*types.Tx, len(pending))
31 for i, batch := range pending {
34 m.txSyncCh <- &txSyncMsg{peerID, txs}
37 func (m *Manager) txBroadcastLoop() {
40 case obj, ok := <-m.txMsgSub.Chan():
42 log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
46 ev, ok := obj.Data.(core.TxMsgEvent)
48 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
52 if ev.TxMsg.MsgType == core.MsgNewTx {
53 if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
54 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
64 // txSyncLoop takes care of the initial transaction sync for each new
65 // connection. When a new peer appears, we relay all currently pending
66 // transactions. In order to minimise egress bandwidth usage, we send
67 // the transactions in small packs to one peer at a time.
68 func (m *Manager) txSyncLoop() {
69 pending := make(map[string]*txSyncMsg)
70 sending := false // whether a send is active
71 done := make(chan error, 1) // result of the send
73 // send starts sending a pack of transactions from the sync.
74 send := func(msg *txSyncMsg) {
75 peer := m.peers.GetPeer(msg.peerID)
77 delete(pending, msg.peerID)
81 totalSize := uint64(0)
82 sendTxs := []*types.Tx{}
83 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
84 sendTxs = append(sendTxs, msg.txs[i])
85 totalSize += msg.txs[i].SerializedSize
88 if len(msg.txs) == len(sendTxs) {
89 delete(pending, msg.peerID)
91 msg.txs = msg.txs[len(sendTxs):]
94 // Send the pack in the background.
95 log.WithFields(log.Fields{
97 "count": len(sendTxs),
100 }).Debug("txSyncLoop sending transactions")
103 err := peer.SendTransactions(sendTxs)
105 m.peers.RemovePeer(msg.peerID)
111 // pick chooses the next pending sync.
112 pick := func() *txSyncMsg {
113 if len(pending) == 0 {
117 n := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(pending)) + 1
118 for _, s := range pending {
128 case msg := <-m.txSyncCh:
129 pending[msg.peerID] = msg
137 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending")
140 if s := pick(); s != nil {