}
for _, tx := range txs {
- go w.processCrossTx(tx)
+ go w.tryProcessCrossTx(tx)
}
}
}
-func (w *warder) processCrossTx(ormTx *orm.CrossTransaction) {
+func (w *warder) tryProcessCrossTx(ormTx *orm.CrossTransaction) error {
+ dbTx := w.db.Begin()
+ if err := w.processCrossTx(ormTx); err != nil {
+ dbTx.Rollback()
+ return err
+ }
+
+ return dbTx.Commit().Error
+}
+
+func (w *warder) processCrossTx(ormTx *orm.CrossTransaction) error {
if err := w.validateCrossTx(ormTx); err != nil {
log.Warnln("invalid cross-chain tx", ormTx)
- return
+ return err
}
destTx, destTxID, err := w.proposeDestTx(ormTx)
if err != nil {
log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
- return
+ return err
}
if err := w.initDestTxSigns(destTx, ormTx); err != nil {
log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
- return
+ return err
}
if err := w.signDestTx(destTx, ormTx); err != nil {
log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
- return
+ return err
}
for _, remote := range w.remotes {
signs, err := remote.RequestSign(destTx, ormTx)
if err != nil {
log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
- return
+ return err
}
w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
submittedTxID, err := w.submitTx(destTx)
if err != nil {
log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
- return
+ return err
}
if submittedTxID != destTxID {
log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
- return
+ return err
}
if err := w.updateSubmission(ormTx); err != nil {
log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
- return
+ return err
}
}
+
+ return nil
}
func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {