X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=federation%2Fsynchron%2Fmainchain_keeper.go;h=3b4da044be1ee292180b99f98f07505ed311ced7;hp=c53a90dcff57824dd2c4636a6f51c4d856872356;hb=a177b8b4f2828248c5bf34561b877c2578b77dd1;hpb=8da5d90ecadf13f1510dbbc4bce05645684142a2 diff --git a/federation/synchron/mainchain_keeper.go b/federation/synchron/mainchain_keeper.go index c53a90dc..3b4da044 100644 --- a/federation/synchron/mainchain_keeper.go +++ b/federation/synchron/mainchain_keeper.go @@ -1,19 +1,29 @@ package synchron import ( + "bytes" + "database/sql" "encoding/hex" + "fmt" "time" - btmTypes "github.com/bytom/protocol/bc/types" + btmConsensus "github.com/bytom/consensus" + btmBc "github.com/bytom/protocol/bc" + "github.com/bytom/protocol/bc/types" "github.com/jinzhu/gorm" log "github.com/sirupsen/logrus" + vaporConsensus "github.com/vapor/consensus" + "github.com/vapor/consensus/segwit" "github.com/vapor/errors" + "github.com/vapor/federation/common" "github.com/vapor/federation/config" "github.com/vapor/federation/database" "github.com/vapor/federation/database/orm" "github.com/vapor/federation/service" + "github.com/vapor/federation/util" "github.com/vapor/protocol/bc" + "github.com/vapor/wallet" ) type mainchainKeeper struct { @@ -21,16 +31,18 @@ type mainchainKeeper struct { db *gorm.DB node *service.Node chainName string - assetCache *database.AssetCache + assetStore *database.AssetStore + fedProg []byte } -func NewMainchainKeeper(db *gorm.DB, chainCfg *config.Chain) *mainchainKeeper { +func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper { return &mainchainKeeper{ - cfg: chainCfg, + cfg: &cfg.Mainchain, db: db, - node: service.NewNode(chainCfg.Upstream), - chainName: chainCfg.Name, - assetCache: database.NewAssetCache(), + node: service.NewNode(cfg.Mainchain.Upstream), + chainName: cfg.Mainchain.Name, + assetStore: assetStore, + fedProg: util.SegWitWrap(util.ParseFedProg(cfg.Warders, cfg.Quorum)), } } @@ -71,7 +83,7 @@ func (m *mainchainKeeper) syncBlock() (bool, error) { return false, err } - nextBlock := &btmTypes.Block{} + nextBlock := &types.Block{} if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil { return false, errors.New("Unmarshal nextBlock") } @@ -81,7 +93,7 @@ func (m *mainchainKeeper) syncBlock() (bool, error) { "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(), "db block_hash": chain.BlockHash, }).Fatal("BlockHash mismatch") - return false, errors.New("BlockHash mismatch") + return false, ErrInconsistentDB } if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil { @@ -91,64 +103,218 @@ func (m *mainchainKeeper) syncBlock() (bool, error) { return true, nil } -func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error { +func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { blockHash := block.Hash() log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock") - m.db.Begin() - if err := m.processBlock(block); err != nil { - m.db.Rollback() + dbTx := m.db.Begin() + if err := m.processBlock(chain, block, txStatus); err != nil { + dbTx.Rollback() return err } - return m.db.Commit().Error + return dbTx.Commit().Error } -func (m *mainchainKeeper) processBlock(block *btmTypes.Block) error { +func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error { if err := m.processIssuing(block.Transactions); err != nil { return err } + for i, tx := range block.Transactions { + if m.isDepositTx(tx) { + if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil { + return err + } + } + + if m.isWithdrawalTx(tx) { + if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil { + return err + } + } + } + + return m.processChainInfo(chain, block) +} + +func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool { + for _, output := range tx.Outputs { + if bytes.Equal(output.OutputCommitment.ControlProgram, m.fedProg) { + return true + } + } + return false +} + +func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool { + for _, input := range tx.Inputs { + if bytes.Equal(input.ControlProgram(), m.fedProg) { + return true + } + } + return false +} + +func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error { + blockHash := block.Hash() + + var muxID btmBc.Hash + res0ID := tx.ResultIds[0] + switch res := tx.Entries[*res0ID].(type) { + case *btmBc.Output: + muxID = *res.Source.Ref + case *btmBc.Retirement: + muxID = *res.Source.Ref + default: + return ErrOutputType + } + + rawTx, err := tx.MarshalText() + if err != nil { + return err + } + + ormTx := &orm.CrossTransaction{ + ChainID: chain.ID, + SourceBlockHeight: block.Height, + SourceBlockTimestamp: block.Timestamp, + SourceBlockHash: blockHash.String(), + SourceTxIndex: txIndex, + SourceMuxID: muxID.String(), + SourceTxHash: tx.ID.String(), + SourceRawTransaction: string(rawTx), + DestBlockHeight: sql.NullInt64{Valid: false}, + DestBlockTimestamp: sql.NullInt64{Valid: false}, + DestBlockHash: sql.NullString{Valid: false}, + DestTxIndex: sql.NullInt64{Valid: false}, + DestTxHash: sql.NullString{Valid: false}, + Status: common.CrossTxPendingStatus, + } + if err := m.db.Create(ormTx).Error; err != nil { + return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String())) + } + + statusFail := txStatus.VerifyStatus[txIndex].StatusFail + crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail) + if err != nil { + return err + } + + for _, input := range crossChainInputs { + if err := m.db.Create(input).Error; err != nil { + return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos)) + } + } + + return nil +} + +func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) { + var fromAddress, toAddress string + // assume inputs are from an identical owner + prog := tx.Inputs[0].ControlProgram() + script := hex.EncodeToString(prog) + switch { + case segwit.IsP2WPKHScript(prog): + if pubHash, err := segwit.GetHashFromStandardProg(prog); err == nil { + fromAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.MainNetParams) + toAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.VaporNetParams) + } + case segwit.IsP2WSHScript(prog): + if scriptHash, err := segwit.GetHashFromStandardProg(prog); err == nil { + fromAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.MainNetParams) + toAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.VaporNetParams) + } + } + + reqs := []*orm.CrossTransactionReq{} + for i, rawOutput := range tx.Outputs { + // check valid deposit + if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.fedProg) { + continue + } + + if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID { + continue + } + + asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String()) + if err != nil { + return nil, err + } + + req := &orm.CrossTransactionReq{ + CrossTransactionID: crossTransactionID, + SourcePos: uint64(i), + AssetID: asset.ID, + AssetAmount: rawOutput.OutputCommitment.AssetAmount.Amount, + Script: script, + FromAddress: fromAddress, + ToAddress: toAddress, + } + reqs = append(reqs, req) + } + return reqs, nil +} + +func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error { + blockHash := block.Hash() + stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID). + Where(&orm.CrossTransaction{ + DestTxHash: sql.NullString{tx.ID.String(), true}, + Status: common.CrossTxPendingStatus, + }).UpdateColumn(&orm.CrossTransaction{ + DestBlockHeight: sql.NullInt64{int64(block.Height), true}, + DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true}, + DestBlockHash: sql.NullString{blockHash.String(), true}, + DestTxIndex: sql.NullInt64{int64(txIndex), true}, + Status: common.CrossTxCompletedStatus, + }) + if stmt.Error != nil { + return stmt.Error + } + + if stmt.RowsAffected != 1 { + log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String()) + } return nil } -func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error { +func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error { + blockHash := block.Hash() + chain.BlockHash = blockHash.String() + chain.BlockHeight = block.Height + res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain) + if err := res.Error; err != nil { + return err + } + + if res.RowsAffected != 1 { + return ErrInconsistentDB + } + + return nil +} + +func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error { for _, tx := range txs { for _, input := range tx.Inputs { switch inp := input.TypedInput.(type) { - case *btmTypes.IssuanceInput: + case *types.IssuanceInput: assetID := inp.AssetID() - if _, err := m.getAsset(assetID.String()); err == nil { + if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil { continue } - asset := &orm.Asset{ + m.assetStore.Add(&orm.Asset{ AssetID: assetID.String(), IssuanceProgram: hex.EncodeToString(inp.IssuanceProgram), VMVersion: inp.VMVersion, RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition), - } - if err := m.db.Create(asset).Error; err != nil { - return err - } - - m.assetCache.Add(asset.AssetID, asset) + }) } } } return nil } - -func (m *mainchainKeeper) getAsset(assetID string) (*orm.Asset, error) { - if asset := m.assetCache.Get(assetID); asset != nil { - return asset, nil - } - - asset := &orm.Asset{AssetID: assetID} - if err := m.db.Where(asset).First(asset).Error; err != nil { - return nil, errors.Wrap(err, "asset not found in memory and mysql") - } - - m.assetCache.Add(assetID, asset) - return asset, nil -}