OSDN Git Service

add orm.asset
authorHAOYUatHZ <haoyu@protonmail.com>
Thu, 6 Jun 2019 06:20:03 +0000 (14:20 +0800)
committerHAOYUatHZ <haoyu@protonmail.com>
Thu, 6 Jun 2019 06:20:03 +0000 (14:20 +0800)
docs/federation/federation.sql
federation/database/orm/asset.go [new file with mode: 0644]
federation/database/orm/warder.go
federation/synchron/attach_block_processor.go [new file with mode: 0644]
federation/synchron/block_processor.go [new file with mode: 0644]
federation/synchron/unconfirmed_tx_keeper.go

index f3850d1..508dba8 100644 (file)
@@ -117,4 +117,33 @@ CREATE TABLE `cross_transaction_signs` (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
 LOCK TABLES `cross_transaction_signs` WRITE;
+UNLOCK TABLES;
+
+
+# Dump of table assets
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `assets`;
+
+CREATE TABLE `assets` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `asset_id` varchar(64) NOT NULL,
+  `issuance_program` varchar(64) NOT NULL,
+  `vm_version` int(11) NOT NULL DEFAULT '1',
+  `raw_definition_bytes` text,
+  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `asset_id` (`asset_id`),
+  UNIQUE KEY `asset_meta` (`issuance_program`,`vm_version`,`raw_definition_bytes`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+LOCK TABLES `assets` WRITE;
+/*!40000 ALTER TABLE `assets` DISABLE KEYS */;
+
+INSERT INTO `assets` (`id`, `asset_id`, `issuance_program`, `vm_version`, `raw_definition_bytes`, `created_at`, `updated_at`)
+VALUES
+  (1,'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff','',1,'7b0a202022646563696d616c73223a20382c0a2020226465736372697074696f6e223a20224279746f6d204f6666696369616c204973737565222c0a2020226e616d65223a202242544d222c0a20202273796d626f6c223a202242544d220a7d','2018-09-13 05:10:43','2018-11-27 09:43:35');
+
+/*!40000 ALTER TABLE `assets` ENABLE KEYS */;
 UNLOCK TABLES;
\ No newline at end of file
diff --git a/federation/database/orm/asset.go b/federation/database/orm/asset.go
new file mode 100644 (file)
index 0000000..8b2b015
--- /dev/null
@@ -0,0 +1,15 @@
+package orm
+
+import (
+       "github.com/vapor/federation/types"
+)
+
+type Asset struct {
+       ID                 uint64 `gorm:"primary_key"`
+       AssetID            string
+       IssuanceProgram    string
+       VMVersion          uint64
+       RawDefinitionBytes string
+       CreatedAt          types.Timestamp
+       UpdatedAt          types.Timestamp
+}
index 39976db..8aa6ca4 100644 (file)
@@ -5,7 +5,7 @@ import (
 )
 
 type Warder struct {
-       ID        uint64
+       ID        uint64 `gorm:"primary_key"`
        Pubkey    string
        CreatedAt types.Timestamp
        UpdatedAt types.Timestamp
diff --git a/federation/synchron/attach_block_processor.go b/federation/synchron/attach_block_processor.go
new file mode 100644 (file)
index 0000000..ad26fa9
--- /dev/null
@@ -0,0 +1,220 @@
+package synchron
+
+import (
+       // "database/sql"
+       // "encoding/hex"
+       // "encoding/json"
+       // "fmt"
+       // "math/big"
+       // "sort"
+
+       // "github.com/bytom/consensus"
+       // "github.com/bytom/errors"
+       // TODO:
+       btmBc "github.com/bytom/protocol/bc"
+       btmTypes "github.com/bytom/protocol/bc/types"
+       "github.com/jinzhu/gorm"
+       // TODO:
+       // "github.com/blockcenter/coin/btm"
+       // "github.com/blockcenter/database/orm"
+)
+
+type attachBlockProcessor struct {
+       db *gorm.DB
+       // coin     *orm.Coin
+       block    *btmTypes.Block
+       txStatus *btmBc.TransactionStatus
+}
+
+func (p *attachBlockProcessor) processIssuing(db *gorm.DB, txs []*btmTypes.Tx) error {
+       return addIssueAssets(db, txs)
+}
+
+/*
+func (p *attachBlockProcessor) getBlock() *btmTypes.Block {
+       return p.block
+}
+
+func (p *attachBlockProcessor) getTxStatus() *bc.TransactionStatus {
+       return p.txStatus
+}
+
+func (p *attachBlockProcessor) getCoin() *orm.Coin {
+       return p.coin
+}
+
+func (p *attachBlockProcessor) processCoinInfo() error {
+       blockHash := p.block.Hash()
+       p.coin.BlockHeight = p.block.Height
+       p.coin.BlockHash = blockHash.String()
+       db := p.db.Model(p.coin).Where("block_hash = ?", p.block.PreviousBlockHash.String()).Updates(p.coin)
+       if err := db.Error; err != nil {
+               return err
+       }
+
+       if db.RowsAffected != 1 {
+               return ErrInconsistentDB
+       }
+       return nil
+}
+
+type addressTxSorter []*orm.AddressTransaction
+
+func (a addressTxSorter) Len() int      { return len(a) }
+func (a addressTxSorter) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a addressTxSorter) Less(i, j int) bool {
+       return a[i].TransactionID < a[j].TransactionID ||
+               (a[i].TransactionID == a[j].TransactionID && a[i].AddressID < a[j].AddressID) ||
+               (a[i].TransactionID == a[j].TransactionID && a[i].AddressID == a[j].AddressID && a[i].AssetID < a[j].AssetID)
+}
+
+func (p *attachBlockProcessor) processAddressTransaction(mappings []*addressTxMapping) error {
+       txMap := make(map[string]int64)
+       addressTxMap := make(map[string]*orm.AddressTransaction)
+
+       for _, m := range mappings {
+               txHash := m.transaction.ID.String()
+               if _, ok := txMap[txHash]; !ok {
+                       txID, err := p.upsertTransaction(m)
+                       if err != nil {
+                               return err
+                       }
+
+                       txMap[txHash] = txID
+               }
+
+               // is smart contract
+               if m.address == nil {
+                       continue
+               }
+
+               var amount int64
+               switch m.source.(type) {
+               case *btmTypes.TxInput:
+                       amount -= int64(m.amount)
+
+               case *btmTypes.TxOutput:
+                       amount = int64(m.amount)
+               }
+
+               addressTxKey := fmt.Sprintf("%d:%d:%d", m.address.ID, txMap[txHash], m.asset.ID)
+               addressTx, ok := addressTxMap[addressTxKey]
+               if !ok {
+                       addressTx = &orm.AddressTransaction{
+                               AddressID:     m.address.ID,
+                               TransactionID: uint64(txMap[txHash]),
+                               AssetID:       m.asset.ID,
+                       }
+                       addressTxMap[addressTxKey] = addressTx
+               }
+
+               addressTx.Amount += amount
+       }
+
+       var mergedAddrTxs []*orm.AddressTransaction
+       for _, addressTx := range addressTxMap {
+               mergedAddrTxs = append(mergedAddrTxs, addressTx)
+       }
+       sort.Sort(addressTxSorter(mergedAddrTxs))
+
+       for _, addressTx := range mergedAddrTxs {
+               if err := p.db.Where(addressTx).FirstOrCreate(addressTx).Error; err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (p *attachBlockProcessor) upsertTransaction(mapping *addressTxMapping) (int64, error) {
+       rawTx, err := mapping.transaction.MarshalText()
+       if err != nil {
+               return 0, err
+       }
+
+       tx := &orm.Transaction{Hash: mapping.transaction.ID.String()}
+       p.db.Unscoped().Where(tx).First(tx)
+       // collided confirmed tx hash
+       if tx.BlockHeight > 0 {
+               return int64(tx.ID), nil
+       }
+
+       tx.CoinID = p.coin.ID
+       tx.TxIndex = mapping.txIndex
+       tx.RawData = string(rawTx)
+       tx.BlockHeight = p.block.Height
+       tx.BlockTimestamp = p.block.Timestamp
+       tx.StatusFail = mapping.statusFail
+       return int64(tx.ID), p.db.Unscoped().Save(tx).Error
+}
+
+func (p *attachBlockProcessor) processSpendBalance(input *btmTypes.TxInput, deltaBalance *deltaBalance) {
+       amount := big.NewInt(0)
+       amount.SetUint64(input.Amount())
+       deltaBalance.Balance.Sub(deltaBalance.Balance, amount)
+       deltaBalance.TotalSent.Add(deltaBalance.TotalSent, amount)
+}
+
+func (p *attachBlockProcessor) processReceiveBalance(output *btmTypes.TxOutput, deltaBalance *deltaBalance) {
+       amount := big.NewInt(0)
+       amount.SetUint64(output.Amount)
+       deltaBalance.Balance.Add(deltaBalance.Balance, amount)
+       deltaBalance.TotalReceived.Add(deltaBalance.TotalReceived, amount)
+}
+
+func (p *attachBlockProcessor) processSpendUTXO(utxoIDList []string) error {
+       return p.db.Model(&orm.Utxo{}).Where("hash in (?)", utxoIDList).Update("is_spend", true).Error
+}
+
+func (p *attachBlockProcessor) processReceiveUTXO(m *addressTxMapping) error {
+       outputID := m.transaction.OutputID(m.sourceIndex)
+       output, err := m.transaction.Output(*outputID)
+       if err != nil {
+               return err
+       }
+
+       rawUtxo := &btm.UTXO{
+               SourceID:  output.Source.Ref,
+               SourcePos: uint64(m.sourceIndex),
+       }
+       rawData, err := json.Marshal(rawUtxo)
+       if err != nil {
+               return err
+       }
+
+       validHeight := p.block.Height
+       if m.txIndex == 0 && p.block.Height != 0 {
+               validHeight += consensus.CoinbasePendingBlockNumber
+       }
+
+       var cp []byte
+       switch source := m.source.(type) {
+       case *btmTypes.TxOutput:
+               cp = source.ControlProgram
+       default:
+               return errors.New("wrong source type for processReceiveUTXO")
+       }
+
+       utxo := &orm.Utxo{Hash: outputID.String()}
+       err = p.db.Where(&orm.Utxo{Hash: outputID.String()}).First(utxo).Error
+       if err != nil && err != gorm.ErrRecordNotFound {
+               return err
+       }
+
+       utxo.BlockHeight = p.block.Height
+       utxo.ValidHeight = validHeight
+       utxo.IsSpend = false
+       utxo.AssetID = m.asset.ID
+       utxo.Amount = output.Source.Value.Amount
+       utxo.RawData = string(rawData)
+       utxo.ControlProgram = hex.EncodeToString(cp)
+
+       if m.address != nil {
+               utxo.AddressID = sql.NullInt64{Int64: int64(m.address.ID), Valid: true}
+       }
+
+       if err == gorm.ErrRecordNotFound {
+               return p.db.Create(utxo).Error
+       }
+       return p.db.Model(&orm.Utxo{}).Update(utxo).Error
+}
+*/
diff --git a/federation/synchron/block_processor.go b/federation/synchron/block_processor.go
new file mode 100644 (file)
index 0000000..010a150
--- /dev/null
@@ -0,0 +1,68 @@
+package synchron
+
+import (
+       // "encoding/hex"
+       // "encoding/json"
+       // "fmt"
+       // "math/big"
+       // "sort"
+
+       // "github.com/bytom/consensus"
+       // "github.com/bytom/consensus/segwit"
+       // "github.com/bytom/errors"
+       // "github.com/bytom/protocol/bc"
+       btmTypes "github.com/bytom/protocol/bc/types"
+       // "github.com/bytom/protocol/vm/vmutil"
+       "github.com/jinzhu/gorm"
+       // log "github.com/sirupsen/logrus"
+       // "github.com/blockcenter/coin/btm"
+       // "github.com/blockcenter/config"
+       // "github.com/blockcenter/database/orm"
+       // "github.com/blockcenter/types"
+)
+
+func addIssueAssets(db *gorm.DB, txs []*btmTypes.Tx) error {
+       /*
+               var assets []*orm.Asset
+               assetMap := make(map[string]bool)
+
+               type assetDefinition struct {
+                       Decimals uint64 `json:"decimals"`
+               }
+
+               for _, tx := range txs {
+                       for _, input := range tx.Inputs {
+                               switch inp := input.TypedInput.(type) {
+                               case *btmTypes.IssuanceInput:
+                                       assetID := inp.AssetID()
+                                       if _, ok := assetMap[assetID.String()]; ok {
+                                               continue
+                                       }
+                                       assetMap[assetID.String()] = true
+
+                                       asset := &orm.Asset{}
+                                       definition := &assetDefinition{}
+                                       if err := json.Unmarshal(inp.AssetDefinition, definition); err != nil {
+                                               log.WithFields(log.Fields{
+                                                       "err":             err,
+                                                       "AssetDefinition": inp.AssetDefinition,
+                                               }).Error("json unmarshal AssetDefinition")
+                                       }
+
+                                       asset.CoinID = coinID
+                                       asset.Decimals = definition.Decimals
+                                       asset.Asset = assetID.String()
+                                       asset.Definition = string(inp.AssetDefinition)
+                                       assets = append(assets, asset)
+                               }
+                       }
+               }
+
+               for _, asset := range assets {
+                       if err := db.Where(&orm.Asset{CoinID: asset.CoinID, Asset: asset.Asset}).FirstOrCreate(asset).Error; err != nil {
+                               return err
+                       }
+               }
+       */
+       return nil
+}
index 2b220b9..6d7481e 100644 (file)
@@ -89,21 +89,21 @@ func (u *unconfirmedTxKeeper) AddUnconfirmedTx( /*coin *orm.Coin, */ txDesc *TxD
        }
 
        txs := []*btmTypes.Tx{txDesc.Tx}
-       if err := bp.processIssuing(dbTx, txs /* bp.getCoin().ID*/); err != nil {
+       if err := bp.processIssuing(dbTx, txs); err != nil {
                dbTx.Rollback()
                return err
        }
 
-       mappings, err := GetAddressTxMappings(u.cfg, txs, txStatus, dbTx)
-       if err != nil {
-               dbTx.Rollback()
-               return err
-       }
+       // mappings, err := GetAddressTxMappings(u.cfg, txs, txStatus, dbTx)
+       // if err != nil {
+       //      dbTx.Rollback()
+       //      return err
+       // }
 
-       if err := bp.processAddressTransaction(mappings); err != nil {
-               dbTx.Rollback()
-               return err
-       }
+       // if err := bp.processAddressTransaction(mappings); err != nil {
+       //      dbTx.Rollback()
+       //      return err
+       // }
 
        return dbTx.Commit().Error
 }