OSDN Git Service

Fix new mined orphan block broadcast bug (#1592)
[bytom/bytom.git] / netsync / tx_keeper.go
1 package netsync
2
3 import (
4         "math/rand"
5
6         log "github.com/sirupsen/logrus"
7
8         core "github.com/bytom/protocol"
9         "github.com/bytom/protocol/bc/types"
10 )
11
12 const (
13         // This is the target size for the packs of transactions sent by txSyncLoop.
14         // A pack can get larger than this if a single transactions exceeds this size.
15         txSyncPackSize = 100 * 1024
16 )
17
18 type txSyncMsg struct {
19         peerID string
20         txs    []*types.Tx
21 }
22
23 func (sm *SyncManager) syncTransactions(peerID string) {
24         pending := sm.txPool.GetTransactions()
25         if len(pending) == 0 {
26                 return
27         }
28
29         txs := make([]*types.Tx, len(pending))
30         for i, batch := range pending {
31                 txs[i] = batch.Tx
32         }
33         sm.txSyncCh <- &txSyncMsg{peerID, txs}
34 }
35
36 func (sm *SyncManager) txBroadcastLoop() {
37         for {
38                 select {
39                 case obj, ok := <-sm.txMsgSub.Chan():
40                         if !ok {
41                                 log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
42                                 return
43                         }
44
45                         ev, ok := obj.Data.(core.TxMsgEvent)
46                         if !ok {
47                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
48                                 continue
49                         }
50
51                         if ev.TxMsg.MsgType == core.MsgNewTx {
52                                 if err := sm.peers.broadcastTx(ev.TxMsg.Tx); err != nil {
53                                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
54                                         continue
55                                 }
56                         }
57                 case <-sm.quitSync:
58                         return
59                 }
60         }
61 }
62
63 // txSyncLoop takes care of the initial transaction sync for each new
64 // connection. When a new peer appears, we relay all currently pending
65 // transactions. In order to minimise egress bandwidth usage, we send
66 // the transactions in small packs to one peer at a time.
67 func (sm *SyncManager) txSyncLoop() {
68         pending := make(map[string]*txSyncMsg)
69         sending := false            // whether a send is active
70         done := make(chan error, 1) // result of the send
71
72         // send starts a sending a pack of transactions from the sync.
73         send := func(msg *txSyncMsg) {
74                 peer := sm.peers.getPeer(msg.peerID)
75                 if peer == nil {
76                         delete(pending, msg.peerID)
77                         return
78                 }
79
80                 totalSize := uint64(0)
81                 sendTxs := []*types.Tx{}
82                 for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
83                         sendTxs = append(sendTxs, msg.txs[i])
84                         totalSize += msg.txs[i].SerializedSize
85                 }
86
87                 if len(msg.txs) == len(sendTxs) {
88                         delete(pending, msg.peerID)
89                 } else {
90                         msg.txs = msg.txs[len(sendTxs):]
91                 }
92
93                 // Send the pack in the background.
94                 log.WithFields(log.Fields{
95                         "module": logModule,
96                         "count":  len(sendTxs),
97                         "bytes":  totalSize,
98                         "peer":   msg.peerID,
99                 }).Debug("txSyncLoop sending transactions")
100                 sending = true
101                 go func() {
102                         ok, err := peer.sendTransactions(sendTxs)
103                         if !ok {
104                                 sm.peers.removePeer(msg.peerID)
105                         }
106                         done <- err
107                 }()
108         }
109
110         // pick chooses the next pending sync.
111         pick := func() *txSyncMsg {
112                 if len(pending) == 0 {
113                         return nil
114                 }
115
116                 n := rand.Intn(len(pending)) + 1
117                 for _, s := range pending {
118                         if n--; n == 0 {
119                                 return s
120                         }
121                 }
122                 return nil
123         }
124
125         for {
126                 select {
127                 case msg := <-sm.txSyncCh:
128                         pending[msg.peerID] = msg
129                         if !sending {
130                                 send(msg)
131                         }
132
133                 case err := <-done:
134                         sending = false
135                         if err != nil {
136                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending")
137                         }
138
139                         if s := pick(); s != nil {
140                                 send(s)
141                         }
142                 }
143         }
144 }