OSDN Git Service

feat(federation): add mainchain & sidechain listener (#170)
authorHAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
Fri, 14 Jun 2019 03:01:02 +0000 (11:01 +0800)
committerPaladz <yzhu101@uottawa.ca>
Fri, 14 Jun 2019 03:01:02 +0000 (11:01 +0800)
* init processChainInfo

* init ErrInconsistentDB

* clean

* fix processChainInfo

* fix

* init

* init

* add err handling

* fix

* fix

* fix

* fk

* fk

* fix

* dododo

* fix

* fk

* add add add

* fix

* fix

* fk

* fix

* clean

* dododo

* fk

* fox

* add

* add

* draft

* clean up

* clean up

* clean

* fix typo

* dododo

* rename

* fix

* fix

* add

* add

* fix https://github.com/Bytom/vapor/pull/170#discussion_r293642418

* fix

* clean

docs/federation/federation.sql
federation/common/const.go
federation/database/orm/cross_transaction.go
federation/synchron/errors.go [new file with mode: 0644]
federation/synchron/mainchain_keeper.go
federation/synchron/sidechain_keeper.go

index 1d6f894..b2613c5 100644 (file)
@@ -6,6 +6,8 @@
 /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
 /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
 
+DROP DATABASE `federation`;
+
 CREATE SCHEMA IF NOT EXISTS `federation`;
 
 USE `federation`;
@@ -13,10 +15,8 @@ USE `federation`;
 # Dump of table warders
 # ------------------------------------------------------------
 
-DROP TABLE IF EXISTS `warders`;
-
 CREATE TABLE `warders` (
-  `id` tinyint(1) unsigned NOT NULL AUTO_INCREMENT,
+  `id` tinyint(1) NOT NULL AUTO_INCREMENT,
   `pubkey` varchar(64) NOT NULL,
   `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
   `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
@@ -60,7 +60,7 @@ UNLOCK TABLES;
 
 CREATE TABLE `cross_transactions` (
   `id` int(11) NOT NULL AUTO_INCREMENT,
-  `chain_id` int(11) NOT NULL,
+  `chain_id` tinyint(1) NOT NULL,
   `source_block_height` int(11) NOT NULL,
   `source_block_hash` char(64) NOT NULL,
   `source_tx_index` int(11) NOT NULL,
@@ -77,12 +77,11 @@ CREATE TABLE `cross_transactions` (
   PRIMARY KEY (`id`),
   UNIQUE KEY `source_mux_id` (`chain_id`,`source_mux_id`),
   UNIQUE KEY `source_tx_hash` (`chain_id`,`source_tx_hash`),
-  UNIQUE KEY `source_raw_transaction` (`source_raw_transaction`),
-  UNIQUE KEY `source_blockhash_txidx` (`chain_id`,`source_block_hash`,`tx_index`),
-  UNIQUE KEY `source_blockheight_txidx` (`chain_id`,`source_block_height`,`tx_index`),
+  UNIQUE KEY `source_blockhash_txidx` (`chain_id`,`source_block_hash`,`source_tx_index`),
+  UNIQUE KEY `source_blockheight_txidx` (`chain_id`,`source_block_height`,`source_tx_index`),
   UNIQUE KEY `dest_tx_hash` (`chain_id`,`dest_tx_hash`),
-  UNIQUE KEY `dest_blockhash_txidx` (`chain_id`,`dest_block_hash`,`tx_index`),
-  UNIQUE KEY `dest_blockheight_txidx` (`chain_id`,`dest_block_height`,`tx_index`),
+  UNIQUE KEY `dest_blockhash_txidx` (`chain_id`,`dest_block_hash`,`dest_tx_index`),
+  UNIQUE KEY `dest_blockheight_txidx` (`chain_id`,`dest_block_height`,`dest_tx_index`),
   CONSTRAINT `cross_transactions_ibfk_1` FOREIGN KEY (`chain_id`) REFERENCES `chains` (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
@@ -116,7 +115,7 @@ UNLOCK TABLES;
 CREATE TABLE `cross_transaction_signs` (
   `id` int(11) NOT NULL AUTO_INCREMENT,
   `cross_transaction_id` int(11) NOT NULL,
-  `warder_id` int(11) NOT NULL,
+  `warder_id` tinyint(1) NOT NULL,
   `signatures` text NOT NULL,
   `status` tinyint(1) NOT NULL,
   `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -134,8 +133,6 @@ UNLOCK TABLES;
 # Dump of table assets
 # ------------------------------------------------------------
 
-DROP TABLE IF EXISTS `assets`;
-
 CREATE TABLE `assets` (
   `id` int(11) NOT NULL AUTO_INCREMENT,
   `asset_id` varchar(64) NOT NULL,
@@ -145,8 +142,7 @@ CREATE TABLE `assets` (
   `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`),
-  UNIQUE KEY `asset_id` (`asset_id`),
-  UNIQUE KEY `asset_meta` (`issuance_program`,`vm_version`,`raw_definition_byte`)
+  UNIQUE KEY `asset_id` (`asset_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
 LOCK TABLES `assets` WRITE;
index 663c66a..e843f9e 100644 (file)
@@ -2,8 +2,9 @@ package common
 
 const (
        CrossTxPendingStatus uint8 = iota
-       CrossTxCompletedStatus
        CrossTxRejectedStatus
+       CrossTxSubmittedStatus
+       CrossTxCompletedStatus
 )
 
 const (
index 345074b..ddb0841 100644 (file)
@@ -1,6 +1,8 @@
 package orm
 
 import (
+       "database/sql"
+
        "github.com/vapor/federation/types"
 )
 
@@ -13,10 +15,10 @@ type CrossTransaction struct {
        SourceMuxID          string
        SourceTxHash         string
        SourceRawTransaction string
-       DestBlockHeight      uint64
-       DestBlockHash        string
-       DestTxIndex          uint64
-       DestTxHash           string
+       DestBlockHeight      sql.NullInt64
+       DestBlockHash        sql.NullString
+       DestTxIndex          sql.NullInt64
+       DestTxHash           sql.NullString
        Status               uint8
        CreatedAt            types.Timestamp
        UpdatedAt            types.Timestamp
diff --git a/federation/synchron/errors.go b/federation/synchron/errors.go
new file mode 100644 (file)
index 0000000..5e58fde
--- /dev/null
@@ -0,0 +1,10 @@
+package synchron
+
+import (
+       "github.com/vapor/errors"
+)
+
+var (
+       ErrInconsistentDB = errors.New("inconsistent db status")
+       ErrOutputType     = errors.New("error output type")
+)
index c53a90d..50f7a03 100644 (file)
@@ -1,14 +1,21 @@
 package synchron
 
 import (
+       "bytes"
+       "database/sql"
        "encoding/hex"
+       "fmt"
        "time"
 
-       btmTypes "github.com/bytom/protocol/bc/types"
+       "github.com/bytom/consensus"
+       btmBc "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/types"
        "github.com/jinzhu/gorm"
        log "github.com/sirupsen/logrus"
 
+       vaporCfg "github.com/vapor/config"
        "github.com/vapor/errors"
+       "github.com/vapor/federation/common"
        "github.com/vapor/federation/config"
        "github.com/vapor/federation/database"
        "github.com/vapor/federation/database/orm"
@@ -16,6 +23,8 @@ import (
        "github.com/vapor/protocol/bc"
 )
 
+var fedProg = vaporCfg.FederationProgrom(vaporCfg.CommonConfig)
+
 type mainchainKeeper struct {
        cfg        *config.Chain
        db         *gorm.DB
@@ -71,7 +80,7 @@ func (m *mainchainKeeper) syncBlock() (bool, error) {
                return false, err
        }
 
-       nextBlock := &btmTypes.Block{}
+       nextBlock := &types.Block{}
        if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
                return false, errors.New("Unmarshal nextBlock")
        }
@@ -81,7 +90,7 @@ func (m *mainchainKeeper) syncBlock() (bool, error) {
                        "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
                        "db block_hash":            chain.BlockHash,
                }).Fatal("BlockHash mismatch")
-               return false, errors.New("BlockHash mismatch")
+               return false, ErrInconsistentDB
        }
 
        if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
@@ -91,11 +100,11 @@ func (m *mainchainKeeper) syncBlock() (bool, error) {
        return true, nil
 }
 
-func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error {
+func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
        blockHash := block.Hash()
        log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
        m.db.Begin()
-       if err := m.processBlock(block); err != nil {
+       if err := m.processBlock(chain, block, txStatus); err != nil {
                m.db.Rollback()
                return err
        }
@@ -103,19 +112,172 @@ func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block
        return m.db.Commit().Error
 }
 
-func (m *mainchainKeeper) processBlock(block *btmTypes.Block) error {
+func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
        if err := m.processIssuing(block.Transactions); err != nil {
                return err
        }
 
+       for i, tx := range block.Transactions {
+               if m.isDepositTx(tx) {
+                       if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
+                               return err
+                       }
+               }
+
+               if m.isWithdrawalTx(tx) {
+                       if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
+                               return err
+                       }
+               }
+       }
+
+       return m.processChainInfo(chain, block)
+}
+
+func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
+       for _, output := range tx.Outputs {
+               if bytes.Equal(output.OutputCommitment.ControlProgram, fedProg) {
+                       return true
+               }
+       }
+       return false
+}
+
+func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
+       for _, input := range tx.Inputs {
+               if bytes.Equal(input.ControlProgram(), fedProg) {
+                       return true
+               }
+       }
+       return false
+}
+
+func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
+       blockHash := block.Hash()
+
+       var muxID btmBc.Hash
+       res0ID := tx.ResultIds[0]
+       switch res := tx.Entries[*res0ID].(type) {
+       case *btmBc.Output:
+               muxID = *res.Source.Ref
+       case *btmBc.Retirement:
+               muxID = *res.Source.Ref
+       default:
+               return ErrOutputType
+       }
+
+       rawTx, err := tx.MarshalText()
+       if err != nil {
+               return err
+       }
+
+       ormTx := &orm.CrossTransaction{
+               ChainID:              chain.ID,
+               SourceBlockHeight:    block.Height,
+               SourceBlockHash:      blockHash.String(),
+               SourceTxIndex:        txIndex,
+               SourceMuxID:          muxID.String(),
+               SourceTxHash:         tx.ID.String(),
+               SourceRawTransaction: string(rawTx),
+               DestBlockHeight:      sql.NullInt64{Valid: false},
+               DestBlockHash:        sql.NullString{Valid: false},
+               DestTxIndex:          sql.NullInt64{Valid: false},
+               DestTxHash:           sql.NullString{Valid: false},
+               Status:               common.CrossTxPendingStatus,
+       }
+       if err := m.db.Create(ormTx).Error; err != nil {
+               return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
+       }
+
+       statusFail := txStatus.VerifyStatus[txIndex].StatusFail
+       crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail)
+       if err != nil {
+               return err
+       }
+
+       for _, input := range crossChainInputs {
+               if err := m.db.Create(input).Error; err != nil {
+                       return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos))
+               }
+       }
+
+       return nil
+}
+
+func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
+       // assume inputs are from an identical owner
+       script := hex.EncodeToString(tx.Inputs[0].ControlProgram())
+       inputs := []*orm.CrossTransactionReq{}
+       for i, rawOutput := range tx.Outputs {
+               // check valid deposit
+               if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, fedProg) {
+                       continue
+               }
+
+               if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *consensus.BTMAssetID {
+                       continue
+               }
+
+               asset, err := m.getAsset(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
+               if err != nil {
+                       return nil, err
+               }
+
+               input := &orm.CrossTransactionReq{
+                       CrossTransactionID: crossTransactionID,
+                       SourcePos:          uint64(i),
+                       AssetID:            asset.ID,
+                       AssetAmount:        rawOutput.OutputCommitment.AssetAmount.Amount,
+                       Script:             script,
+               }
+               inputs = append(inputs, input)
+       }
+       return inputs, nil
+}
+
+func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
+       blockHash := block.Hash()
+       stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
+               Where(&orm.CrossTransaction{
+                       DestTxHash: sql.NullString{tx.ID.String(), true},
+                       Status:     common.CrossTxSubmittedStatus,
+               }).UpdateColumn(&orm.CrossTransaction{
+               DestBlockHeight: sql.NullInt64{int64(block.Height), true},
+               DestBlockHash:   sql.NullString{blockHash.String(), true},
+               DestTxIndex:     sql.NullInt64{int64(txIndex), true},
+               Status:          common.CrossTxCompletedStatus,
+       })
+       if stmt.Error != nil {
+               return stmt.Error
+       }
+
+       if stmt.RowsAffected != 1 {
+               log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String())
+       }
+       return nil
+}
+
+func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
+       blockHash := block.Hash()
+       chain.BlockHash = blockHash.String()
+       chain.BlockHeight = block.Height
+       res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
+       if err := res.Error; err != nil {
+               return err
+       }
+
+       if res.RowsAffected != 1 {
+               return ErrInconsistentDB
+       }
+
        return nil
 }
 
-func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error {
+func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error {
        for _, tx := range txs {
                for _, input := range tx.Inputs {
                        switch inp := input.TypedInput.(type) {
-                       case *btmTypes.IssuanceInput:
+                       case *types.IssuanceInput:
                                assetID := inp.AssetID()
                                if _, err := m.getAsset(assetID.String()); err == nil {
                                        continue
index 10c27d0..fed458e 100644 (file)
 package synchron
 
 import (
+       "database/sql"
+       "encoding/hex"
+       "fmt"
+       "time"
+
        "github.com/jinzhu/gorm"
+       log "github.com/sirupsen/logrus"
 
+       "github.com/vapor/consensus"
+       "github.com/vapor/errors"
+       "github.com/vapor/federation/common"
        "github.com/vapor/federation/config"
+       "github.com/vapor/federation/database"
+       "github.com/vapor/federation/database/orm"
        "github.com/vapor/federation/service"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
 )
 
 type sidechainKeeper struct {
-       cfg       *config.Chain
-       db        *gorm.DB
-       node      *service.Node
-       chainName string
+       cfg        *config.Chain
+       db         *gorm.DB
+       node       *service.Node
+       chainName  string
+       assetCache *database.AssetCache
 }
 
 func NewSidechainKeeper(db *gorm.DB, chainCfg *config.Chain) *sidechainKeeper {
        return &sidechainKeeper{
-               cfg:       chainCfg,
-               db:        db,
-               node:      service.NewNode(chainCfg.Upstream),
-               chainName: chainCfg.Name,
+               cfg:        chainCfg,
+               db:         db,
+               node:       service.NewNode(chainCfg.Upstream),
+               chainName:  chainCfg.Name,
+               assetCache: database.NewAssetCache(),
+       }
+}
+
+func (s *sidechainKeeper) Run() {
+       ticker := time.NewTicker(time.Duration(s.cfg.SyncSeconds) * time.Second)
+       for ; true; <-ticker.C {
+               for {
+                       isUpdate, err := s.syncBlock()
+                       if err != nil {
+                               log.WithField("error", err).Errorln("blockKeeper fail on process block")
+                               break
+                       }
+
+                       if !isUpdate {
+                               break
+                       }
+               }
+       }
+}
+
+func (s *sidechainKeeper) syncBlock() (bool, error) {
+       chain := &orm.Chain{Name: s.chainName}
+       if err := s.db.Where(chain).First(chain).Error; err != nil {
+               return false, errors.Wrap(err, "query chain")
+       }
+
+       height, err := s.node.GetBlockCount()
+       if err != nil {
+               return false, err
+       }
+
+       if height <= chain.BlockHeight+s.cfg.Confirmations {
+               return false, nil
+       }
+
+       nextBlockStr, txStatus, err := s.node.GetBlockByHeight(chain.BlockHeight + 1)
+       if err != nil {
+               return false, err
+       }
+
+       nextBlock := &types.Block{}
+       if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
+               return false, errors.New("Unmarshal nextBlock")
+       }
+
+       if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
+               log.WithFields(log.Fields{
+                       "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
+                       "db block_hash":            chain.BlockHash,
+               }).Fatal("BlockHash mismatch")
+               return false, ErrInconsistentDB
+       }
+
+       if err := s.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
+               return false, err
+       }
+
+       return true, nil
+}
+
+func (s *sidechainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
+       blockHash := block.Hash()
+       log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
+       s.db.Begin()
+       if err := s.processBlock(chain, block, txStatus); err != nil {
+               s.db.Rollback()
+               return err
+       }
+
+       return s.db.Commit().Error
+}
+
+func (s *sidechainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
+       for i, tx := range block.Transactions {
+               if s.isDepositTx(tx) {
+                       if err := s.processDepositTx(chain, block, uint64(i), tx); err != nil {
+                               return err
+                       }
+               }
+
+               if s.isWithdrawalTx(tx) {
+                       if err := s.processWithdrawalTx(chain, block, txStatus, uint64(i), tx); err != nil {
+                               return err
+                       }
+               }
+       }
+
+       return s.processChainInfo(chain, block)
+}
+
+func (s *sidechainKeeper) isDepositTx(tx *types.Tx) bool {
+       for _, input := range tx.Inputs {
+               if input.InputType() == types.CrossChainInputType {
+                       return true
+               }
+       }
+       return false
+}
+
+func (s *sidechainKeeper) isWithdrawalTx(tx *types.Tx) bool {
+       for _, output := range tx.Outputs {
+               if output.OutputType() == types.CrossChainOutputType {
+                       return true
+               }
        }
+       return false
 }
 
-func (s *sidechainKeeper) Run() {}
+func (s *sidechainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
+       blockHash := block.Hash()
+       stmt := s.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
+               Where(&orm.CrossTransaction{
+                       DestTxHash: sql.NullString{tx.ID.String(), true},
+                       Status:     common.CrossTxSubmittedStatus,
+               }).UpdateColumn(&orm.CrossTransaction{
+               DestBlockHeight: sql.NullInt64{int64(block.Height), true},
+               DestBlockHash:   sql.NullString{blockHash.String(), true},
+               DestTxIndex:     sql.NullInt64{int64(txIndex), true},
+               Status:          common.CrossTxCompletedStatus,
+       })
+       if stmt.Error != nil {
+               return stmt.Error
+       }
+
+       if stmt.RowsAffected != 1 {
+               log.Warnf("sidechainKeeper.processDepositTx(%v): rows affected != 1", tx.ID.String())
+       }
+       return nil
+}
+
+func (s *sidechainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
+       blockHash := block.Hash()
+
+       var muxID bc.Hash
+       res0ID := tx.ResultIds[0]
+       switch res := tx.Entries[*res0ID].(type) {
+       case *bc.CrossChainOutput:
+               muxID = *res.Source.Ref
+       case *bc.IntraChainOutput:
+               muxID = *res.Source.Ref
+       case *bc.VoteOutput:
+               muxID = *res.Source.Ref
+       default:
+               return ErrOutputType
+       }
+
+       rawTx, err := tx.MarshalText()
+       if err != nil {
+               return err
+       }
+
+       ormTx := &orm.CrossTransaction{
+               ChainID:              chain.ID,
+               SourceBlockHeight:    block.Height,
+               SourceBlockHash:      blockHash.String(),
+               SourceTxIndex:        txIndex,
+               SourceMuxID:          muxID.String(),
+               SourceTxHash:         tx.ID.String(),
+               SourceRawTransaction: string(rawTx),
+               DestBlockHeight:      sql.NullInt64{Valid: false},
+               DestBlockHash:        sql.NullString{Valid: false},
+               DestTxIndex:          sql.NullInt64{Valid: false},
+               DestTxHash:           sql.NullString{Valid: false},
+               Status:               common.CrossTxPendingStatus,
+       }
+       if err := s.db.Create(ormTx).Error; err != nil {
+               return errors.Wrap(err, fmt.Sprintf("create sidechain WithdrawalTx %s", tx.ID.String()))
+       }
+
+       statusFail := txStatus.VerifyStatus[txIndex].StatusFail
+       crossChainOutputs, err := s.getCrossChainReqs(ormTx.ID, tx, statusFail)
+       if err != nil {
+               return err
+       }
+
+       for _, output := range crossChainOutputs {
+               if err := s.db.Create(output).Error; err != nil {
+                       return errors.Wrap(err, fmt.Sprintf("create WithdrawalFromSidechain output: txid(%s), pos(%d)", tx.ID.String(), output.SourcePos))
+               }
+       }
+
+       return nil
+}
+
+func (s *sidechainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
+       // assume inputs are from an identical owner
+       script := hex.EncodeToString(tx.Inputs[0].ControlProgram())
+       inputs := []*orm.CrossTransactionReq{}
+       for i, rawOutput := range tx.Outputs {
+               // check valid withdrawal
+               if rawOutput.OutputType() != types.CrossChainOutputType {
+                       continue
+               }
+
+               if statusFail && *rawOutput.OutputCommitment().AssetAmount.AssetId != *consensus.BTMAssetID {
+                       continue
+               }
+
+               asset, err := s.getAsset(rawOutput.OutputCommitment().AssetAmount.AssetId.String())
+               if err != nil {
+                       return nil, err
+               }
+
+               input := &orm.CrossTransactionReq{
+                       CrossTransactionID: crossTransactionID,
+                       SourcePos:          uint64(i),
+                       AssetID:            asset.ID,
+                       AssetAmount:        rawOutput.OutputCommitment().AssetAmount.Amount,
+                       Script:             script,
+               }
+               inputs = append(inputs, input)
+       }
+       return inputs, nil
+}
+
+func (s *sidechainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
+       blockHash := block.Hash()
+       chain.BlockHash = blockHash.String()
+       chain.BlockHeight = block.Height
+       res := s.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
+       if err := res.Error; err != nil {
+               return err
+       }
+
+       if res.RowsAffected != 1 {
+               return ErrInconsistentDB
+       }
+
+       return nil
+}
+
+func (s *sidechainKeeper) getAsset(assetID string) (*orm.Asset, error) {
+       if asset := s.assetCache.Get(assetID); asset != nil {
+               return asset, nil
+       }
+
+       asset := &orm.Asset{AssetID: assetID}
+       if err := s.db.Where(asset).First(asset).Error; err != nil {
+               return nil, errors.Wrap(err, "asset not found in memory and mysql")
+       }
+
+       s.assetCache.Add(assetID, asset)
+       return asset, nil
+}