OSDN Git Service

Add block fast sync function (#1104)
[bytom/bytom.git] / netsync / tx_keeper.go
1 package netsync
2
3 import (
4         "math/rand"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/protocol/bc/types"
9 )
10
11 const (
12         // This is the target size for the packs of transactions sent by txSyncLoop.
13         // A pack can get larger than this if a single transactions exceeds this size.
14         txSyncPackSize = 100 * 1024
15 )
16
17 type txSyncMsg struct {
18         peerID string
19         txs    []*types.Tx
20 }
21
22 func (sm *SyncManager) syncTransactions(peerID string) {
23         pending := sm.txPool.GetTransactions()
24         if len(pending) == 0 {
25                 return
26         }
27
28         txs := make([]*types.Tx, len(pending))
29         for i, batch := range pending {
30                 txs[i] = batch.Tx
31         }
32         sm.txSyncCh <- &txSyncMsg{peerID, txs}
33 }
34
35 func (sm *SyncManager) txBroadcastLoop() {
36         for {
37                 select {
38                 case newTx := <-sm.newTxCh:
39                         if err := sm.peers.broadcastTx(newTx); err != nil {
40                                 log.Errorf("Broadcast new tx error. %v", err)
41                                 return
42                         }
43                 case <-sm.quitSync:
44                         return
45                 }
46         }
47 }
48
49 // txSyncLoop takes care of the initial transaction sync for each new
50 // connection. When a new peer appears, we relay all currently pending
51 // transactions. In order to minimise egress bandwidth usage, we send
52 // the transactions in small packs to one peer at a time.
53 func (sm *SyncManager) txSyncLoop() {
54         pending := make(map[string]*txSyncMsg)
55         sending := false            // whether a send is active
56         done := make(chan error, 1) // result of the send
57
58         // send starts a sending a pack of transactions from the sync.
59         send := func(msg *txSyncMsg) {
60                 peer := sm.peers.getPeer(msg.peerID)
61                 if peer == nil {
62                         delete(pending, msg.peerID)
63                 }
64
65                 totalSize := uint64(0)
66                 sendTxs := []*types.Tx{}
67                 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
68                         sendTxs = append(sendTxs, msg.txs[i])
69                         totalSize += msg.txs[i].SerializedSize
70                 }
71
72                 copy(msg.txs, msg.txs[len(sendTxs):])
73                 if len(msg.txs) == 0 {
74                         delete(pending, msg.peerID)
75                 }
76
77                 // Send the pack in the background.
78                 log.WithFields(log.Fields{
79                         "count": len(sendTxs),
80                         "bytes": totalSize,
81                         "peer":  msg.peerID,
82                 }).Debug("txSyncLoop sending transactions")
83                 sending = true
84                 go func() {
85                         ok, err := peer.sendTransactions(sendTxs)
86                         if !ok {
87                                 sm.peers.removePeer(msg.peerID)
88                         }
89                         done <- err
90                 }()
91         }
92
93         // pick chooses the next pending sync.
94         pick := func() *txSyncMsg {
95                 if len(pending) == 0 {
96                         return nil
97                 }
98
99                 n := rand.Intn(len(pending)) + 1
100                 for _, s := range pending {
101                         if n--; n == 0 {
102                                 return s
103                         }
104                 }
105                 return nil
106         }
107
108         for {
109                 select {
110                 case msg := <-sm.txSyncCh:
111                         pending[msg.peerID] = msg
112                         if !sending {
113                                 send(msg)
114                         }
115
116                 case err := <-done:
117                         sending = false
118                         if err != nil {
119                                 log.WithField("err", err).Warning("fail on txSyncLoop sending")
120                         }
121
122                         if s := pick(); s != nil {
123                                 send(s)
124                         }
125                 }
126         }
127 }