OSDN Git Service

9f1a7cc3591989d76ae70c5793719bcdf3d36468
[bytom/vapor.git] / netsync / chainmgr / tx_keeper.go
1 package chainmgr
2
3 import (
4         "math/rand"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8
9         core "github.com/vapor/protocol"
10         "github.com/vapor/protocol/bc/types"
11 )
12
13 const (
14         // This is the target size for the packs of transactions sent by txSyncLoop.
15         // A pack can get larger than this if a single transactions exceeds this size.
16         txSyncPackSize = 100 * 1024
17 )
18
19 type txSyncMsg struct {
20         peerID string
21         txs    []*types.Tx
22 }
23
24 func (cm *ChainManager) syncTransactions(peerID string) {
25         pending := cm.txPool.GetTransactions()
26         if len(pending) == 0 {
27                 return
28         }
29
30         txs := make([]*types.Tx, len(pending))
31         for i, batch := range pending {
32                 txs[i] = batch.Tx
33         }
34         cm.txSyncCh <- &txSyncMsg{peerID, txs}
35 }
36
37 func (cm *ChainManager) txBroadcastLoop() {
38         for {
39                 select {
40                 case obj, ok := <-cm.txMsgSub.Chan():
41                         if !ok {
42                                 log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
43                                 return
44                         }
45
46                         ev, ok := obj.Data.(core.TxMsgEvent)
47                         if !ok {
48                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
49                                 continue
50                         }
51
52                         if ev.TxMsg.MsgType == core.MsgNewTx {
53                                 if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
54                                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
55                                         continue
56                                 }
57                         }
58                 case <-cm.quitSync:
59                         return
60                 }
61         }
62 }
63
64 // txSyncLoop takes care of the initial transaction sync for each new
65 // connection. When a new peer appears, we relay all currently pending
66 // transactions. In order to minimise egress bandwidth usage, we send
67 // the transactions in small packs to one peer at a time.
68 func (cm *ChainManager) txSyncLoop() {
69         pending := make(map[string]*txSyncMsg)
70         sending := false            // whether a send is active
71         done := make(chan error, 1) // result of the send
72
73         // send starts a sending a pack of transactions from the sync.
74         send := func(msg *txSyncMsg) {
75                 peer := cm.peers.GetPeer(msg.peerID)
76                 if peer == nil {
77                         delete(pending, msg.peerID)
78                         return
79                 }
80
81                 totalSize := uint64(0)
82                 sendTxs := []*types.Tx{}
83                 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
84                         sendTxs = append(sendTxs, msg.txs[i])
85                         totalSize += msg.txs[i].SerializedSize
86                 }
87
88                 if len(msg.txs) == len(sendTxs) {
89                         delete(pending, msg.peerID)
90                 } else {
91                         msg.txs = msg.txs[len(sendTxs):]
92                 }
93
94                 // Send the pack in the background.
95                 log.WithFields(log.Fields{
96                         "module": logModule,
97                         "count":  len(sendTxs),
98                         "bytes":  totalSize,
99                         "peer":   msg.peerID,
100                 }).Debug("txSyncLoop sending transactions")
101                 sending = true
102                 go func() {
103                         err := peer.SendTransactions(sendTxs)
104                         if err != nil {
105                                 cm.peers.RemovePeer(msg.peerID)
106                         }
107                         done <- err
108                 }()
109         }
110
111         // pick chooses the next pending sync.
112         pick := func() *txSyncMsg {
113                 if len(pending) == 0 {
114                         return nil
115                 }
116
117                 n := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(pending)) + 1
118                 for _, s := range pending {
119                         if n--; n == 0 {
120                                 return s
121                         }
122                 }
123                 return nil
124         }
125
126         for {
127                 select {
128                 case msg := <-cm.txSyncCh:
129                         pending[msg.peerID] = msg
130                         if !sending {
131                                 send(msg)
132                         }
133
134                 case err := <-done:
135                         sending = false
136                         if err != nil {
137                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending")
138                         }
139
140                         if s := pick(); s != nil {
141                                 send(s)
142                         }
143                 }
144         }
145 }