From b6fc1cb4e358b34fc57e630d3d491590daf59a5f Mon Sep 17 00:00:00 2001 From: Paladz Date: Sat, 13 Jul 2019 14:30:27 +0800 Subject: [PATCH] Edit (#275) * tmp save * try to fix tx locate issue * elegant the code * elegant the code * edit for code review --- federation/common/const.go | 3 + federation/common/util.go | 26 +++ federation/config/config.go | 18 +- federation/database/asset_store.go | 10 - federation/synchron/mainchain_keeper.go | 347 ++++++++++++++++---------------- federation/synchron/sidechain_keeper.go | 279 +++++++++++++------------ federation/util/script.go | 45 ----- protocol/bc/types/txinput.go | 3 + 8 files changed, 348 insertions(+), 383 deletions(-) create mode 100644 federation/common/util.go delete mode 100644 federation/util/script.go diff --git a/federation/common/const.go b/federation/common/const.go index 56f7d0b3..7157d96d 100644 --- a/federation/common/const.go +++ b/federation/common/const.go @@ -7,6 +7,9 @@ const ( ) const ( + BytomChainName = "btm" + VaporChainName = "vapor" + CrossTxPendingStatusLabel = "pending" CrossTxCompletedStatusLabel = "completed" ) diff --git a/federation/common/util.go b/federation/common/util.go new file mode 100644 index 00000000..c02ed3db --- /dev/null +++ b/federation/common/util.go @@ -0,0 +1,26 @@ +package common + +import ( + "encoding/hex" + + log "github.com/sirupsen/logrus" + + "github.com/vapor/consensus" + "github.com/vapor/consensus/segwit" + "github.com/vapor/wallet" +) + +func ProgToAddress(prog []byte, netParams *consensus.Params) string { + hash, err := segwit.GetHashFromStandardProg(prog) + if err != nil { + log.WithFields(log.Fields{"prog": hex.EncodeToString(prog), "err": err}).Warn("fail on GetHashFromStandardProg") + return "" + } + + if segwit.IsP2WPKHScript(prog) { + return wallet.BuildP2PKHAddress(hash, netParams) + } else if segwit.IsP2WSHScript(prog) { + return wallet.BuildP2SHAddress(hash, netParams) + } + return "" +} diff --git a/federation/config/config.go b/federation/config/config.go index 52568dc2..d05082b4 100644 --- a/federation/config/config.go +++ b/federation/config/config.go @@ -6,7 +6,7 @@ import ( log "github.com/sirupsen/logrus" - "github.com/vapor/crypto/ed25519/chainkd" + vaporJson "github.com/vapor/encoding/json" ) func NewConfig() *Config { @@ -33,12 +33,11 @@ func NewConfigWithPath(path string) *Config { } type Config struct { - API API `json:"api"` - MySQLConfig MySQLConfig `json:"mysql"` - Warders []Warder `json:"warders"` - Quorum int `json:"quorum"` - Mainchain Chain `json:"mainchain"` - Sidechain Chain `json:"sidechain"` + API API `json:"api"` + MySQLConfig MySQLConfig `json:"mysql"` + FederationProg vaporJson.HexBytes `json:"federation_prog"` + Mainchain Chain `json:"mainchain"` + Sidechain Chain `json:"sidechain"` } type API struct { @@ -58,11 +57,6 @@ type MySQLConnection struct { DbName string `json:"database"` } -type Warder struct { - Position uint8 `json:"position"` - XPub chainkd.XPub `json:"xpub"` -} - type Chain struct { Name string `json:"name"` Upstream string `json:"upstream"` diff --git a/federation/database/asset_store.go b/federation/database/asset_store.go index 6c32e954..127c6f9c 100644 --- a/federation/database/asset_store.go +++ b/federation/database/asset_store.go @@ -66,13 +66,3 @@ func (a *AssetStore) GetByAssetID(assetID string) (*orm.Asset, error) { a.cache.Add(fmtAssetIDKey(asset.AssetID), asset) return asset, nil } - -func (a *AssetStore) Add(asset *orm.Asset) error { - if err := a.db.Create(asset).Error; err != nil { - return err - } - - a.cache.Add(fmtOrmIDKey(asset.ID), asset) - a.cache.Add(fmtAssetIDKey(asset.AssetID), asset) - return nil -} diff --git a/federation/synchron/mainchain_keeper.go b/federation/synchron/mainchain_keeper.go index 0875fe3e..8a50f794 100644 --- a/federation/synchron/mainchain_keeper.go +++ b/federation/synchron/mainchain_keeper.go @@ -10,39 +10,42 @@ import ( btmConsensus "github.com/bytom/consensus" btmBc "github.com/bytom/protocol/bc" "github.com/bytom/protocol/bc/types" + "github.com/bytom/protocol/vm" "github.com/jinzhu/gorm" log "github.com/sirupsen/logrus" - vaporConsensus "github.com/vapor/consensus" - "github.com/vapor/consensus/segwit" + "github.com/vapor/consensus" "github.com/vapor/errors" "github.com/vapor/federation/common" "github.com/vapor/federation/config" "github.com/vapor/federation/database" "github.com/vapor/federation/database/orm" "github.com/vapor/federation/service" - "github.com/vapor/federation/util" "github.com/vapor/protocol/bc" - "github.com/vapor/wallet" ) type mainchainKeeper struct { - cfg *config.Chain - db *gorm.DB - node *service.Node - chainName string - assetStore *database.AssetStore - fedProg []byte + cfg *config.Chain + db *gorm.DB + node *service.Node + assetStore *database.AssetStore + chainID uint64 + federationProg []byte } func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper { + chain := &orm.Chain{Name: common.BytomChainName} + if err := db.Where(chain).First(chain).Error; err != nil { + log.WithField("err", err).Fatal("fail on get chain info") + } + return &mainchainKeeper{ - cfg: &cfg.Mainchain, - db: db, - node: service.NewNode(cfg.Mainchain.Upstream), - chainName: cfg.Mainchain.Name, - assetStore: assetStore, - fedProg: util.ParseFedProg(cfg.Warders, cfg.Quorum), + cfg: &cfg.Mainchain, + db: db, + node: service.NewNode(cfg.Mainchain.Upstream), + assetStore: assetStore, + federationProg: cfg.FederationProg, + chainID: chain.ID, } } @@ -63,111 +66,124 @@ func (m *mainchainKeeper) Run() { } } -func (m *mainchainKeeper) syncBlock() (bool, error) { - chain := &orm.Chain{Name: m.chainName} - if err := m.db.Where(chain).First(chain).Error; err != nil { - return false, errors.Wrap(err, "query chain") - } +func (m *mainchainKeeper) createCrossChainReqs(db *gorm.DB, crossTransactionID uint64, tx *types.Tx, statusFail bool) error { + prog := tx.Inputs[0].ControlProgram() + fromAddress := common.ProgToAddress(prog, &consensus.BytomMainNetParams) + toAddress := common.ProgToAddress(prog, &consensus.MainNetParams) + for i, rawOutput := range tx.Outputs { + if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.federationProg) { + continue + } - height, err := m.node.GetBlockCount() - if err != nil { - return false, err - } + if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID { + continue + } - if height <= chain.BlockHeight+m.cfg.Confirmations { - return false, nil - } + asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String()) + if err != nil { + return err + } - nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1) - if err != nil { - return false, err - } + req := &orm.CrossTransactionReq{ + CrossTransactionID: crossTransactionID, + SourcePos: uint64(i), + AssetID: asset.ID, + AssetAmount: rawOutput.OutputCommitment.AssetAmount.Amount, + Script: hex.EncodeToString(prog), + FromAddress: fromAddress, + ToAddress: toAddress, + } - nextBlock := &types.Block{} - if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { - return false, errors.New("Unmarshal nextBlock") + if err := db.Create(req).Error; err != nil { + return err + } } + return nil +} - if nextBlock.PreviousBlockHash.String() != chain.BlockHash { - log.WithFields(log.Fields{ - "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(), - "db block_hash": chain.BlockHash, - }).Fatal("BlockHash mismatch") - return false, ErrInconsistentDB +func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool { + for _, input := range tx.Inputs { + if bytes.Equal(input.ControlProgram(), m.federationProg) { + return false + } } - if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil { - return false, err + for _, output := range tx.Outputs { + if bytes.Equal(output.OutputCommitment.ControlProgram, m.federationProg) { + return true + } } - - return true, nil + return false } -func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { - blockHash := block.Hash() - log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") - dbTx := m.db.Begin() - if err := m.processBlock(chain, block, txStatus); err != nil { - dbTx.Rollback() - return err +func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool { + for _, input := range tx.Inputs { + if !bytes.Equal(input.ControlProgram(), m.federationProg) { + return false + } } - return dbTx.Commit().Error + if sourceTxHash := locateSideChainTx(tx.Outputs[len(tx.Outputs)-1]); sourceTxHash == "" { + return false + } + return false } -func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { - if err := m.processIssuing(block.Transactions); err != nil { - return err +func locateSideChainTx(output *types.TxOutput) string { + insts, err := vm.ParseProgram(output.OutputCommitment.ControlProgram) + if err != nil { + return "" + } + + if len(insts) != 2 { + return "" } + return hex.EncodeToString(insts[1].Data) +} + +func (m *mainchainKeeper) processBlock(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus) error { for i, tx := range block.Transactions { + if err := m.processIssuance(tx); err != nil { + return err + } + if m.isDepositTx(tx) { - if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil { + if err := m.processDepositTx(db, block, txStatus, i); err != nil { return err } } if m.isWithdrawalTx(tx) { - if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil { + if err := m.processWithdrawalTx(db, block, i); err != nil { return err } } } - return m.processChainInfo(chain, block) + return nil } -func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool { - for _, input := range tx.Inputs { - if bytes.Equal(input.ControlProgram(), m.fedProg) { - return false - } - } - - for _, output := range tx.Outputs { - if bytes.Equal(output.OutputCommitment.ControlProgram, m.fedProg) { - return true - } +func (m *mainchainKeeper) processChainInfo(db *gorm.DB, block *types.Block) error { + blockHash := block.Hash() + res := db.Model(&orm.Chain{}).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(&orm.Chain{ + BlockHash: blockHash.String(), + BlockHeight: block.Height, + }) + if err := res.Error; err != nil { + return err } - return false -} - -func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool { - for _, input := range tx.Inputs { - if bytes.Equal(input.ControlProgram(), m.fedProg) { - return true - } + if res.RowsAffected != 1 { + return ErrInconsistentDB } - return false + return nil } -func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error { - blockHash := block.Hash() - +func (m *mainchainKeeper) processDepositTx(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus, txIndex int) error { + tx := block.Transactions[txIndex] var muxID btmBc.Hash - res0ID := tx.ResultIds[0] - switch res := tx.Entries[*res0ID].(type) { + switch res := tx.Entries[*tx.ResultIds[0]].(type) { case *btmBc.Output: muxID = *res.Source.Ref case *btmBc.Retirement: @@ -181,12 +197,13 @@ func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, return err } + blockHash := block.Hash() ormTx := &orm.CrossTransaction{ - ChainID: chain.ID, + ChainID: m.chainID, SourceBlockHeight: block.Height, SourceBlockTimestamp: block.Timestamp, SourceBlockHash: blockHash.String(), - SourceTxIndex: txIndex, + SourceTxIndex: uint64(txIndex), SourceMuxID: muxID.String(), SourceTxHash: tx.ID.String(), SourceRawTransaction: string(rawTx), @@ -197,84 +214,52 @@ func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, DestTxHash: sql.NullString{Valid: false}, Status: common.CrossTxPendingStatus, } - if err := m.db.Create(ormTx).Error; err != nil { + if err := db.Create(ormTx).Error; err != nil { return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String())) } - statusFail := txStatus.VerifyStatus[txIndex].StatusFail - crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail) - if err != nil { - return err - } - - for _, input := range crossChainInputs { - if err := m.db.Create(input).Error; err != nil { - return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos)) - } - } - - return nil + return m.createCrossChainReqs(db, ormTx.ID, tx, txStatus.VerifyStatus[txIndex].StatusFail) } -func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) { - var fromAddress, toAddress string - // assume inputs are from an identical owner - prog := tx.Inputs[0].ControlProgram() - script := hex.EncodeToString(prog) - switch { - case segwit.IsP2WPKHScript(prog): - if pubHash, err := segwit.GetHashFromStandardProg(prog); err == nil { - fromAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.BytomMainNetParams) - toAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.MainNetParams) - } - case segwit.IsP2WSHScript(prog): - if scriptHash, err := segwit.GetHashFromStandardProg(prog); err == nil { - fromAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.BytomMainNetParams) - toAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.MainNetParams) - } - } - - reqs := []*orm.CrossTransactionReq{} - for i, rawOutput := range tx.Outputs { - // check valid deposit - if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.fedProg) { +func (m *mainchainKeeper) processIssuance(tx *types.Tx) error { + for _, input := range tx.Inputs { + if input.InputType() != types.IssuanceInputType { continue } - if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID { + issuance := input.TypedInput.(*types.IssuanceInput) + assetID := issuance.AssetID() + if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil { continue } - asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String()) - if err != nil { - return nil, err + asset := &orm.Asset{ + AssetID: assetID.String(), + IssuanceProgram: hex.EncodeToString(issuance.IssuanceProgram), + VMVersion: issuance.VMVersion, + Definition: string(issuance.AssetDefinition), } - req := &orm.CrossTransactionReq{ - CrossTransactionID: crossTransactionID, - SourcePos: uint64(i), - AssetID: asset.ID, - AssetAmount: rawOutput.OutputCommitment.AssetAmount.Amount, - Script: script, - FromAddress: fromAddress, - ToAddress: toAddress, + if err := m.db.Create(asset).Error; err != nil { + return err } - reqs = append(reqs, req) } - return reqs, nil + return nil } -func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error { +func (m *mainchainKeeper) processWithdrawalTx(db *gorm.DB, block *types.Block, txIndex int) error { blockHash := block.Hash() - stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID). - Where(&orm.CrossTransaction{ - DestTxHash: sql.NullString{tx.ID.String(), true}, - Status: common.CrossTxPendingStatus, - }).UpdateColumn(&orm.CrossTransaction{ + tx := block.Transactions[txIndex] + + stmt := db.Model(&orm.CrossTransaction{}).Where(&orm.CrossTransaction{ + SourceTxHash: locateSideChainTx(tx.Outputs[len(tx.Outputs)-1]), + Status: common.CrossTxPendingStatus, + }).UpdateColumn(&orm.CrossTransaction{ DestBlockHeight: sql.NullInt64{int64(block.Height), true}, DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true}, DestBlockHash: sql.NullString{blockHash.String(), true}, DestTxIndex: sql.NullInt64{int64(txIndex), true}, + DestTxHash: sql.NullString{tx.ID.String(), true}, Status: common.CrossTxCompletedStatus, }) if stmt.Error != nil { @@ -282,46 +267,64 @@ func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Blo } if stmt.RowsAffected != 1 { - log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String()) + return ErrInconsistentDB } return nil } -func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error { - blockHash := block.Hash() - chain.BlockHash = blockHash.String() - chain.BlockHeight = block.Height - res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain) - if err := res.Error; err != nil { - return err +func (m *mainchainKeeper) syncBlock() (bool, error) { + chain := &orm.Chain{ID: m.chainID} + if err := m.db.First(chain).Error; err != nil { + return false, errors.Wrap(err, "query chain") } - if res.RowsAffected != 1 { - return ErrInconsistentDB + height, err := m.node.GetBlockCount() + if err != nil { + return false, err } - return nil + if height <= chain.BlockHeight+m.cfg.Confirmations { + return false, nil + } + + nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1) + if err != nil { + return false, err + } + + nextBlock := &types.Block{} + if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { + return false, errors.New("Unmarshal nextBlock") + } + + if nextBlock.PreviousBlockHash.String() != chain.BlockHash { + log.WithFields(log.Fields{ + "remote previous_block_Hash": nextBlock.PreviousBlockHash.String(), + "db block_hash": chain.BlockHash, + }).Fatal("fail on block hash mismatch") + } + + if err := m.tryAttachBlock(nextBlock, txStatus); err != nil { + return false, err + } + + return true, nil } -func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error { - for _, tx := range txs { - for _, input := range tx.Inputs { - switch inp := input.TypedInput.(type) { - case *types.IssuanceInput: - assetID := inp.AssetID() - if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil { - continue - } - - m.assetStore.Add(&orm.Asset{ - AssetID: assetID.String(), - IssuanceProgram: hex.EncodeToString(inp.IssuanceProgram), - VMVersion: inp.VMVersion, - Definition: string(inp.AssetDefinition), - }) - } - } +func (m *mainchainKeeper) tryAttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error { + blockHash := block.Hash() + log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") + + dbTx := m.db.Begin() + if err := m.processBlock(dbTx, block, txStatus); err != nil { + dbTx.Rollback() + return err } - return nil + if err := m.processChainInfo(dbTx, block); err != nil { + dbTx.Rollback() + return err + } + + return dbTx.Commit().Error } diff --git a/federation/synchron/sidechain_keeper.go b/federation/synchron/sidechain_keeper.go index 89a3bd99..d7f2acc4 100644 --- a/federation/synchron/sidechain_keeper.go +++ b/federation/synchron/sidechain_keeper.go @@ -10,7 +10,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/vapor/consensus" - "github.com/vapor/consensus/segwit" "github.com/vapor/errors" "github.com/vapor/federation/common" "github.com/vapor/federation/config" @@ -19,23 +18,26 @@ import ( "github.com/vapor/federation/service" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" - "github.com/vapor/wallet" ) type sidechainKeeper struct { cfg *config.Chain db *gorm.DB node *service.Node - chainName string assetStore *database.AssetStore + chainID uint64 } func NewSidechainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *sidechainKeeper { + chain := &orm.Chain{Name: common.VaporChainName} + if err := db.Where(chain).First(chain).Error; err != nil { + log.WithField("err", err).Fatal("fail on get chain info") + } + return &sidechainKeeper{ cfg: &cfg.Sidechain, db: db, node: service.NewNode(cfg.Sidechain.Upstream), - chainName: cfg.Sidechain.Name, assetStore: assetStore, } } @@ -57,105 +59,109 @@ func (s *sidechainKeeper) Run() { } } -func (s *sidechainKeeper) syncBlock() (bool, error) { - chain := &orm.Chain{Name: s.chainName} - if err := s.db.Where(chain).First(chain).Error; err != nil { - return false, errors.Wrap(err, "query chain") - } - - height, err := s.node.GetBlockCount() - if err != nil { - return false, err - } +func (s *sidechainKeeper) createCrossChainReqs(db *gorm.DB, crossTransactionID uint64, tx *types.Tx, statusFail bool) error { + fromAddress := common.ProgToAddress(tx.Inputs[0].ControlProgram(), &consensus.MainNetParams) + for i, rawOutput := range tx.Outputs { + if rawOutput.OutputType() != types.CrossChainOutputType { + continue + } - if height <= chain.BlockHeight+s.cfg.Confirmations { - return false, nil - } + if statusFail && *rawOutput.OutputCommitment().AssetAmount.AssetId != *consensus.BTMAssetID { + continue + } - nextBlockStr, txStatus, err := s.node.GetBlockByHeight(chain.BlockHeight + 1) - if err != nil { - return false, err - } + asset, err := s.assetStore.GetByAssetID(rawOutput.OutputCommitment().AssetAmount.AssetId.String()) + if err != nil { + return err + } - nextBlock := &types.Block{} - if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { - return false, errors.New("Unmarshal nextBlock") - } + prog := rawOutput.ControlProgram() + req := &orm.CrossTransactionReq{ + CrossTransactionID: crossTransactionID, + SourcePos: uint64(i), + AssetID: asset.ID, + AssetAmount: rawOutput.OutputCommitment().AssetAmount.Amount, + Script: hex.EncodeToString(prog), + FromAddress: fromAddress, + ToAddress: common.ProgToAddress(prog, &consensus.BytomMainNetParams), + } - if nextBlock.PreviousBlockHash.String() != chain.BlockHash { - log.WithFields(log.Fields{ - "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(), - "db block_hash": chain.BlockHash, - }).Fatal("BlockHash mismatch") - return false, ErrInconsistentDB + if err := db.Create(req).Error; err != nil { + return err + } } + return nil +} - if err := s.tryAttachBlock(chain, nextBlock, txStatus); err != nil { - return false, err +func (s *sidechainKeeper) isDepositTx(tx *types.Tx) bool { + for _, input := range tx.Inputs { + if input.InputType() == types.CrossChainInputType { + return true + } } - - return true, nil + return false } -func (s *sidechainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { - blockHash := block.Hash() - log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") - dbTx := s.db.Begin() - if err := s.processBlock(chain, block, txStatus); err != nil { - dbTx.Rollback() - return err +func (s *sidechainKeeper) isWithdrawalTx(tx *types.Tx) bool { + for _, output := range tx.Outputs { + if output.OutputType() == types.CrossChainOutputType { + return true + } } - - return dbTx.Commit().Error + return false } -func (s *sidechainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { +func (s *sidechainKeeper) processBlock(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus) error { for i, tx := range block.Transactions { if s.isDepositTx(tx) { - if err := s.processDepositTx(chain, block, uint64(i), tx); err != nil { + if err := s.processDepositTx(db, block, i); err != nil { return err } } if s.isWithdrawalTx(tx) { - if err := s.processWithdrawalTx(chain, block, txStatus, uint64(i), tx); err != nil { + if err := s.processWithdrawalTx(db, block, txStatus, i); err != nil { return err } } } - - return s.processChainInfo(chain, block) + return nil } -func (s *sidechainKeeper) isDepositTx(tx *types.Tx) bool { - for _, input := range tx.Inputs { - if input.InputType() == types.CrossChainInputType { - return true - } +func (s *sidechainKeeper) processChainInfo(db *gorm.DB, block *types.Block) error { + blockHash := block.Hash() + res := db.Model(&orm.Chain{}).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(&orm.Chain{ + BlockHash: blockHash.String(), + BlockHeight: block.Height, + }) + if err := res.Error; err != nil { + return err } - return false -} -func (s *sidechainKeeper) isWithdrawalTx(tx *types.Tx) bool { - for _, output := range tx.Outputs { - if output.OutputType() == types.CrossChainOutputType { - return true - } + if res.RowsAffected != 1 { + return ErrInconsistentDB } - return false + + return nil } -func (s *sidechainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error { +func (s *sidechainKeeper) processDepositTx(db *gorm.DB, block *types.Block, txIndex int) error { + tx := block.Transactions[txIndex] + sourceTxHash, err := s.locateMainChainTx(tx.Inputs[0]) + if err != nil { + return err + } + blockHash := block.Hash() - stmt := s.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID). - Where(&orm.CrossTransaction{ - DestTxHash: sql.NullString{tx.ID.String(), true}, - Status: common.CrossTxPendingStatus, - }).UpdateColumn(&orm.CrossTransaction{ + stmt := db.Model(&orm.CrossTransaction{}).Where(&orm.CrossTransaction{ + SourceTxHash: sourceTxHash, + Status: common.CrossTxPendingStatus, + }).UpdateColumn(&orm.CrossTransaction{ DestBlockHeight: sql.NullInt64{int64(block.Height), true}, DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true}, DestBlockHash: sql.NullString{blockHash.String(), true}, DestTxIndex: sql.NullInt64{int64(txIndex), true}, + DestTxHash: sql.NullString{tx.ID.String(), true}, Status: common.CrossTxCompletedStatus, }) if stmt.Error != nil { @@ -163,23 +169,23 @@ func (s *sidechainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, } if stmt.RowsAffected != 1 { - log.Warnf("sidechainKeeper.processDepositTx(%v): rows affected != 1", tx.ID.String()) + return errors.Wrap(ErrInconsistentDB, "fail on find deposit data on database") } return nil } -func (s *sidechainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error { - blockHash := block.Hash() - +func (s *sidechainKeeper) processWithdrawalTx(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus, txIndex int) error { + tx := block.Transactions[txIndex] var muxID bc.Hash - res0ID := tx.ResultIds[0] - switch res := tx.Entries[*res0ID].(type) { + switch res := tx.Entries[*tx.ResultIds[0]].(type) { case *bc.CrossChainOutput: muxID = *res.Source.Ref case *bc.IntraChainOutput: muxID = *res.Source.Ref case *bc.VoteOutput: muxID = *res.Source.Ref + case *bc.Retirement: + muxID = *res.Source.Ref default: return ErrOutputType } @@ -189,12 +195,13 @@ func (s *sidechainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Blo return err } + blockHash := block.Hash() ormTx := &orm.CrossTransaction{ - ChainID: chain.ID, + ChainID: s.chainID, SourceBlockHeight: block.Height, SourceBlockTimestamp: block.Timestamp, SourceBlockHash: blockHash.String(), - SourceTxIndex: txIndex, + SourceTxIndex: uint64(txIndex), SourceMuxID: muxID.String(), SourceTxHash: tx.ID.String(), SourceRawTransaction: string(rawTx), @@ -205,94 +212,78 @@ func (s *sidechainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Blo DestTxHash: sql.NullString{Valid: false}, Status: common.CrossTxPendingStatus, } - if err := s.db.Create(ormTx).Error; err != nil { + if err := db.Create(ormTx).Error; err != nil { return errors.Wrap(err, fmt.Sprintf("create sidechain WithdrawalTx %s", tx.ID.String())) } - statusFail := txStatus.VerifyStatus[txIndex].StatusFail - crossChainOutputs, err := s.getCrossChainReqs(ormTx.ID, tx, statusFail) - if err != nil { - return err - } + return s.createCrossChainReqs(db, ormTx.ID, tx, txStatus.VerifyStatus[txIndex].StatusFail) +} - for _, output := range crossChainOutputs { - if err := s.db.Create(output).Error; err != nil { - return errors.Wrap(err, fmt.Sprintf("create WithdrawalFromSidechain output: txid(%s), pos(%d)", tx.ID.String(), output.SourcePos)) - } +func (s *sidechainKeeper) locateMainChainTx(input *types.TxInput) (string, error) { + if input.InputType() != types.CrossChainInputType { + return "", errors.New("found weird crossChain tx") } - return nil + crossIn := input.TypedInput.(*types.CrossChainInput) + crossTx := &orm.CrossTransaction{SourceMuxID: crossIn.SpendCommitment.SourceID.String()} + if err := s.db.Where(crossTx).First(crossTx).Error; err != nil { + return "", errors.Wrap(err, "fail on find CrossTransaction") + } + return crossTx.SourceTxHash, nil } -func (s *sidechainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) { - var fromAddress string - inputCP := tx.Inputs[0].ControlProgram() - switch { - case segwit.IsP2WPKHScript(inputCP): - if pubHash, err := segwit.GetHashFromStandardProg(inputCP); err == nil { - fromAddress = wallet.BuildP2PKHAddress(pubHash, &consensus.MainNetParams) - } - case segwit.IsP2WSHScript(inputCP): - if scriptHash, err := segwit.GetHashFromStandardProg(inputCP); err == nil { - fromAddress = wallet.BuildP2SHAddress(scriptHash, &consensus.MainNetParams) - } +func (s *sidechainKeeper) syncBlock() (bool, error) { + chain := &orm.Chain{ID: s.chainID} + if err := s.db.First(chain).Error; err != nil { + return false, errors.Wrap(err, "query chain") } - reqs := []*orm.CrossTransactionReq{} - for i, rawOutput := range tx.Outputs { - // check valid withdrawal - if rawOutput.OutputType() != types.CrossChainOutputType { - continue - } + height, err := s.node.GetBlockCount() + if err != nil { + return false, err + } - if statusFail && *rawOutput.OutputCommitment().AssetAmount.AssetId != *consensus.BTMAssetID { - continue - } + if height <= chain.BlockHeight+s.cfg.Confirmations { + return false, nil + } - asset, err := s.assetStore.GetByAssetID(rawOutput.OutputCommitment().AssetAmount.AssetId.String()) - if err != nil { - return nil, err - } + nextBlockStr, txStatus, err := s.node.GetBlockByHeight(chain.BlockHeight + 1) + if err != nil { + return false, err + } - var toAddress string - outputCP := rawOutput.ControlProgram() - switch { - case segwit.IsP2WPKHScript(outputCP): - if pubHash, err := segwit.GetHashFromStandardProg(outputCP); err == nil { - toAddress = wallet.BuildP2PKHAddress(pubHash, &consensus.BytomMainNetParams) - } - case segwit.IsP2WSHScript(outputCP): - if scriptHash, err := segwit.GetHashFromStandardProg(outputCP); err == nil { - toAddress = wallet.BuildP2SHAddress(scriptHash, &consensus.BytomMainNetParams) - } - } + nextBlock := &types.Block{} + if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { + return false, errors.New("Unmarshal nextBlock") + } - req := &orm.CrossTransactionReq{ - CrossTransactionID: crossTransactionID, - SourcePos: uint64(i), - AssetID: asset.ID, - AssetAmount: rawOutput.OutputCommitment().AssetAmount.Amount, - Script: hex.EncodeToString(rawOutput.ControlProgram()), - FromAddress: fromAddress, - ToAddress: toAddress, - } - reqs = append(reqs, req) + if nextBlock.PreviousBlockHash.String() != chain.BlockHash { + log.WithFields(log.Fields{ + "remote previous_block_Hash": nextBlock.PreviousBlockHash.String(), + "db block_hash": chain.BlockHash, + }).Fatal("fail on block hash mismatch") + } + + if err := s.tryAttachBlock(nextBlock, txStatus); err != nil { + return false, err } - return reqs, nil + + return true, nil } -func (s *sidechainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error { +func (s *sidechainKeeper) tryAttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error { blockHash := block.Hash() - chain.BlockHash = blockHash.String() - chain.BlockHeight = block.Height - res := s.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain) - if err := res.Error; err != nil { + log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") + + dbTx := s.db.Begin() + if err := s.processBlock(dbTx, block, txStatus); err != nil { + dbTx.Rollback() return err } - if res.RowsAffected != 1 { - return ErrInconsistentDB + if err := s.processChainInfo(dbTx, block); err != nil { + dbTx.Rollback() + return err } - - return nil + return dbTx.Commit().Error } diff --git a/federation/util/script.go b/federation/util/script.go deleted file mode 100644 index 76d498ee..00000000 --- a/federation/util/script.go +++ /dev/null @@ -1,45 +0,0 @@ -package util - -import ( - "sort" - - log "github.com/sirupsen/logrus" - - "github.com/vapor/crypto" - "github.com/vapor/crypto/ed25519/chainkd" - "github.com/vapor/federation/config" - "github.com/vapor/protocol/vm/vmutil" -) - -func ParseFedProg(warders []config.Warder, quorum int) []byte { - SortWarders(warders) - - xpubs := []chainkd.XPub{} - for _, w := range warders { - xpubs = append(xpubs, w.XPub) - } - - scirpt, err := vmutil.P2SPMultiSigProgram(chainkd.XPubKeys(xpubs), quorum) - if err != nil { - log.Panicf("fail to generate federation scirpt for federation: %v", err) - } - - scriptHash := crypto.Sha256(scirpt) - wscript, err := vmutil.P2WSHProgram(scriptHash) - if err != nil { - log.Panicf("Fail converts scriptHash to witness: %v", err) - } - - return wscript -} - -type byPosition []config.Warder - -func (w byPosition) Len() int { return len(w) } -func (w byPosition) Swap(i, j int) { w[i], w[j] = w[j], w[i] } -func (w byPosition) Less(i, j int) bool { return w[i].Position < w[j].Position } - -func SortWarders(warders []config.Warder) []config.Warder { - sort.Sort(byPosition(warders)) - return warders -} diff --git a/protocol/bc/types/txinput.go b/protocol/bc/types/txinput.go index d1eeb57c..0d8402c3 100644 --- a/protocol/bc/types/txinput.go +++ b/protocol/bc/types/txinput.go @@ -134,6 +134,9 @@ func (t *TxInput) SpentOutputID() (o bc.Hash, err error) { case *VetoInput: o, err = ComputeOutputID(&inp.SpendCommitment, VetoInputType, inp.Vote) + + case *CrossChainInput: + o, err = ComputeOutputID(&inp.SpendCommitment, SpendInputType, nil) } return o, err -- 2.11.0