X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=wallet%2Fwallet.go;h=0a955d21811e7eb695db86739a8b55878282911a;hb=a80fca076057c63e17da6ea1dc72c33cd658027b;hp=3f764185df976cc8e556a058d96e724d9c7840ce;hpb=ddc7106558f020bde24cc337d51649611dddaba8;p=bytom%2Fvapor.git diff --git a/wallet/wallet.go b/wallet/wallet.go index 3f764185..0a955d21 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -1,16 +1,15 @@ package wallet import ( - "encoding/json" "sync" log "github.com/sirupsen/logrus" - "github.com/tendermint/tmlibs/db" "github.com/vapor/account" "github.com/vapor/asset" "github.com/vapor/blockchain/pseudohsm" - "github.com/vapor/common" + "github.com/vapor/errors" + "github.com/vapor/event" "github.com/vapor/protocol" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" @@ -18,13 +17,24 @@ import ( const ( //SINGLE single sign - SINGLE = 1 + SINGLE = 1 + logModule = "wallet" ) -var walletKey = []byte("walletInfo") +var ( + currentVersion = uint(1) + + errBestBlockNotFoundInCore = errors.New("best block not found in core") + errWalletVersionMismatch = errors.New("wallet version mismatch") + ErrGetWalletStatusInfo = errors.New("failed get wallet info") + ErrGetAsset = errors.New("Failed to find asset definition") + ErrAccntTxIDNotFound = errors.New("account TXID not found") + ErrGetStandardUTXO = errors.New("failed get standard UTXO") +) //StatusInfo is base valid block info to handle orphan block rollback type StatusInfo struct { + Version uint WorkHeight uint64 WorkHash bc.Hash BestHeight uint64 @@ -33,32 +43,36 @@ type StatusInfo struct { //Wallet is related to storing account unspent outputs type Wallet struct { - DB db.DB - rw sync.RWMutex - status StatusInfo - AccountMgr *account.Manager - AssetReg *asset.Registry - Hsm *pseudohsm.HSM - chain *protocol.Chain - RecoveryMgr *recoveryManager - rescanCh chan struct{} - dposAddress common.Address + Store WalletStore + rw sync.RWMutex + Status StatusInfo + TxIndexFlag bool + AccountMgr *account.Manager + AssetReg *asset.Registry + Hsm *pseudohsm.HSM + Chain *protocol.Chain + RecoveryMgr *recoveryManager + EventDispatcher *event.Dispatcher + TxMsgSub *event.Subscription + + rescanCh chan struct{} } //NewWallet return a new wallet instance -func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dposAddress common.Address) (*Wallet, error) { +func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) { w := &Wallet{ - DB: walletDB, - AccountMgr: account, - AssetReg: asset, - chain: chain, - Hsm: hsm, - RecoveryMgr: newRecoveryManager(walletDB, account), - rescanCh: make(chan struct{}, 1), - dposAddress: dposAddress, + Store: store, + AccountMgr: account, + AssetReg: asset, + Chain: chain, + Hsm: hsm, + RecoveryMgr: NewRecoveryManager(store, account), + EventDispatcher: dispatcher, + rescanCh: make(chan struct{}, 1), + TxIndexFlag: txIndexFlag, } - if err := w.loadWalletInfo(); err != nil { + if err := w.LoadWalletInfo(); err != nil { return nil, err } @@ -66,34 +80,91 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, return nil, err } + var err error + w.TxMsgSub, err = w.EventDispatcher.Subscribe(protocol.TxMsgEvent{}) + if err != nil { + return nil, err + } + go w.walletUpdater() go w.delUnconfirmedTx() + go w.MemPoolTxQueryLoop() return w, nil } -//GetWalletInfo return stored wallet info and nil,if error, -//return initial wallet info and err -func (w *Wallet) loadWalletInfo() error { - if rawWallet := w.DB.Get(walletKey); rawWallet != nil { - return json.Unmarshal(rawWallet, &w.status) +// MemPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet. +func (w *Wallet) MemPoolTxQueryLoop() { + for { + select { + case obj, ok := <-w.TxMsgSub.Chan(): + if !ok { + log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed") + return + } + + ev, ok := obj.Data.(protocol.TxMsgEvent) + if !ok { + log.WithFields(log.Fields{"module": logModule}).Error("event type error") + continue + } + + switch ev.TxMsg.MsgType { + case protocol.MsgNewTx: + w.AddUnconfirmedTx(ev.TxMsg.TxDesc) + case protocol.MsgRemoveTx: + w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc) + default: + log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel") + } + } } +} - block, err := w.chain.GetBlockByHeight(0) +func (w *Wallet) checkWalletInfo() error { + if w.Status.Version != currentVersion { + return errWalletVersionMismatch + } else if !w.Chain.BlockExist(&w.Status.BestHash) { + return errBestBlockNotFoundInCore + } + + return nil +} + +//LoadWalletInfo return stored wallet info and nil, +//if error, return initial wallet info and err +func (w *Wallet) LoadWalletInfo() error { + walletStatus, err := w.Store.GetWalletInfo() + if walletStatus == nil && err != ErrGetWalletStatusInfo { + return err + } + + if walletStatus != nil { + w.Status = *walletStatus + err = w.checkWalletInfo() + if err == nil { + return nil + } + + log.WithFields(log.Fields{"module": logModule}).Warn(err.Error()) + w.Store.DeleteWalletTransactions() + w.Store.DeleteWalletUTXOs() + } + + w.Status.Version = currentVersion + w.Status.WorkHash = bc.Hash{} + block, err := w.Chain.GetBlockByHeight(0) if err != nil { return err } + return w.AttachBlock(block) } -func (w *Wallet) commitWalletInfo(batch db.Batch) error { - rawWallet, err := json.Marshal(w.status) - if err != nil { - log.WithField("err", err).Error("save wallet info") +func (w *Wallet) commitWalletInfo(store WalletStore) error { + if err := store.SetWalletInfo(&w.Status); err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info") return err } - - batch.Set(walletKey, rawWallet) - batch.Write() return nil } @@ -102,34 +173,51 @@ func (w *Wallet) AttachBlock(block *types.Block) error { w.rw.Lock() defer w.rw.Unlock() - if block.PreviousBlockHash != w.status.WorkHash { + if block.PreviousBlockHash != w.Status.WorkHash { log.Warn("wallet skip attachBlock due to status hash not equal to previous hash") return nil } blockHash := block.Hash() - txStatus, err := w.chain.GetTransactionStatus(&blockHash) + txStatus, err := w.Chain.GetTransactionStatus(&blockHash) if err != nil { return err } if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil { + log.WithField("err", err).Error("filter recovery txs") + w.RecoveryMgr.finished() + } + + annotatedTxs := w.filterAccountTxs(block, txStatus) + if err := saveExternalAssetDefinition(block, w.Store); err != nil { + return err + } + + w.annotateTxsAccount(annotatedTxs) + + newStore := w.Store.InitBatch() + if err := w.indexTransactions(block, txStatus, annotatedTxs, newStore); err != nil { return err } - storeBatch := w.DB.NewBatch() - if err := w.indexTransactions(storeBatch, block, txStatus); err != nil { + w.attachUtxos(block, txStatus, newStore) + w.Status.WorkHeight = block.Height + w.Status.WorkHash = block.Hash() + if w.Status.WorkHeight >= w.Status.BestHeight { + w.Status.BestHeight = w.Status.WorkHeight + w.Status.BestHash = w.Status.WorkHash + } + + if err := w.commitWalletInfo(newStore); err != nil { return err } - w.attachUtxos(storeBatch, block, txStatus) - w.status.WorkHeight = block.Height - w.status.WorkHash = block.Hash() - if w.status.WorkHeight >= w.status.BestHeight { - w.status.BestHeight = w.status.WorkHeight - w.status.BestHash = w.status.WorkHash + if err := newStore.CommitBatch(); err != nil { + return err } - return w.commitWalletInfo(storeBatch) + + return nil } // DetachBlock detach a block and rollback state @@ -138,51 +226,59 @@ func (w *Wallet) DetachBlock(block *types.Block) error { defer w.rw.Unlock() blockHash := block.Hash() - txStatus, err := w.chain.GetTransactionStatus(&blockHash) + txStatus, err := w.Chain.GetTransactionStatus(&blockHash) if err != nil { return err } - storeBatch := w.DB.NewBatch() - w.detachUtxos(storeBatch, block, txStatus) - w.deleteTransactions(storeBatch, w.status.BestHeight) + newStore := w.Store.InitBatch() + + w.detachUtxos(block, txStatus, newStore) + newStore.DeleteTransactions(w.Status.BestHeight) + + w.Status.BestHeight = block.Height - 1 + w.Status.BestHash = block.PreviousBlockHash - w.status.BestHeight = block.Height - 1 - w.status.BestHash = block.PreviousBlockHash + if w.Status.WorkHeight > w.Status.BestHeight { + w.Status.WorkHeight = w.Status.BestHeight + w.Status.WorkHash = w.Status.BestHash + } + if err := w.commitWalletInfo(newStore); err != nil { + return err + } - if w.status.WorkHeight > w.status.BestHeight { - w.status.WorkHeight = w.status.BestHeight - w.status.WorkHash = w.status.BestHash + if err := newStore.CommitBatch(); err != nil { + return err } - return w.commitWalletInfo(storeBatch) + return nil } //WalletUpdate process every valid block and reverse every invalid block which need to rollback func (w *Wallet) walletUpdater() { for { w.getRescanNotification() - for !w.chain.InMainChain(w.status.BestHash) { - block, err := w.chain.GetBlockByHash(&w.status.BestHash) + for !w.Chain.InMainChain(w.Status.BestHash) { + block, err := w.Chain.GetBlockByHash(&w.Status.BestHash) if err != nil { - log.WithField("err", err).Error("walletUpdater GetBlockByHash") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash") return } if err := w.DetachBlock(block); err != nil { - log.WithField("err", err).Error("walletUpdater detachBlock stop") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop") return } } - block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1) + block, _ := w.Chain.GetBlockByHeight(w.Status.WorkHeight + 1) if block == nil { w.walletBlockWaiter() continue } if err := w.AttachBlock(block); err != nil { - log.WithField("err", err).Error("walletUpdater AttachBlock stop") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop") return } } @@ -197,27 +293,6 @@ func (w *Wallet) RescanBlocks() { } } -// deleteAccountTxs deletes all txs in wallet -func (w *Wallet) deleteAccountTxs() { - storeBatch := w.DB.NewBatch() - - txIter := w.DB.IteratorPrefix([]byte(TxPrefix)) - defer txIter.Release() - - for txIter.Next() { - storeBatch.Delete(txIter.Key()) - } - - txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix)) - defer txIndexIter.Release() - - for txIndexIter.Next() { - storeBatch.Delete(txIndexIter.Key()) - } - - storeBatch.Write() -} - // DeleteAccount deletes account matching accountID, then rescan wallet func (w *Wallet) DeleteAccount(accountID string) (err error) { w.rw.Lock() @@ -227,7 +302,7 @@ func (w *Wallet) DeleteAccount(accountID string) (err error) { return err } - w.deleteAccountTxs() + w.Store.DeleteWalletTransactions() w.RescanBlocks() return nil } @@ -240,7 +315,7 @@ func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err erro return err } - w.deleteAccountTxs() + w.Store.DeleteWalletTransactions() w.RescanBlocks() return nil } @@ -255,14 +330,14 @@ func (w *Wallet) getRescanNotification() { } func (w *Wallet) setRescanStatus() { - block, _ := w.chain.GetBlockByHeight(0) - w.status.WorkHash = bc.Hash{} + block, _ := w.Chain.GetBlockByHeight(0) + w.Status.WorkHash = bc.Hash{} w.AttachBlock(block) } func (w *Wallet) walletBlockWaiter() { select { - case <-w.chain.BlockWaiter(w.status.WorkHeight + 1): + case <-w.Chain.BlockWaiter(w.Status.WorkHeight + 1): case <-w.rescanCh: w.setRescanStatus() } @@ -273,5 +348,5 @@ func (w *Wallet) GetWalletStatusInfo() StatusInfo { w.rw.RLock() defer w.rw.RUnlock() - return w.status + return w.Status }