OSDN Git Service

Wallet store interface (#217)
[bytom/vapor.git] / wallet / indexer.go
index 73d6b79..317ee0e 100644 (file)
@@ -3,62 +3,20 @@ package wallet
 import (
        "encoding/binary"
        "encoding/hex"
-       "encoding/json"
        "fmt"
        "sort"
 
        log "github.com/sirupsen/logrus"
 
        "github.com/vapor/account"
-       "github.com/vapor/asset"
        "github.com/vapor/blockchain/query"
        "github.com/vapor/consensus"
        "github.com/vapor/crypto/sha3pool"
-       dbm "github.com/vapor/database/leveldb"
        chainjson "github.com/vapor/encoding/json"
-       "github.com/vapor/errors"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
 
-const (
-       //TxPrefix is wallet database transactions prefix
-       TxPrefix = "TXS:"
-       //TxIndexPrefix is wallet database tx index prefix
-       TxIndexPrefix = "TID:"
-       //TxIndexPrefix is wallet database global tx index prefix
-       GlobalTxIndexPrefix = "GTID:"
-)
-
-var ErrAccntTxIDNotFound = errors.New("account TXID not found")
-
-func formatKey(blockHeight uint64, position uint32) string {
-       return fmt.Sprintf("%016x%08x", blockHeight, position)
-}
-
-func calcAnnotatedKey(formatKey string) []byte {
-       return []byte(TxPrefix + formatKey)
-}
-
-func calcDeleteKey(blockHeight uint64) []byte {
-       return []byte(fmt.Sprintf("%s%016x", TxPrefix, blockHeight))
-}
-
-func calcTxIndexKey(txID string) []byte {
-       return []byte(TxIndexPrefix + txID)
-}
-
-func calcGlobalTxIndexKey(txID string) []byte {
-       return []byte(GlobalTxIndexPrefix + txID)
-}
-
-func calcGlobalTxIndex(blockHash *bc.Hash, position uint64) []byte {
-       txIdx := make([]byte, 40)
-       copy(txIdx[:32], blockHash.Bytes())
-       binary.BigEndian.PutUint64(txIdx[32:], position)
-       return txIdx
-}
-
 func parseGlobalTxIdx(globalTxIdx []byte) (*bc.Hash, uint64) {
        var hashBytes [32]byte
        copy(hashBytes[:], globalTxIdx[:32])
@@ -67,37 +25,31 @@ func parseGlobalTxIdx(globalTxIdx []byte) (*bc.Hash, uint64) {
        return &hash, position
 }
 
-// deleteTransaction delete transactions when orphan block rollback
-func (w *Wallet) deleteTransactions(batch dbm.Batch, height uint64) {
-       tmpTx := query.AnnotatedTx{}
-       txIter := w.DB.IteratorPrefix(calcDeleteKey(height))
-       defer txIter.Release()
-
-       for txIter.Next() {
-               if err := json.Unmarshal(txIter.Value(), &tmpTx); err == nil {
-                       batch.Delete(calcTxIndexKey(tmpTx.ID.String()))
-               }
-               batch.Delete(txIter.Key())
-       }
-}
-
 // saveExternalAssetDefinition save external and local assets definition,
 // when query ,query local first and if have no then query external
 // details see getAliasDefinition
-func saveExternalAssetDefinition(b *types.Block, walletDB dbm.DB) {
-       storeBatch := walletDB.NewBatch()
-       defer storeBatch.Write()
+func saveExternalAssetDefinition(b *types.Block, store WalletStore) error {
+       newStore := store.InitBatch()
 
        for _, tx := range b.Transactions {
                for _, orig := range tx.Inputs {
                        if cci, ok := orig.TypedInput.(*types.CrossChainInput); ok {
                                assetID := cci.AssetId
-                               if assetExist := walletDB.Get(asset.ExtAssetKey(assetID)); assetExist == nil {
-                                       storeBatch.Set(asset.ExtAssetKey(assetID), cci.AssetDefinition)
+                               if _, err := newStore.GetAsset(assetID); err == nil {
+                                       continue
+                               } else if err != ErrGetAsset {
+                                       return err
                                }
+
+                               newStore.SetAssetDefinition(assetID, cci.AssetDefinition)
                        }
                }
        }
+       if err := newStore.CommitBatch(); err != nil {
+               return err
+       }
+
+       return nil
 }
 
 // Summary is the struct of transaction's input and output summary
@@ -120,23 +72,13 @@ type TxSummary struct {
 }
 
 // indexTransactions saves all annotated transactions to the database.
-func (w *Wallet) indexTransactions(batch dbm.Batch, b *types.Block, txStatus *bc.TransactionStatus) error {
-       annotatedTxs := w.filterAccountTxs(b, txStatus)
-       saveExternalAssetDefinition(b, w.DB)
-       annotateTxsAccount(annotatedTxs, w.DB)
-
+func (w *Wallet) indexTransactions(b *types.Block, txStatus *bc.TransactionStatus, annotatedTxs []*query.AnnotatedTx, store WalletStore) error {
        for _, tx := range annotatedTxs {
-               rawTx, err := json.Marshal(tx)
-               if err != nil {
-                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("inserting annotated_txs to db")
+               if err := w.Store.SetTransaction(b.Height, tx); err != nil {
                        return err
                }
 
-               batch.Set(calcAnnotatedKey(formatKey(b.Height, uint32(tx.Position))), rawTx)
-               batch.Set(calcTxIndexKey(tx.ID.String()), []byte(formatKey(b.Height, uint32(tx.Position))))
-
-               // delete unconfirmed transaction
-               batch.Delete(calcUnconfirmedTxKey(tx.ID.String()))
+               store.DeleteUnconfirmedTransaction(tx.ID.String())
        }
 
        if !w.TxIndexFlag {
@@ -145,7 +87,7 @@ func (w *Wallet) indexTransactions(batch dbm.Batch, b *types.Block, txStatus *bc
 
        for position, globalTx := range b.Transactions {
                blockHash := b.BlockHeader.Hash()
-               batch.Set(calcGlobalTxIndexKey(globalTx.ID.String()), calcGlobalTxIndex(&blockHash, uint64(position)))
+               store.SetGlobalTransactionIndex(globalTx.ID.String(), &blockHash, uint64(position))
        }
 
        return nil
@@ -161,21 +103,25 @@ transactionLoop:
                for _, v := range tx.Outputs {
                        var hash [32]byte
                        sha3pool.Sum256(hash[:], v.ControlProgram())
-
-                       if bytes := w.DB.Get(account.ContractKey(hash)); bytes != nil {
+                       if _, err := w.AccountMgr.GetControlProgram(bc.NewHash(hash)); err == nil {
                                annotatedTxs = append(annotatedTxs, w.buildAnnotatedTransaction(tx, b, statusFail, pos))
                                continue transactionLoop
+                       } else {
+                               log.WithFields(log.Fields{"module": logModule, "err": err, "hash": hex.EncodeToString(hash[:])}).Info("filterAccountTxs fail.")
                        }
                }
 
                for _, v := range tx.Inputs {
                        outid, err := v.SpentOutputID()
                        if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err, "outputID": hex.EncodeToString(outid.Bytes())}).Info("filterAccountTxs fail.")
                                continue
                        }
-                       if bytes := w.DB.Get(account.StandardUTXOKey(outid)); bytes != nil {
+                       if _, err = w.Store.GetStandardUTXO(outid); err == nil {
                                annotatedTxs = append(annotatedTxs, w.buildAnnotatedTransaction(tx, b, statusFail, pos))
                                continue transactionLoop
+                       } else {
+                               log.WithFields(log.Fields{"module": logModule, "err": err, "outputID": hex.EncodeToString(outid.Bytes())}).Info("filterAccountTxs fail.")
                        }
                }
        }
@@ -195,14 +141,8 @@ func (w *Wallet) GetTransactionByTxID(txID string) (*query.AnnotatedTx, error) {
 }
 
 func (w *Wallet) getAccountTxByTxID(txID string) (*query.AnnotatedTx, error) {
-       annotatedTx := &query.AnnotatedTx{}
-       formatKey := w.DB.Get(calcTxIndexKey(txID))
-       if formatKey == nil {
-               return nil, ErrAccntTxIDNotFound
-       }
-
-       txInfo := w.DB.Get(calcAnnotatedKey(string(formatKey)))
-       if err := json.Unmarshal(txInfo, annotatedTx); err != nil {
+       annotatedTx, err := w.Store.GetTransaction(txID)
+       if err != nil {
                return nil, err
        }
 
@@ -211,18 +151,18 @@ func (w *Wallet) getAccountTxByTxID(txID string) (*query.AnnotatedTx, error) {
 }
 
 func (w *Wallet) getGlobalTxByTxID(txID string) (*query.AnnotatedTx, error) {
-       globalTxIdx := w.DB.Get(calcGlobalTxIndexKey(txID))
+       globalTxIdx := w.Store.GetGlobalTransactionIndex(txID)
        if globalTxIdx == nil {
                return nil, fmt.Errorf("No transaction(tx_id=%s) ", txID)
        }
 
        blockHash, pos := parseGlobalTxIdx(globalTxIdx)
-       block, err := w.chain.GetBlockByHash(blockHash)
+       block, err := w.Chain.GetBlockByHash(blockHash)
        if err != nil {
                return nil, err
        }
 
-       txStatus, err := w.chain.GetTransactionStatus(blockHash)
+       txStatus, err := w.Chain.GetTransactionStatus(blockHash)
        if err != nil {
                return nil, err
        }
@@ -291,38 +231,16 @@ func findTransactionsByAccount(annotatedTx *query.AnnotatedTx, accountID string)
 // GetTransactions get all walletDB transactions or unconfirmed transactions, and filter transactions by accountID and StartTxID optional
 func (w *Wallet) GetTransactions(accountID string, StartTxID string, count uint, unconfirmed bool) ([]*query.AnnotatedTx, error) {
        annotatedTxs := []*query.AnnotatedTx{}
-       var startKey []byte
-       preFix := TxPrefix
-
-       if StartTxID != "" {
-               if unconfirmed {
-                       startKey = calcUnconfirmedTxKey(StartTxID)
-               } else {
-                       formatKey := w.DB.Get(calcTxIndexKey(StartTxID))
-                       if formatKey == nil {
-                               return nil, ErrAccntTxIDNotFound
-                       }
-                       startKey = calcAnnotatedKey(string(formatKey))
-               }
-       }
-
-       if unconfirmed {
-               preFix = UnconfirmedTxPrefix
+       annotatedTxs, err := w.Store.ListTransactions(accountID, StartTxID, count, unconfirmed)
+       if err != nil {
+               return nil, err
        }
 
-       itr := w.DB.IteratorPrefixWithStart([]byte(preFix), startKey, true)
-       defer itr.Release()
-
-       for txNum := count; itr.Next() && txNum > 0; {
-               annotatedTx := &query.AnnotatedTx{}
-               if err := json.Unmarshal(itr.Value(), &annotatedTx); err != nil {
-                       return nil, err
-               }
-
+       newAnnotatedTxs := []*query.AnnotatedTx{}
+       for _, annotatedTx := range annotatedTxs {
                if accountID == "" || findTransactionsByAccount(annotatedTx, accountID) {
                        annotateTxsAsset(w, []*query.AnnotatedTx{annotatedTx})
-                       annotatedTxs = append([]*query.AnnotatedTx{annotatedTx}, annotatedTxs...)
-                       txNum--
+                       newAnnotatedTxs = append([]*query.AnnotatedTx{annotatedTx}, newAnnotatedTxs...)
                }
        }
 
@@ -332,7 +250,7 @@ func (w *Wallet) GetTransactions(accountID string, StartTxID string, count uint,
                sort.Sort(SortByHeight(annotatedTxs))
        }
 
-       return annotatedTxs, nil
+       return newAnnotatedTxs, nil
 }
 
 // GetAccountBalances return all account balances