X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=wallet%2Findexer.go;h=9f55115aeb27d6a976a6374e9a5f26b9672cc432;hp=4bff9c942b7c6705bf2a76a0ab98f65e735728d0;hb=489e57ce3c46eb9e8ca25c7e966a1ea26fe41d57;hpb=c18a765ea091998d61cb630250a9129f9f62f8ae diff --git a/wallet/indexer.go b/wallet/indexer.go index 4bff9c94..9f55115a 100644 --- a/wallet/indexer.go +++ b/wallet/indexer.go @@ -3,62 +3,20 @@ package wallet import ( "encoding/binary" "encoding/hex" - "encoding/json" "fmt" "sort" log "github.com/sirupsen/logrus" "github.com/vapor/account" - "github.com/vapor/asset" "github.com/vapor/blockchain/query" "github.com/vapor/consensus" "github.com/vapor/crypto/sha3pool" - dbm "github.com/vapor/database/leveldb" chainjson "github.com/vapor/encoding/json" - "github.com/vapor/errors" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" ) -const ( - //TxPrefix is wallet database transactions prefix - TxPrefix = "TXS:" - //TxIndexPrefix is wallet database tx index prefix - TxIndexPrefix = "TID:" - //TxIndexPrefix is wallet database global tx index prefix - GlobalTxIndexPrefix = "GTID:" -) - -var ErrAccntTxIDNotFound = errors.New("account TXID not found") - -func formatKey(blockHeight uint64, position uint32) string { - return fmt.Sprintf("%016x%08x", blockHeight, position) -} - -func calcAnnotatedKey(formatKey string) []byte { - return []byte(TxPrefix + formatKey) -} - -func calcDeleteKey(blockHeight uint64) []byte { - return []byte(fmt.Sprintf("%s%016x", TxPrefix, blockHeight)) -} - -func calcTxIndexKey(txID string) []byte { - return []byte(TxIndexPrefix + txID) -} - -func calcGlobalTxIndexKey(txID string) []byte { - return []byte(GlobalTxIndexPrefix + txID) -} - -func calcGlobalTxIndex(blockHash *bc.Hash, position uint64) []byte { - txIdx := make([]byte, 40) - copy(txIdx[:32], blockHash.Bytes()) - binary.BigEndian.PutUint64(txIdx[32:], position) - return txIdx -} - func parseGlobalTxIdx(globalTxIdx []byte) (*bc.Hash, uint64) { var hashBytes [32]byte copy(hashBytes[:], globalTxIdx[:32]) @@ -67,37 +25,31 @@ func parseGlobalTxIdx(globalTxIdx []byte) (*bc.Hash, uint64) { return &hash, position } -// deleteTransaction delete transactions when orphan block rollback -func (w *Wallet) deleteTransactions(batch dbm.Batch, height uint64) { - tmpTx := query.AnnotatedTx{} - txIter := w.DB.IteratorPrefix(calcDeleteKey(height)) - defer txIter.Release() - - for txIter.Next() { - if err := json.Unmarshal(txIter.Value(), &tmpTx); err == nil { - batch.Delete(calcTxIndexKey(tmpTx.ID.String())) - } - batch.Delete(txIter.Key()) - } -} - // saveExternalAssetDefinition save external and local assets definition, // when query ,query local first and if have no then query external // details see getAliasDefinition -func saveExternalAssetDefinition(b *types.Block, walletDB dbm.DB) { - storeBatch := walletDB.NewBatch() - defer storeBatch.Write() +func saveExternalAssetDefinition(b *types.Block, store WalletStore) error { + newStore := store.InitBatch() for _, tx := range b.Transactions { for _, orig := range tx.Inputs { if cci, ok := orig.TypedInput.(*types.CrossChainInput); ok { assetID := cci.AssetId - if assetExist := walletDB.Get(asset.ExtAssetKey(assetID)); assetExist == nil { - storeBatch.Set(asset.ExtAssetKey(assetID), cci.AssetDefinition) + if _, err := newStore.GetAsset(assetID); err == nil { + continue + } else if err != ErrGetAsset { + return err } + + newStore.SetAssetDefinition(assetID, cci.AssetDefinition) } } } + if err := newStore.CommitBatch(); err != nil { + return err + } + + return nil } // Summary is the struct of transaction's input and output summary @@ -120,23 +72,13 @@ type TxSummary struct { } // indexTransactions saves all annotated transactions to the database. -func (w *Wallet) indexTransactions(batch dbm.Batch, b *types.Block, txStatus *bc.TransactionStatus) error { - annotatedTxs := w.filterAccountTxs(b, txStatus) - saveExternalAssetDefinition(b, w.DB) - annotateTxsAccount(annotatedTxs, w.DB) - +func (w *Wallet) indexTransactions(b *types.Block, txStatus *bc.TransactionStatus, annotatedTxs []*query.AnnotatedTx, store WalletStore) error { for _, tx := range annotatedTxs { - rawTx, err := json.Marshal(tx) - if err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("inserting annotated_txs to db") + if err := w.Store.SetTransaction(b.Height, tx); err != nil { return err } - batch.Set(calcAnnotatedKey(formatKey(b.Height, uint32(tx.Position))), rawTx) - batch.Set(calcTxIndexKey(tx.ID.String()), []byte(formatKey(b.Height, uint32(tx.Position)))) - - // delete unconfirmed transaction - batch.Delete(calcUnconfirmedTxKey(tx.ID.String())) + store.DeleteUnconfirmedTransaction(tx.ID.String()) } if !w.TxIndexFlag { @@ -145,7 +87,7 @@ func (w *Wallet) indexTransactions(batch dbm.Batch, b *types.Block, txStatus *bc for position, globalTx := range b.Transactions { blockHash := b.BlockHeader.Hash() - batch.Set(calcGlobalTxIndexKey(globalTx.ID.String()), calcGlobalTxIndex(&blockHash, uint64(position))) + store.SetGlobalTransactionIndex(globalTx.ID.String(), &blockHash, uint64(position)) } return nil @@ -161,21 +103,25 @@ transactionLoop: for _, v := range tx.Outputs { var hash [32]byte sha3pool.Sum256(hash[:], v.ControlProgram()) - - if bytes := w.DB.Get(account.ContractKey(hash)); bytes != nil { + if _, err := w.AccountMgr.GetControlProgram(bc.NewHash(hash)); err == nil { annotatedTxs = append(annotatedTxs, w.buildAnnotatedTransaction(tx, b, statusFail, pos)) continue transactionLoop + } else if err != account.ErrFindCtrlProgram { + log.WithFields(log.Fields{"module": logModule, "err": err, "hash": hex.EncodeToString(hash[:])}).Error("filterAccountTxs fail.") } } for _, v := range tx.Inputs { outid, err := v.SpentOutputID() if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err, "outputID": outid.String()}).Error("filterAccountTxs fail.") continue } - if bytes := w.DB.Get(account.StandardUTXOKey(outid)); bytes != nil { + if _, err = w.Store.GetStandardUTXO(outid); err == nil { annotatedTxs = append(annotatedTxs, w.buildAnnotatedTransaction(tx, b, statusFail, pos)) continue transactionLoop + } else if err != ErrGetStandardUTXO { + log.WithFields(log.Fields{"module": logModule, "err": err, "outputID": outid.String()}).Error("filterAccountTxs fail.") } } } @@ -195,14 +141,8 @@ func (w *Wallet) GetTransactionByTxID(txID string) (*query.AnnotatedTx, error) { } func (w *Wallet) getAccountTxByTxID(txID string) (*query.AnnotatedTx, error) { - annotatedTx := &query.AnnotatedTx{} - formatKey := w.DB.Get(calcTxIndexKey(txID)) - if formatKey == nil { - return nil, ErrAccntTxIDNotFound - } - - txInfo := w.DB.Get(calcAnnotatedKey(string(formatKey))) - if err := json.Unmarshal(txInfo, annotatedTx); err != nil { + annotatedTx, err := w.Store.GetTransaction(txID) + if err != nil { return nil, err } @@ -211,18 +151,18 @@ func (w *Wallet) getAccountTxByTxID(txID string) (*query.AnnotatedTx, error) { } func (w *Wallet) getGlobalTxByTxID(txID string) (*query.AnnotatedTx, error) { - globalTxIdx := w.DB.Get(calcGlobalTxIndexKey(txID)) + globalTxIdx := w.Store.GetGlobalTransactionIndex(txID) if globalTxIdx == nil { return nil, fmt.Errorf("No transaction(tx_id=%s) ", txID) } blockHash, pos := parseGlobalTxIdx(globalTxIdx) - block, err := w.chain.GetBlockByHash(blockHash) + block, err := w.Chain.GetBlockByHash(blockHash) if err != nil { return nil, err } - txStatus, err := w.chain.GetTransactionStatus(blockHash) + txStatus, err := w.Chain.GetTransactionStatus(blockHash) if err != nil { return nil, err } @@ -291,37 +231,16 @@ func findTransactionsByAccount(annotatedTx *query.AnnotatedTx, accountID string) // GetTransactions get all walletDB transactions or unconfirmed transactions, and filter transactions by accountID and StartTxID optional func (w *Wallet) GetTransactions(accountID string, StartTxID string, count uint, unconfirmed bool) ([]*query.AnnotatedTx, error) { annotatedTxs := []*query.AnnotatedTx{} - var startKey []byte - preFix := TxPrefix - - if StartTxID != "" { - if unconfirmed { - startKey = calcUnconfirmedTxKey(StartTxID) - } else { - formatKey := w.DB.Get(calcTxIndexKey(StartTxID)) - if formatKey == nil { - return nil, ErrAccntTxIDNotFound - } - startKey = calcAnnotatedKey(string(formatKey)) - } - } - - if unconfirmed { - preFix = UnconfirmedTxPrefix + annotatedTxs, err := w.Store.ListTransactions(accountID, StartTxID, count, unconfirmed) + if err != nil { + return nil, err } - itr := w.DB.IteratorPrefixWithStart([]byte(preFix), startKey, true) - defer itr.Release() - - for txNum := count; itr.Next() && txNum > 0; txNum-- { - annotatedTx := &query.AnnotatedTx{} - if err := json.Unmarshal(itr.Value(), &annotatedTx); err != nil { - return nil, err - } - + newAnnotatedTxs := []*query.AnnotatedTx{} + for _, annotatedTx := range annotatedTxs { if accountID == "" || findTransactionsByAccount(annotatedTx, accountID) { annotateTxsAsset(w, []*query.AnnotatedTx{annotatedTx}) - annotatedTxs = append([]*query.AnnotatedTx{annotatedTx}, annotatedTxs...) + newAnnotatedTxs = append([]*query.AnnotatedTx{annotatedTx}, newAnnotatedTxs...) } } @@ -331,7 +250,7 @@ func (w *Wallet) GetTransactions(accountID string, StartTxID string, count uint, sort.Sort(SortByHeight(annotatedTxs)) } - return annotatedTxs, nil + return newAnnotatedTxs, nil } // GetAccountBalances return all account balances