OSDN Git Service

Add sync completion status broadcast function (#1208)
[bytom/bytom.git] / netsync / tx_keeper.go
1 package netsync
2
3 import (
4         "math/rand"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/protocol/bc/types"
9 )
10
11 const (
12         // This is the target size for the packs of transactions sent by txSyncLoop.
13         // A pack can get larger than this if a single transactions exceeds this size.
14         txSyncPackSize = 100 * 1024
15 )
16
17 type txSyncMsg struct {
18         peerID string
19         txs    []*types.Tx
20 }
21
22 func (sm *SyncManager) syncTransactions(peerID string) {
23         pending := sm.txPool.GetTransactions()
24         if len(pending) == 0 {
25                 return
26         }
27
28         txs := make([]*types.Tx, len(pending))
29         for i, batch := range pending {
30                 txs[i] = batch.Tx
31         }
32         sm.txSyncCh <- &txSyncMsg{peerID, txs}
33 }
34
35 func (sm *SyncManager) txBroadcastLoop() {
36         for {
37                 select {
38                 case newTx := <-sm.newTxCh:
39                         if err := sm.peers.broadcastTx(newTx); err != nil {
40                                 log.Errorf("Broadcast new tx error. %v", err)
41                                 return
42                         }
43                 case <-sm.quitSync:
44                         return
45                 }
46         }
47 }
48
49 // txSyncLoop takes care of the initial transaction sync for each new
50 // connection. When a new peer appears, we relay all currently pending
51 // transactions. In order to minimise egress bandwidth usage, we send
52 // the transactions in small packs to one peer at a time.
53 func (sm *SyncManager) txSyncLoop() {
54         pending := make(map[string]*txSyncMsg)
55         sending := false            // whether a send is active
56         done := make(chan error, 1) // result of the send
57
58         // send starts a sending a pack of transactions from the sync.
59         send := func(msg *txSyncMsg) {
60                 peer := sm.peers.getPeer(msg.peerID)
61                 if peer == nil {
62                         delete(pending, msg.peerID)
63                         return
64                 }
65
66                 totalSize := uint64(0)
67                 sendTxs := []*types.Tx{}
68                 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
69                         sendTxs = append(sendTxs, msg.txs[i])
70                         totalSize += msg.txs[i].SerializedSize
71                 }
72
73                 if len(msg.txs) == len(sendTxs) {
74                         delete(pending, msg.peerID)
75                 } else {
76                         msg.txs = msg.txs[len(sendTxs):]
77                 }
78
79                 // Send the pack in the background.
80                 log.WithFields(log.Fields{
81                         "count": len(sendTxs),
82                         "bytes": totalSize,
83                         "peer":  msg.peerID,
84                 }).Debug("txSyncLoop sending transactions")
85                 sending = true
86                 go func() {
87                         ok, err := peer.sendTransactions(sendTxs)
88                         if !ok {
89                                 sm.peers.removePeer(msg.peerID)
90                         }
91                         done <- err
92                 }()
93         }
94
95         // pick chooses the next pending sync.
96         pick := func() *txSyncMsg {
97                 if len(pending) == 0 {
98                         return nil
99                 }
100
101                 n := rand.Intn(len(pending)) + 1
102                 for _, s := range pending {
103                         if n--; n == 0 {
104                                 return s
105                         }
106                 }
107                 return nil
108         }
109
110         for {
111                 select {
112                 case msg := <-sm.txSyncCh:
113                         pending[msg.peerID] = msg
114                         if !sending {
115                                 send(msg)
116                         }
117
118                 case err := <-done:
119                         sending = false
120                         if err != nil {
121                                 log.WithField("err", err).Warning("fail on txSyncLoop sending")
122                         }
123
124                         if s := pick(); s != nil {
125                                 send(s)
126                         }
127                 }
128         }
129 }