X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=federation%2Fsynchron%2Fmainchain_keeper.go;fp=federation%2Fsynchron%2Fmainchain_keeper.go;h=50f7a039759d2170e2d949cc701ef4e2d799f2ae;hb=2fb0c3fb659442efe02c14f4049a688d4bb29c40;hp=c53a90dcff57824dd2c4636a6f51c4d856872356;hpb=6a948fdbedc4f532080f631bfbd71b74b9510fa6;p=bytom%2Fvapor.git diff --git a/federation/synchron/mainchain_keeper.go b/federation/synchron/mainchain_keeper.go index c53a90dc..50f7a039 100644 --- a/federation/synchron/mainchain_keeper.go +++ b/federation/synchron/mainchain_keeper.go @@ -1,14 +1,21 @@ package synchron import ( + "bytes" + "database/sql" "encoding/hex" + "fmt" "time" - btmTypes "github.com/bytom/protocol/bc/types" + "github.com/bytom/consensus" + btmBc "github.com/bytom/protocol/bc" + "github.com/bytom/protocol/bc/types" "github.com/jinzhu/gorm" log "github.com/sirupsen/logrus" + vaporCfg "github.com/vapor/config" "github.com/vapor/errors" + "github.com/vapor/federation/common" "github.com/vapor/federation/config" "github.com/vapor/federation/database" "github.com/vapor/federation/database/orm" @@ -16,6 +23,8 @@ import ( "github.com/vapor/protocol/bc" ) +var fedProg = vaporCfg.FederationProgrom(vaporCfg.CommonConfig) + type mainchainKeeper struct { cfg *config.Chain db *gorm.DB @@ -71,7 +80,7 @@ func (m *mainchainKeeper) syncBlock() (bool, error) { return false, err } - nextBlock := &btmTypes.Block{} + nextBlock := &types.Block{} if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { return false, errors.New("Unmarshal nextBlock") } @@ -81,7 +90,7 @@ func (m *mainchainKeeper) syncBlock() (bool, error) { "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(), "db block_hash": chain.BlockHash, }).Fatal("BlockHash mismatch") - return false, errors.New("BlockHash mismatch") + return false, ErrInconsistentDB } if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil { @@ -91,11 +100,11 @@ func (m *mainchainKeeper) syncBlock() (bool, error) { return true, nil } -func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error { +func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { blockHash := block.Hash() log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") m.db.Begin() - if err := m.processBlock(block); err != nil { + if err := m.processBlock(chain, block, txStatus); err != nil { m.db.Rollback() return err } @@ -103,19 +112,172 @@ func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block return m.db.Commit().Error } -func (m *mainchainKeeper) processBlock(block *btmTypes.Block) error { +func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { if err := m.processIssuing(block.Transactions); err != nil { return err } + for i, tx := range block.Transactions { + if m.isDepositTx(tx) { + if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil { + return err + } + } + + if m.isWithdrawalTx(tx) { + if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil { + return err + } + } + } + + return m.processChainInfo(chain, block) +} + +func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool { + for _, output := range tx.Outputs { + if bytes.Equal(output.OutputCommitment.ControlProgram, fedProg) { + return true + } + } + return false +} + +func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool { + for _, input := range tx.Inputs { + if bytes.Equal(input.ControlProgram(), fedProg) { + return true + } + } + return false +} + +func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error { + blockHash := block.Hash() + + var muxID btmBc.Hash + res0ID := tx.ResultIds[0] + switch res := tx.Entries[*res0ID].(type) { + case *btmBc.Output: + muxID = *res.Source.Ref + case *btmBc.Retirement: + muxID = *res.Source.Ref + default: + return ErrOutputType + } + + rawTx, err := tx.MarshalText() + if err != nil { + return err + } + + ormTx := &orm.CrossTransaction{ + ChainID: chain.ID, + SourceBlockHeight: block.Height, + SourceBlockHash: blockHash.String(), + SourceTxIndex: txIndex, + SourceMuxID: muxID.String(), + SourceTxHash: tx.ID.String(), + SourceRawTransaction: string(rawTx), + DestBlockHeight: sql.NullInt64{Valid: false}, + DestBlockHash: sql.NullString{Valid: false}, + DestTxIndex: sql.NullInt64{Valid: false}, + DestTxHash: sql.NullString{Valid: false}, + Status: common.CrossTxPendingStatus, + } + if err := m.db.Create(ormTx).Error; err != nil { + return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String())) + } + + statusFail := txStatus.VerifyStatus[txIndex].StatusFail + crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail) + if err != nil { + return err + } + + for _, input := range crossChainInputs { + if err := m.db.Create(input).Error; err != nil { + return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos)) + } + } + + return nil +} + +func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) { + // assume inputs are from an identical owner + script := hex.EncodeToString(tx.Inputs[0].ControlProgram()) + inputs := []*orm.CrossTransactionReq{} + for i, rawOutput := range tx.Outputs { + // check valid deposit + if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, fedProg) { + continue + } + + if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *consensus.BTMAssetID { + continue + } + + asset, err := m.getAsset(rawOutput.OutputCommitment.AssetAmount.AssetId.String()) + if err != nil { + return nil, err + } + + input := &orm.CrossTransactionReq{ + CrossTransactionID: crossTransactionID, + SourcePos: uint64(i), + AssetID: asset.ID, + AssetAmount: rawOutput.OutputCommitment.AssetAmount.Amount, + Script: script, + } + inputs = append(inputs, input) + } + return inputs, nil +} + +func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error { + blockHash := block.Hash() + stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID). + Where(&orm.CrossTransaction{ + DestTxHash: sql.NullString{tx.ID.String(), true}, + Status: common.CrossTxSubmittedStatus, + }).UpdateColumn(&orm.CrossTransaction{ + DestBlockHeight: sql.NullInt64{int64(block.Height), true}, + DestBlockHash: sql.NullString{blockHash.String(), true}, + DestTxIndex: sql.NullInt64{int64(txIndex), true}, + Status: common.CrossTxCompletedStatus, + }) + if stmt.Error != nil { + return stmt.Error + } + + if stmt.RowsAffected != 1 { + log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String()) + } + return nil +} + +func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error { + blockHash := block.Hash() + chain.BlockHash = blockHash.String() + chain.BlockHeight = block.Height + res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain) + if err := res.Error; err != nil { + return err + } + + if res.RowsAffected != 1 { + return ErrInconsistentDB + } + return nil } -func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error { +func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error { for _, tx := range txs { for _, input := range tx.Inputs { switch inp := input.TypedInput.(type) { - case *btmTypes.IssuanceInput: + case *types.IssuanceInput: assetID := inp.AssetID() if _, err := m.getAsset(assetID.String()); err == nil { continue