6 log "github.com/sirupsen/logrus"
8 "github.com/vapor/protocol/bc/types"
12 // This is the target size for the packs of transactions sent by txSyncLoop.
13 // A pack can get larger than this if a single transactions exceeds this size.
14 txSyncPackSize = 100 * 1024
17 type txSyncMsg struct {
22 func (sm *SyncManager) syncTransactions(peerID string) {
23 pending := sm.txPool.GetTransactions()
24 if len(pending) == 0 {
28 txs := make([]*types.Tx, len(pending))
29 for i, batch := range pending {
32 sm.txSyncCh <- &txSyncMsg{peerID, txs}
35 func (sm *SyncManager) txBroadcastLoop() {
38 case newTx := <-sm.newTxCh:
39 if err := sm.peers.broadcastTx(newTx); err != nil {
40 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
49 // txSyncLoop takes care of the initial transaction sync for each new
50 // connection. When a new peer appears, we relay all currently pending
51 // transactions. In order to minimise egress bandwidth usage, we send
52 // the transactions in small packs to one peer at a time.
53 func (sm *SyncManager) txSyncLoop() {
54 pending := make(map[string]*txSyncMsg)
55 sending := false // whether a send is active
56 done := make(chan error, 1) // result of the send
58 // send starts a sending a pack of transactions from the sync.
59 send := func(msg *txSyncMsg) {
60 peer := sm.peers.getPeer(msg.peerID)
62 delete(pending, msg.peerID)
66 totalSize := uint64(0)
67 sendTxs := []*types.Tx{}
68 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
69 sendTxs = append(sendTxs, msg.txs[i])
70 totalSize += msg.txs[i].SerializedSize
73 if len(msg.txs) == len(sendTxs) {
74 delete(pending, msg.peerID)
76 msg.txs = msg.txs[len(sendTxs):]
79 // Send the pack in the background.
80 log.WithFields(log.Fields{
82 "count": len(sendTxs),
85 }).Debug("txSyncLoop sending transactions")
88 ok, err := peer.sendTransactions(sendTxs)
90 sm.peers.removePeer(msg.peerID)
96 // pick chooses the next pending sync.
97 pick := func() *txSyncMsg {
98 if len(pending) == 0 {
102 n := rand.Intn(len(pending)) + 1
103 for _, s := range pending {
113 case msg := <-sm.txSyncCh:
114 pending[msg.peerID] = msg
122 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending")
125 if s := pick(); s != nil {