X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=wallet%2Funconfirmed.go;h=aabf8d282a524194b238d54d8a12e12dd339c3da;hp=bc2e3a2702328731060d2aa2525ed69bbb9840b3;hb=31f8f7cf1ffbec5365ab6ebf217537809cf714e5;hpb=08281341c2cb02ba11d4218576256688854790fc diff --git a/wallet/unconfirmed.go b/wallet/unconfirmed.go index bc2e3a27..aabf8d28 100644 --- a/wallet/unconfirmed.go +++ b/wallet/unconfirmed.go @@ -1,14 +1,16 @@ package wallet import ( - "encoding/json" + "encoding/hex" "fmt" "sort" "time" + "github.com/vapor/protocol/bc" + log "github.com/sirupsen/logrus" - "github.com/vapor/account" + acc "github.com/vapor/account" "github.com/vapor/blockchain/query" "github.com/vapor/crypto/sha3pool" "github.com/vapor/protocol" @@ -16,16 +18,10 @@ import ( ) const ( - //UnconfirmedTxPrefix is txpool unconfirmed transactions prefix - UnconfirmedTxPrefix = "UTXS:" UnconfirmedTxCheckPeriod = 30 * time.Minute MaxUnconfirmedTxDuration = 24 * time.Hour ) -func calcUnconfirmedTxKey(formatKey string) []byte { - return []byte(UnconfirmedTxPrefix + formatKey) -} - // SortByTimestamp implements sort.Interface for AnnotatedTx slices type SortByTimestamp []*query.AnnotatedTx @@ -33,10 +29,17 @@ func (a SortByTimestamp) Len() int { return len(a) } func (a SortByTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a SortByTimestamp) Less(i, j int) bool { return a[i].Timestamp > a[j].Timestamp } +// SortByHeight implements sort.Interface for AnnotatedTx slices +type SortByHeight []*query.AnnotatedTx + +func (a SortByHeight) Len() int { return len(a) } +func (a SortByHeight) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a SortByHeight) Less(i, j int) bool { return a[i].BlockHeight > a[j].BlockHeight } + // AddUnconfirmedTx handle wallet status update when tx add into txpool func (w *Wallet) AddUnconfirmedTx(txD *protocol.TxDesc) { if err := w.saveUnconfirmedTx(txD.Tx); err != nil { - log.WithField("err", err).Error("wallet fail on saveUnconfirmedTx") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("wallet fail on saveUnconfirmedTx") } utxos := txOutToUtxos(txD.Tx, txD.StatusFail, 0) @@ -47,50 +50,51 @@ func (w *Wallet) AddUnconfirmedTx(txD *protocol.TxDesc) { // GetUnconfirmedTxs get account unconfirmed transactions, filter transactions by accountID when accountID is not empty func (w *Wallet) GetUnconfirmedTxs(accountID string) ([]*query.AnnotatedTx, error) { annotatedTxs := []*query.AnnotatedTx{} - txIter := w.DB.IteratorPrefix([]byte(UnconfirmedTxPrefix)) - defer txIter.Release() - - for txIter.Next() { - annotatedTx := &query.AnnotatedTx{} - if err := json.Unmarshal(txIter.Value(), &annotatedTx); err != nil { - return nil, err - } + annotatedTxs, err := w.Store.ListUnconfirmedTransactions() + if err != nil { + return nil, err + } - if accountID == "" || findTransactionsByAccount(annotatedTx, accountID) { + newAnnotatedTxs := []*query.AnnotatedTx{} + for _, annotatedTx := range annotatedTxs { + if accountID == "" || FindTransactionsByAccount(annotatedTx, accountID) { annotateTxsAsset(w, []*query.AnnotatedTx{annotatedTx}) - annotatedTxs = append([]*query.AnnotatedTx{annotatedTx}, annotatedTxs...) + newAnnotatedTxs = append([]*query.AnnotatedTx{annotatedTx}, newAnnotatedTxs...) } } - sort.Sort(SortByTimestamp(annotatedTxs)) - return annotatedTxs, nil + sort.Sort(SortByTimestamp(newAnnotatedTxs)) + return newAnnotatedTxs, nil } // GetUnconfirmedTxByTxID get unconfirmed transaction by txID func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) { - annotatedTx := &query.AnnotatedTx{} - txInfo := w.DB.Get(calcUnconfirmedTxKey(txID)) - if txInfo == nil { - return nil, fmt.Errorf("No transaction(tx_id=%s) from txpool", txID) - } - - if err := json.Unmarshal(txInfo, annotatedTx); err != nil { + annotatedTx, err := w.Store.GetUnconfirmedTransaction(txID) + if err != nil { return nil, err } + if annotatedTx == nil { + return nil, fmt.Errorf("No transaction(tx_id=%s) from txpool", txID) + } annotateTxsAsset(w, []*query.AnnotatedTx{annotatedTx}) return annotatedTx, nil } // RemoveUnconfirmedTx handle wallet status update when tx removed from txpool func (w *Wallet) RemoveUnconfirmedTx(txD *protocol.TxDesc) { + if !w.checkRelatedTransaction(txD.Tx) { + return + } + + w.Store.DeleteUnconfirmedTransaction(txD.Tx.ID.String()) w.AccountMgr.RemoveUnconfirmedUtxo(txD.Tx.ResultIds) } func (w *Wallet) buildAnnotatedUnconfirmedTx(tx *types.Tx) *query.AnnotatedTx { annotatedTx := &query.AnnotatedTx{ ID: tx.ID, - Timestamp: uint64(time.Now().Unix()), + Timestamp: uint64(time.Now().UnixNano() / int64(time.Millisecond)), Inputs: make([]*query.AnnotatedInput, 0, len(tx.Inputs)), Outputs: make([]*query.AnnotatedOutput, 0, len(tx.Outputs)), Size: tx.SerializedSize, @@ -109,8 +113,13 @@ func (w *Wallet) buildAnnotatedUnconfirmedTx(tx *types.Tx) *query.AnnotatedTx { func (w *Wallet) checkRelatedTransaction(tx *types.Tx) bool { for _, v := range tx.Outputs { var hash [32]byte - sha3pool.Sum256(hash[:], v.ControlProgram) - if bytes := w.DB.Get(account.ContractKey(hash)); bytes != nil { + sha3pool.Sum256(hash[:], v.ControlProgram()) + cp, err := w.AccountMgr.GetControlProgram(bc.NewHash(hash)) + if err != nil && err != acc.ErrFindCtrlProgram { + log.WithFields(log.Fields{"module": logModule, "err": err, "hash": hex.EncodeToString(hash[:])}).Error("checkRelatedTransaction fail.") + continue + } + if cp != nil { return true } } @@ -120,7 +129,12 @@ func (w *Wallet) checkRelatedTransaction(tx *types.Tx) bool { if err != nil { continue } - if bytes := w.DB.Get(account.StandardUTXOKey(outid)); bytes != nil { + utxo, err := w.Store.GetStandardUTXO(outid) + if err != nil && err != ErrGetStandardUTXO { + log.WithFields(log.Fields{"module": logModule, "err": err, "outputID": outid.String()}).Error("checkRelatedTransaction fail.") + continue + } + if utxo != nil { return true } } @@ -137,14 +151,11 @@ func (w *Wallet) saveUnconfirmedTx(tx *types.Tx) error { annotatedTx := w.buildAnnotatedUnconfirmedTx(tx) annotatedTxs := []*query.AnnotatedTx{} annotatedTxs = append(annotatedTxs, annotatedTx) - annotateTxsAccount(annotatedTxs, w.DB) + w.annotateTxsAccount(annotatedTxs) - rawTx, err := json.Marshal(annotatedTxs[0]) - if err != nil { + if err := w.Store.SetUnconfirmedTransaction(tx.ID.String(), annotatedTxs[0]); err != nil { return err } - - w.DB.Set(calcUnconfirmedTxKey(tx.ID.String()), rawTx) return nil } @@ -153,9 +164,10 @@ func (w *Wallet) delExpiredTxs() error { if err != nil { return err } + for _, tx := range AnnotatedTx { if time.Now().After(time.Unix(int64(tx.Timestamp), 0).Add(MaxUnconfirmedTxDuration)) { - w.DB.Delete(calcUnconfirmedTxKey(tx.ID.String())) + w.Store.DeleteUnconfirmedTransaction(tx.ID.String()) } } return nil @@ -164,15 +176,17 @@ func (w *Wallet) delExpiredTxs() error { //delUnconfirmedTx periodically delete locally stored timeout did not confirm txs func (w *Wallet) delUnconfirmedTx() { if err := w.delExpiredTxs(); err != nil { - log.WithField("err", err).Error("wallet fail on delUnconfirmedTx") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("wallet fail on delUnconfirmedTx") return } + ticker := time.NewTicker(UnconfirmedTxCheckPeriod) defer ticker.Stop() + for { <-ticker.C if err := w.delExpiredTxs(); err != nil { - log.WithField("err", err).Error("wallet fail on delUnconfirmedTx") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("wallet fail on delUnconfirmedTx") } } }