6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/protocol/bc/types"
12 // This is the target size for the packs of transactions sent by txSyncLoop.
14 // A pack can get larger than this if a single transaction exceeds this size.
14 txSyncPackSize = 100 * 1024
17 type txSyncMsg struct {
22 func (sm *SyncManager) syncTransactions(peerID string) {
23 pending := sm.txPool.GetTransactions()
24 if len(pending) == 0 {
28 txs := make([]*types.Tx, len(pending))
29 for i, batch := range pending {
32 sm.txSyncCh <- &txSyncMsg{peerID, txs}
35 func (sm *SyncManager) txBroadcastLoop() {
38 case newTx := <-sm.newTxCh:
39 if err := sm.peers.broadcastTx(newTx); err != nil {
40 log.Errorf("Broadcast new tx error. %v", err)
49 // txSyncLoop takes care of the initial transaction sync for each new
50 // connection. When a new peer appears, we relay all currently pending
51 // transactions. In order to minimise egress bandwidth usage, we send
52 // the transactions in small packs to one peer at a time.
53 func (sm *SyncManager) txSyncLoop() {
54 pending := make(map[string]*txSyncMsg)
55 sending := false // whether a send is active
56 done := make(chan error, 1) // result of the send
58 // send starts sending a pack of transactions from the sync.
59 send := func(msg *txSyncMsg) {
60 peer := sm.peers.getPeer(msg.peerID)
62 delete(pending, msg.peerID)
65 totalSize := uint64(0)
66 sendTxs := []*types.Tx{}
67 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
68 sendTxs = append(sendTxs, msg.txs[i])
69 totalSize += msg.txs[i].SerializedSize
72 copy(msg.txs, msg.txs[len(sendTxs):])
73 if len(msg.txs) == 0 {
74 delete(pending, msg.peerID)
77 // Send the pack in the background.
78 log.WithFields(log.Fields{
79 "count": len(sendTxs),
82 }).Debug("txSyncLoop sending transactions")
85 ok, err := peer.sendTransactions(sendTxs)
87 sm.peers.removePeer(msg.peerID)
93 // pick chooses the next pending sync.
94 pick := func() *txSyncMsg {
95 if len(pending) == 0 {
99 n := rand.Intn(len(pending)) + 1
100 for _, s := range pending {
110 case msg := <-sm.txSyncCh:
111 pending[msg.peerID] = msg
119 log.WithField("err", err).Warning("fail on txSyncLoop sending")
122 if s := pick(); s != nil {