OSDN Git Service

new repo
[bytom/vapor.git] / netsync / tx_keeper.go
1 package netsync
2
3 import (
4         "math/rand"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/protocol/bc/types"
9 )
10
11 const (
12         // This is the target size for the packs of transactions sent by txSyncLoop.
13         // A pack can get larger than this if a single transactions exceeds this size.
14         txSyncPackSize = 100 * 1024
15 )
16
17 type txSyncMsg struct {
18         peerID string
19         txs    []*types.Tx
20 }
21
22 func (sm *SyncManager) syncTransactions(peerID string) {
23         pending := sm.txPool.GetTransactions()
24         if len(pending) == 0 {
25                 return
26         }
27
28         txs := make([]*types.Tx, len(pending))
29         for i, batch := range pending {
30                 txs[i] = batch.Tx
31         }
32         sm.txSyncCh <- &txSyncMsg{peerID, txs}
33 }
34
35 func (sm *SyncManager) txBroadcastLoop() {
36         for {
37                 select {
38                 case newTx := <-sm.newTxCh:
39                         if err := sm.peers.broadcastTx(newTx); err != nil {
40                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
41                                 return
42                         }
43                 case <-sm.quitSync:
44                         return
45                 }
46         }
47 }
48
49 // txSyncLoop takes care of the initial transaction sync for each new
50 // connection. When a new peer appears, we relay all currently pending
51 // transactions. In order to minimise egress bandwidth usage, we send
52 // the transactions in small packs to one peer at a time.
53 func (sm *SyncManager) txSyncLoop() {
54         pending := make(map[string]*txSyncMsg)
55         sending := false            // whether a send is active
56         done := make(chan error, 1) // result of the send
57
58         // send starts a sending a pack of transactions from the sync.
59         send := func(msg *txSyncMsg) {
60                 peer := sm.peers.getPeer(msg.peerID)
61                 if peer == nil {
62                         delete(pending, msg.peerID)
63                         return
64                 }
65
66                 totalSize := uint64(0)
67                 sendTxs := []*types.Tx{}
68                 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
69                         sendTxs = append(sendTxs, msg.txs[i])
70                         totalSize += msg.txs[i].SerializedSize
71                 }
72
73                 if len(msg.txs) == len(sendTxs) {
74                         delete(pending, msg.peerID)
75                 } else {
76                         msg.txs = msg.txs[len(sendTxs):]
77                 }
78
79                 // Send the pack in the background.
80                 log.WithFields(log.Fields{
81                         "module": logModule,
82                         "count":  len(sendTxs),
83                         "bytes":  totalSize,
84                         "peer":   msg.peerID,
85                 }).Debug("txSyncLoop sending transactions")
86                 sending = true
87                 go func() {
88                         ok, err := peer.sendTransactions(sendTxs)
89                         if !ok {
90                                 sm.peers.removePeer(msg.peerID)
91                         }
92                         done <- err
93                 }()
94         }
95
96         // pick chooses the next pending sync.
97         pick := func() *txSyncMsg {
98                 if len(pending) == 0 {
99                         return nil
100                 }
101
102                 n := rand.Intn(len(pending)) + 1
103                 for _, s := range pending {
104                         if n--; n == 0 {
105                                 return s
106                         }
107                 }
108                 return nil
109         }
110
111         for {
112                 select {
113                 case msg := <-sm.txSyncCh:
114                         pending[msg.peerID] = msg
115                         if !sending {
116                                 send(msg)
117                         }
118
119                 case err := <-done:
120                         sending = false
121                         if err != nil {
122                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending")
123                         }
124
125                         if s := pick(); s != nil {
126                                 send(s)
127                         }
128                 }
129         }
130 }