OSDN Git Service

fix https://github.com/Bytom/vapor/pull/181#discussion_r295148126
authorHAOYUatHZ <haoyu@protonmail.com>
Wed, 19 Jun 2019 07:36:09 +0000 (15:36 +0800)
committerHAOYUatHZ <haoyu@protonmail.com>
Wed, 19 Jun 2019 07:36:09 +0000 (15:36 +0800)
federation/warder.go

index 5758f94..3238171 100644 (file)
@@ -58,8 +58,8 @@ func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
                if warderCfg.IsLocal {
                        local = service.NewWarder(&warderCfg)
                } else {
-                       remoteWarder := service.NewWarder(&warderCfg)
-                       remotes = append(remotes, remoteWarder)
+                       remote := service.NewWarder(&warderCfg)
+                       remotes = append(remotes, remote)
                }
        }
 
@@ -71,11 +71,6 @@ func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
 }
 
 func (w *warder) Run() {
-       go w.collectPendingTx()
-       go w.processCrossTxRoutine()
-}
-
-func (w *warder) collectPendingTx() {
        ticker := time.NewTicker(collectInterval)
        for ; true; <-ticker.C {
                txs := []*orm.CrossTransaction{}
@@ -90,60 +85,58 @@ func (w *warder) collectPendingTx() {
                }
 
                for _, tx := range txs {
-                       w.txCh <- tx
+                       go w.processCrossTx(tx)
                }
        }
 }
 
-func (w *warder) processCrossTxRoutine() {
-       for ormTx := range w.txCh {
-               if err := w.validateCrossTx(ormTx); err != nil {
-                       log.Warnln("invalid cross-chain tx", ormTx)
-                       continue
-               }
+func (w *warder) processCrossTx(ormTx *orm.CrossTransaction) {
+       if err := w.validateCrossTx(ormTx); err != nil {
+               log.Warnln("invalid cross-chain tx", ormTx)
+               return
+       }
 
-               destTx, destTxID, err := w.proposeDestTx(ormTx)
+       destTx, destTxID, err := w.proposeDestTx(ormTx)
+       if err != nil {
+               log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
+               return
+       }
+
+       if err := w.initDestTxSigns(destTx, ormTx); err != nil {
+               log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
+               return
+       }
+
+       if err := w.signDestTx(destTx, ormTx); err != nil {
+               log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
+               return
+       }
+
+       for _, remote := range w.remotes {
+               signs, err := remote.RequestSign(destTx, ormTx)
                if err != nil {
-                       log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
-                       continue
+                       log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
+                       return
                }
 
-               if err := w.initDestTxSigns(destTx, ormTx); err != nil {
-                       log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
-                       continue
-               }
+               w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
+       }
 
-               if err := w.signDestTx(destTx, ormTx); err != nil {
-                       log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
-                       continue
+       if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
+               submittedTxID, err := w.submitTx(destTx)
+               if err != nil {
+                       log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
+                       return
                }
 
-               for _, remote := range w.remotes {
-                       signs, err := remote.RequestSign(destTx, ormTx)
-                       if err != nil {
-                               log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
-                               continue
-                       }
-
-                       w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
+               if submittedTxID != destTxID {
+                       log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
+                       return
                }
 
-               if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
-                       submittedTxID, err := w.submitTx(destTx)
-                       if err != nil {
-                               log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
-                               continue
-                       }
-
-                       if submittedTxID != destTxID {
-                               log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
-                               continue
-                       }
-
-                       if err := w.updateSubmission(ormTx); err != nil {
-                               log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
-                               continue
-                       }
+               if err := w.updateSubmission(ormTx); err != nil {
+                       log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
+                       return
                }
        }
 }