if warderCfg.IsLocal {
local = service.NewWarder(&warderCfg)
} else {
- remoteWarder := service.NewWarder(&warderCfg)
- remotes = append(remotes, remoteWarder)
+ remote := service.NewWarder(&warderCfg)
+ remotes = append(remotes, remote)
}
}
}
func (w *warder) Run() {
- go w.collectPendingTx()
- go w.processCrossTxRoutine()
-}
-
-func (w *warder) collectPendingTx() {
ticker := time.NewTicker(collectInterval)
for ; true; <-ticker.C {
txs := []*orm.CrossTransaction{}
}
for _, tx := range txs {
- w.txCh <- tx
+ go w.processCrossTx(tx)
}
}
}
-func (w *warder) processCrossTxRoutine() {
- for ormTx := range w.txCh {
- if err := w.validateCrossTx(ormTx); err != nil {
- log.Warnln("invalid cross-chain tx", ormTx)
- continue
- }
+func (w *warder) processCrossTx(ormTx *orm.CrossTransaction) {
+ if err := w.validateCrossTx(ormTx); err != nil {
+ log.Warnln("invalid cross-chain tx", ormTx)
+ return
+ }
- destTx, destTxID, err := w.proposeDestTx(ormTx)
+ destTx, destTxID, err := w.proposeDestTx(ormTx)
+ if err != nil {
+ log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
+ return
+ }
+
+ if err := w.initDestTxSigns(destTx, ormTx); err != nil {
+ log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
+ return
+ }
+
+ if err := w.signDestTx(destTx, ormTx); err != nil {
+ log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
+ return
+ }
+
+ for _, remote := range w.remotes {
+ signs, err := remote.RequestSign(destTx, ormTx)
if err != nil {
- log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
- continue
+ log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
+ return
}
- if err := w.initDestTxSigns(destTx, ormTx); err != nil {
- log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
- continue
- }
+ w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
+ }
- if err := w.signDestTx(destTx, ormTx); err != nil {
- log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
- continue
+ if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
+ submittedTxID, err := w.submitTx(destTx)
+ if err != nil {
+ log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
+ return
}
- for _, remote := range w.remotes {
- signs, err := remote.RequestSign(destTx, ormTx)
- if err != nil {
- log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
- continue
- }
-
- w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
+ if submittedTxID != destTxID {
+ log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
+ return
}
- if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
- submittedTxID, err := w.submitTx(destTx)
- if err != nil {
- log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
- continue
- }
-
- if submittedTxID != destTxID {
- log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
- continue
- }
-
- if err := w.updateSubmission(ormTx); err != nil {
- log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
- continue
- }
+ if err := w.updateSubmission(ormTx); err != nil {
+ log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
+ return
}
}
}