6 log "github.com/sirupsen/logrus"
8 core "github.com/bytom/protocol"
9 "github.com/bytom/protocol/bc/types"
13 // This is the target size for the packs of transactions sent by txSyncLoop.
14 // A pack can get larger than this if a single transaction exceeds this size.
15 txSyncPackSize = 100 * 1024
// txSyncMsg pairs a peer ID with the mempool transactions queued for relay
// to that peer; it is produced by syncTransactions and consumed by txSyncLoop.
// NOTE(review): struct fields are elided in this view — presumably peerID
// string and txs []*types.Tx, matching the &txSyncMsg{peerID, txs} literal below.
18 type txSyncMsg struct {
// syncTransactions snapshots every transaction currently in the mempool and
// queues one txSyncMsg for peerID on sm.txSyncCh; txSyncLoop later drains the
// channel and relays the transactions to that peer in size-bounded packs.
// It is a no-op when the pool is empty.
23 func (sm *SyncManager) syncTransactions(peerID string) {
24 pending := sm.txPool.GetTransactions()
// Nothing to relay — bail out early (return statement elided in this view).
25 if len(pending) == 0 {
29 txs := make([]*types.Tx, len(pending))
// Unwrap the pool's batch entries into bare *types.Tx values.
// NOTE(review): loop body elided here — presumably txs[i] = batch.Tx; confirm.
30 for i, batch := range pending {
// Hand the full snapshot to txSyncLoop; this send blocks if the loop is busy.
33 sm.txSyncCh <- &txSyncMsg{peerID, txs}
// txBroadcastLoop consumes mempool transaction events from sm.txMsgSub and
// broadcasts each newly added transaction to all connected peers. It exits
// when the subscription channel is closed.
// NOTE(review): the enclosing for/select and the returns after each log line
// are elided in this view.
36 func (sm *SyncManager) txBroadcastLoop() {
// ok is false once the event subscription channel has been closed.
39 case obj, ok := <-sm.txMsgSub.Chan():
41 log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
// Events arrive as interface{}; anything that is not a core.TxMsgEvent is
// logged and skipped rather than crashing the loop.
45 ev, ok := obj.Data.(core.TxMsgEvent)
47 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
// Only newly added transactions are relayed; removal events are ignored here.
51 if ev.TxMsg.MsgType == core.MsgNewTx {
// A broadcast failure is logged but does not stop the loop — best effort relay.
52 if err := sm.peers.broadcastTx(ev.TxMsg.Tx); err != nil {
53 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
63 // txSyncLoop takes care of the initial transaction sync for each new
64 // connection. When a new peer appears, we relay all currently pending
65 // transactions. In order to minimise egress bandwidth usage, we send
66 // the transactions in small packs to one peer at a time.
67 func (sm *SyncManager) txSyncLoop() {
// pending maps peerID -> the remaining transactions still owed to that peer.
68 pending := make(map[string]*txSyncMsg)
69 sending := false // whether a send is active
70 done := make(chan error, 1) // result of the send
72 // send starts sending a pack of transactions from the sync.
73 send := func(msg *txSyncMsg) {
74 peer := sm.peers.getPeer(msg.peerID)
// Peer disappeared since the message was queued — drop its pending entry.
// NOTE(review): the nil-check guarding this delete is elided in this view.
76 delete(pending, msg.peerID)
// Accumulate transactions until the pack reaches txSyncPackSize bytes.
// The pack may exceed the target: the size check happens before appending,
// so one oversized transaction still goes into the pack.
80 totalSize := uint64(0)
81 sendTxs := []*types.Tx{}
82 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
83 sendTxs = append(sendTxs, msg.txs[i])
84 totalSize += msg.txs[i].SerializedSize
// Everything fit into one pack: this peer is fully synced, forget it.
87 if len(msg.txs) == len(sendTxs) {
88 delete(pending, msg.peerID)
// Otherwise keep the unsent tail for a later pick().
90 msg.txs = msg.txs[len(sendTxs):]
93 // Send the pack in the background.
94 log.WithFields(log.Fields{
96 "count": len(sendTxs),
99 }).Debug("txSyncLoop sending transactions")
// NOTE(review): lines elided here — presumably this runs in a goroutine that
// reports into done; confirm the surrounding go func() in the full file.
102 ok, err := peer.sendTransactions(sendTxs)
// A failed send means the peer is unusable — drop it entirely.
104 sm.peers.removePeer(msg.peerID)
110 // pick chooses the next pending sync.
111 pick := func() *txSyncMsg {
112 if len(pending) == 0 {
// Choose a random pending peer so no single peer monopolizes the loop.
// Randomness comes from map iteration plus a random stop index n in [1, len].
116 n := rand.Intn(len(pending)) + 1
117 for _, s := range pending {
// Main loop: queue incoming sync requests and start a new send whenever
// no send is in flight (for/select opener elided in this view).
127 case msg := <-sm.txSyncCh:
128 pending[msg.peerID] = msg
// A completed send reports its error on done; log failures but keep looping.
136 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending")
// Schedule the next peer's pack, if any sync is still pending.
139 if s := pick(); s != nil {