X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=federation%2Fsynchron%2Fsidechain_keeper.go;h=d308e14ca6708b17e63ba3a2bd7d3404c3585309;hp=8c538160b3cf1598381c2c9df21903f0d3a0fcce;hb=a6be86325c56b458295c163200c295b43c44f2e0;hpb=a177b8b4f2828248c5bf34561b877c2578b77dd1 diff --git a/federation/synchron/sidechain_keeper.go b/federation/synchron/sidechain_keeper.go index 8c538160..d308e14c 100644 --- a/federation/synchron/sidechain_keeper.go +++ b/federation/synchron/sidechain_keeper.go @@ -10,7 +10,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/vapor/consensus" - "github.com/vapor/consensus/segwit" "github.com/vapor/errors" "github.com/vapor/federation/common" "github.com/vapor/federation/config" @@ -19,24 +18,28 @@ import ( "github.com/vapor/federation/service" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" - "github.com/vapor/wallet" ) type sidechainKeeper struct { cfg *config.Chain db *gorm.DB node *service.Node - chainName string assetStore *database.AssetStore + chainID uint64 } func NewSidechainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *sidechainKeeper { + chain := &orm.Chain{Name: common.VaporChainName} + if err := db.Where(chain).First(chain).Error; err != nil { + log.WithField("err", err).Fatal("fail on get chain info") + } + return &sidechainKeeper{ cfg: &cfg.Sidechain, db: db, node: service.NewNode(cfg.Sidechain.Upstream), - chainName: cfg.Sidechain.Name, assetStore: assetStore, + chainID: chain.ID, } } @@ -57,105 +60,109 @@ func (s *sidechainKeeper) Run() { } } -func (s *sidechainKeeper) syncBlock() (bool, error) { - chain := &orm.Chain{Name: s.chainName} - if err := s.db.Where(chain).First(chain).Error; err != nil { - return false, errors.Wrap(err, "query chain") - } - - height, err := s.node.GetBlockCount() - if err != nil { - return false, err - } +func (s *sidechainKeeper) createCrossChainReqs(db *gorm.DB, crossTransactionID uint64, tx *types.Tx, statusFail bool) error { + fromAddress := common.ProgToAddress(tx.Inputs[0].ControlProgram(), &consensus.MainNetParams) + for i, rawOutput := range tx.Outputs { + if rawOutput.OutputType() != types.CrossChainOutputType { + continue + } - if height <= chain.BlockHeight+s.cfg.Confirmations { - return false, nil - } + if statusFail && *rawOutput.OutputCommitment().AssetAmount.AssetId != *consensus.BTMAssetID { + continue + } - nextBlockStr, txStatus, err := s.node.GetBlockByHeight(chain.BlockHeight + 1) - if err != nil { - return false, err - } + asset, err := s.assetStore.GetByAssetID(rawOutput.OutputCommitment().AssetAmount.AssetId.String()) + if err != nil { + return err + } - nextBlock := &types.Block{} - if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { - return false, errors.New("Unmarshal nextBlock") - } + prog := rawOutput.ControlProgram() + req := &orm.CrossTransactionReq{ + CrossTransactionID: crossTransactionID, + SourcePos: uint64(i), + AssetID: asset.ID, + AssetAmount: rawOutput.OutputCommitment().AssetAmount.Amount, + Script: hex.EncodeToString(prog), + FromAddress: fromAddress, + ToAddress: common.ProgToAddress(prog, &consensus.BytomMainNetParams), + } - if nextBlock.PreviousBlockHash.String() != chain.BlockHash { - log.WithFields(log.Fields{ - "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(), - "db block_hash": chain.BlockHash, - }).Fatal("BlockHash mismatch") - return false, ErrInconsistentDB + if err := db.Create(req).Error; err != nil { + return err + } } + return nil +} - if err := s.tryAttachBlock(chain, nextBlock, txStatus); err != nil { - return false, err +func (s *sidechainKeeper) isDepositTx(tx *types.Tx) bool { + for _, input := range tx.Inputs { + if input.InputType() == types.CrossChainInputType { + return true + } } - - return true, nil + return false } -func (s *sidechainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { - blockHash := block.Hash() - log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") - dbTx := s.db.Begin() - if err := s.processBlock(chain, block, txStatus); err != nil { - dbTx.Rollback() - return err +func (s *sidechainKeeper) isWithdrawalTx(tx *types.Tx) bool { + for _, output := range tx.Outputs { + if output.OutputType() == types.CrossChainOutputType { + return true + } } - - return dbTx.Commit().Error + return false } -func (s *sidechainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { +func (s *sidechainKeeper) processBlock(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus) error { for i, tx := range block.Transactions { if s.isDepositTx(tx) { - if err := s.processDepositTx(chain, block, uint64(i), tx); err != nil { + if err := s.processDepositTx(db, block, i); err != nil { return err } } if s.isWithdrawalTx(tx) { - if err := s.processWithdrawalTx(chain, block, txStatus, uint64(i), tx); err != nil { + if err := s.processWithdrawalTx(db, block, txStatus, i); err != nil { return err } } } - - return s.processChainInfo(chain, block) + return nil } -func (s *sidechainKeeper) isDepositTx(tx *types.Tx) bool { - for _, input := range tx.Inputs { - if input.InputType() == types.CrossChainInputType { - return true - } +func (s *sidechainKeeper) processChainInfo(db *gorm.DB, block *types.Block) error { + blockHash := block.Hash() + res := db.Model(&orm.Chain{}).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(&orm.Chain{ + BlockHash: blockHash.String(), + BlockHeight: block.Height, + }) + if err := res.Error; err != nil { + return err } - return false -} -func (s *sidechainKeeper) isWithdrawalTx(tx *types.Tx) bool { - for _, output := range tx.Outputs { - if output.OutputType() == types.CrossChainOutputType { - return true - } + if res.RowsAffected != 1 { + return ErrInconsistentDB } - return false + + return nil } -func (s *sidechainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error { +func (s *sidechainKeeper) processDepositTx(db *gorm.DB, block *types.Block, txIndex int) error { + tx := block.Transactions[txIndex] + sourceTxHash, err := s.locateMainChainTx(tx.Inputs[0]) + if err != nil { + return err + } + blockHash := block.Hash() - stmt := s.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID). - Where(&orm.CrossTransaction{ - DestTxHash: sql.NullString{tx.ID.String(), true}, - Status: common.CrossTxPendingStatus, - }).UpdateColumn(&orm.CrossTransaction{ + stmt := db.Model(&orm.CrossTransaction{}).Where(&orm.CrossTransaction{ + SourceTxHash: sourceTxHash, + Status: common.CrossTxPendingStatus, + }).UpdateColumn(&orm.CrossTransaction{ DestBlockHeight: sql.NullInt64{int64(block.Height), true}, - DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true}, + DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp / 1000), true}, DestBlockHash: sql.NullString{blockHash.String(), true}, DestTxIndex: sql.NullInt64{int64(txIndex), true}, + DestTxHash: sql.NullString{tx.ID.String(), true}, Status: common.CrossTxCompletedStatus, }) if stmt.Error != nil { @@ -163,23 +170,23 @@ func (s *sidechainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, } if stmt.RowsAffected != 1 { - log.Warnf("sidechainKeeper.processDepositTx(%v): rows affected != 1", tx.ID.String()) + return errors.Wrap(ErrInconsistentDB, "fail on find deposit data on database") } return nil } -func (s *sidechainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error { - blockHash := block.Hash() - +func (s *sidechainKeeper) processWithdrawalTx(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus, txIndex int) error { + tx := block.Transactions[txIndex] var muxID bc.Hash - res0ID := tx.ResultIds[0] - switch res := tx.Entries[*res0ID].(type) { + switch res := tx.Entries[*tx.ResultIds[0]].(type) { case *bc.CrossChainOutput: muxID = *res.Source.Ref case *bc.IntraChainOutput: muxID = *res.Source.Ref case *bc.VoteOutput: muxID = *res.Source.Ref + case *bc.Retirement: + muxID = *res.Source.Ref default: return ErrOutputType } @@ -189,12 +196,13 @@ func (s *sidechainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Blo return err } + blockHash := block.Hash() ormTx := &orm.CrossTransaction{ - ChainID: chain.ID, + ChainID: s.chainID, SourceBlockHeight: block.Height, - SourceBlockTimestamp: block.Timestamp, + SourceBlockTimestamp: block.Timestamp / 1000, SourceBlockHash: blockHash.String(), - SourceTxIndex: txIndex, + SourceTxIndex: uint64(txIndex), SourceMuxID: muxID.String(), SourceTxHash: tx.ID.String(), SourceRawTransaction: string(rawTx), @@ -205,94 +213,78 @@ func (s *sidechainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Blo DestTxHash: sql.NullString{Valid: false}, Status: common.CrossTxPendingStatus, } - if err := s.db.Create(ormTx).Error; err != nil { + if err := db.Create(ormTx).Error; err != nil { return errors.Wrap(err, fmt.Sprintf("create sidechain WithdrawalTx %s", tx.ID.String())) } - statusFail := txStatus.VerifyStatus[txIndex].StatusFail - crossChainOutputs, err := s.getCrossChainReqs(ormTx.ID, tx, statusFail) - if err != nil { - return err - } + return s.createCrossChainReqs(db, ormTx.ID, tx, txStatus.VerifyStatus[txIndex].StatusFail) +} - for _, output := range crossChainOutputs { - if err := s.db.Create(output).Error; err != nil { - return errors.Wrap(err, fmt.Sprintf("create WithdrawalFromSidechain output: txid(%s), pos(%d)", tx.ID.String(), output.SourcePos)) - } +func (s *sidechainKeeper) locateMainChainTx(input *types.TxInput) (string, error) { + if input.InputType() != types.CrossChainInputType { + return "", errors.New("found weird crossChain tx") } - return nil + crossIn := input.TypedInput.(*types.CrossChainInput) + crossTx := &orm.CrossTransaction{SourceMuxID: crossIn.SpendCommitment.SourceID.String()} + if err := s.db.Where(crossTx).First(crossTx).Error; err != nil { + return "", errors.Wrap(err, "fail on find CrossTransaction") + } + return crossTx.SourceTxHash, nil } -func (s *sidechainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) { - var fromAddress string - inputCP := tx.Inputs[0].ControlProgram() - switch { - case segwit.IsP2WPKHScript(inputCP): - if pubHash, err := segwit.GetHashFromStandardProg(inputCP); err == nil { - fromAddress = wallet.BuildP2PKHAddress(pubHash, &consensus.VaporNetParams) - } - case segwit.IsP2WSHScript(inputCP): - if scriptHash, err := segwit.GetHashFromStandardProg(inputCP); err == nil { - fromAddress = wallet.BuildP2SHAddress(scriptHash, &consensus.VaporNetParams) - } +func (s *sidechainKeeper) syncBlock() (bool, error) { + chain := &orm.Chain{ID: s.chainID} + if err := s.db.First(chain).Error; err != nil { + return false, errors.Wrap(err, "query chain") } - reqs := []*orm.CrossTransactionReq{} - for i, rawOutput := range tx.Outputs { - // check valid withdrawal - if rawOutput.OutputType() != types.CrossChainOutputType { - continue - } + height, err := s.node.GetBlockCount() + if err != nil { + return false, err + } - if statusFail && *rawOutput.OutputCommitment().AssetAmount.AssetId != *consensus.BTMAssetID { - continue - } + if height <= chain.BlockHeight+s.cfg.Confirmations { + return false, nil + } - asset, err := s.assetStore.GetByAssetID(rawOutput.OutputCommitment().AssetAmount.AssetId.String()) - if err != nil { - return nil, err - } + nextBlockStr, txStatus, err := s.node.GetBlockByHeight(chain.BlockHeight + 1) + if err != nil { + return false, err + } - var toAddress string - outputCP := rawOutput.ControlProgram() - switch { - case segwit.IsP2WPKHScript(outputCP): - if pubHash, err := segwit.GetHashFromStandardProg(outputCP); err == nil { - toAddress = wallet.BuildP2PKHAddress(pubHash, &consensus.MainNetParams) - } - case segwit.IsP2WSHScript(outputCP): - if scriptHash, err := segwit.GetHashFromStandardProg(outputCP); err == nil { - toAddress = wallet.BuildP2SHAddress(scriptHash, &consensus.MainNetParams) - } - } + nextBlock := &types.Block{} + if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { + return false, errors.New("Unmarshal nextBlock") + } - req := &orm.CrossTransactionReq{ - CrossTransactionID: crossTransactionID, - SourcePos: uint64(i), - AssetID: asset.ID, - AssetAmount: rawOutput.OutputCommitment().AssetAmount.Amount, - Script: hex.EncodeToString(rawOutput.ControlProgram()), - FromAddress: fromAddress, - ToAddress: toAddress, - } - reqs = append(reqs, req) + if nextBlock.PreviousBlockHash.String() != chain.BlockHash { + log.WithFields(log.Fields{ + "remote previous_block_Hash": nextBlock.PreviousBlockHash.String(), + "db block_hash": chain.BlockHash, + }).Fatal("fail on block hash mismatch") + } + + if err := s.tryAttachBlock(nextBlock, txStatus); err != nil { + return false, err } - return reqs, nil + + return true, nil } -func (s *sidechainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error { +func (s *sidechainKeeper) tryAttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error { blockHash := block.Hash() - chain.BlockHash = blockHash.String() - chain.BlockHeight = block.Height - res := s.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain) - if err := res.Error; err != nil { + log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") + + dbTx := s.db.Begin() + if err := s.processBlock(dbTx, block, txStatus); err != nil { + dbTx.Rollback() return err } - if res.RowsAffected != 1 { - return ErrInconsistentDB + if err := s.processChainInfo(dbTx, block); err != nil { + dbTx.Rollback() + return err } - - return nil + return dbTx.Commit().Error }