Merge pull request #201 from Bytom/v0.1
diff --git a/netsync/chainmgr/tx_keeper.go b/netsync/chainmgr/tx_keeper.go
new file mode 100644
index 0000000..6c6f5f9
--- /dev/null
+++ b/netsync/chainmgr/tx_keeper.go
@@ -0,0 +1,146 @@
+package chainmgr
+
+import (
+       "math/rand"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       core "github.com/vapor/protocol"
+       "github.com/vapor/protocol/bc/types"
+)
+
+const (
+       // txSyncPackSize is the target size, in bytes, for the packs of transactions sent
+       // by syncMempoolLoop. A pack can exceed this target, since transactions are appended
+       // until the accumulated size reaches it.
+       txSyncPackSize = 100 * 1024
+)
+
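+// txSyncMsg describes a batch of mempool transactions queued for relay to a single peer.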
+type txSyncMsg struct {
+       peerID string
+       txs    []*types.Tx
+}
+
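+// syncMempool queues all transactions currently in the mempool for relay to the given
+// peer. The actual sending is handled by syncMempoolLoop, which drains txSyncCh.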
+func (m *Manager) syncMempool(peerID string) {
+       pending := m.mempool.GetTransactions()
+       if len(pending) == 0 {
+               return
+       }
+
+       txs := make([]*types.Tx, len(pending))
+       for i, batch := range pending {
+               txs[i] = batch.Tx
+       }
+       m.txSyncCh <- &txSyncMsg{peerID, txs}
+}
+
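+// broadcastTxsLoop listens for new-transaction events published by the mempool and
+// broadcasts each newly received transaction to the connected peers.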
+func (m *Manager) broadcastTxsLoop() {
+       for {
+               select {
+               case obj, ok := <-m.txMsgSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
+                               return
+                       }
+
+                       ev, ok := obj.Data.(core.TxMsgEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       if ev.TxMsg.MsgType == core.MsgNewTx {
+                               if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
+                                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
+                                       continue
+                               }
+                       }
+               case <-m.quit:
+                       return
+               }
+       }
+}
+
+// syncMempoolLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (m *Manager) syncMempoolLoop() {
+       pending := make(map[string]*txSyncMsg)
+       sending := false            // whether a send is active
+       done := make(chan error, 1) // result of the send
+
+       // send starts sending a pack of transactions from the given sync request in the background.
+       send := func(msg *txSyncMsg) {
+               peer := m.peers.GetPeer(msg.peerID)
+               if peer == nil {
+                       delete(pending, msg.peerID)
+                       return
+               }
+
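+               // Assemble a pack of transactions, stopping once the accumulated size reaches the target.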
+               totalSize := uint64(0)
+               sendTxs := []*types.Tx{}
+               for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
+                       sendTxs = append(sendTxs, msg.txs[i])
+                       totalSize += msg.txs[i].SerializedSize
+               }
+
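+               // If the whole backlog fit into this pack, the sync for this peer is done;
+               // otherwise keep the remaining transactions for a later pack.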
+               if len(msg.txs) == len(sendTxs) {
+                       delete(pending, msg.peerID)
+               } else {
+                       msg.txs = msg.txs[len(sendTxs):]
+               }
+
+               // Send the pack in the background.
+               log.WithFields(log.Fields{
+                       "module": logModule,
+                       "count":  len(sendTxs),
+                       "bytes":  totalSize,
+                       "peer":   msg.peerID,
+               }).Debug("txSyncLoop sending transactions")
+               sending = true
+               go func() {
+                       err := peer.SendTransactions(sendTxs)
+                       if err != nil {
+                               m.peers.RemovePeer(msg.peerID)
+                       }
+                       done <- err
+               }()
+       }
+
+       // pick chooses the next pending sync.
+       pick := func() *txSyncMsg {
+               if len(pending) == 0 {
+                       return nil
+               }
+
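+               // Choose a random entry by walking the map and counting down, so each
+               // pending sync is equally likely to be picked.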
+               n := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(pending)) + 1
+               for _, s := range pending {
+                       if n--; n == 0 {
+                               return s
+                       }
+               }
+               return nil
+       }
+
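+       // Accept new sync requests and, since only one send may be in flight at a time,
+       // start the next pack as soon as the previous send completes.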
+       for {
+               select {
+               case msg := <-m.txSyncCh:
+                       pending[msg.peerID] = msg
+                       if !sending {
+                               send(msg)
+                       }
+               case err := <-done:
+                       sending = false
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on txSyncLoop sending")
+                       }
+
+                       if s := pick(); s != nil {
+                               send(s)
+                       }
+               case <-m.quit:
+                       return
+               }
+       }
+}