package wallet
import (
- "encoding/json"
"sync"
log "github.com/sirupsen/logrus"
- "github.com/tendermint/tmlibs/db"
"github.com/vapor/account"
"github.com/vapor/asset"
"github.com/vapor/blockchain/pseudohsm"
- "github.com/vapor/common"
+ "github.com/vapor/errors"
+ "github.com/vapor/event"
"github.com/vapor/protocol"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
const (
//SINGLE single sign
- SINGLE = 1
+ SINGLE = 1
+ logModule = "wallet"
)
-var walletKey = []byte("walletInfo")
+var (
+ currentVersion = uint(1)
+
+ errBestBlockNotFoundInCore = errors.New("best block not found in core")
+ errWalletVersionMismatch = errors.New("wallet version mismatch")
+ ErrGetWalletStatusInfo = errors.New("failed get wallet info")
+ ErrGetAsset = errors.New("Failed to find asset definition")
+ ErrAccntTxIDNotFound = errors.New("account TXID not found")
+ ErrGetStandardUTXO = errors.New("failed get standard UTXO")
+)
//StatusInfo is base valid block info to handle orphan block rollback
type StatusInfo struct {
+ Version uint
WorkHeight uint64
WorkHash bc.Hash
BestHeight uint64
//Wallet is related to storing account unspent outputs
type Wallet struct {
- DB db.DB
- rw sync.RWMutex
- status StatusInfo
- AccountMgr *account.Manager
- AssetReg *asset.Registry
- Hsm *pseudohsm.HSM
- chain *protocol.Chain
- RecoveryMgr *recoveryManager
- rescanCh chan struct{}
- dposAddress common.Address
+ Store WalletStore
+ rw sync.RWMutex
+ Status StatusInfo
+ TxIndexFlag bool
+ AccountMgr *account.Manager
+ AssetReg *asset.Registry
+ Hsm *pseudohsm.HSM
+ Chain *protocol.Chain
+ RecoveryMgr *recoveryManager
+ EventDispatcher *event.Dispatcher
+ TxMsgSub *event.Subscription
+
+ rescanCh chan struct{}
}
//NewWallet return a new wallet instance
-func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dposAddress common.Address) (*Wallet, error) {
+func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
w := &Wallet{
- DB: walletDB,
- AccountMgr: account,
- AssetReg: asset,
- chain: chain,
- Hsm: hsm,
- RecoveryMgr: newRecoveryManager(walletDB, account),
- rescanCh: make(chan struct{}, 1),
- dposAddress: dposAddress,
+ Store: store,
+ AccountMgr: account,
+ AssetReg: asset,
+ Chain: chain,
+ Hsm: hsm,
+ RecoveryMgr: NewRecoveryManager(store, account),
+ EventDispatcher: dispatcher,
+ rescanCh: make(chan struct{}, 1),
+ TxIndexFlag: txIndexFlag,
}
- if err := w.loadWalletInfo(); err != nil {
+ if err := w.LoadWalletInfo(); err != nil {
return nil, err
}
return nil, err
}
+ var err error
+ w.TxMsgSub, err = w.EventDispatcher.Subscribe(protocol.TxMsgEvent{})
+ if err != nil {
+ return nil, err
+ }
+
go w.walletUpdater()
go w.delUnconfirmedTx()
+ go w.MemPoolTxQueryLoop()
return w, nil
}
-//GetWalletInfo return stored wallet info and nil,if error,
-//return initial wallet info and err
-func (w *Wallet) loadWalletInfo() error {
- if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
- return json.Unmarshal(rawWallet, &w.status)
+// MemPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
+func (w *Wallet) MemPoolTxQueryLoop() {
+ for {
+ select {
+ case obj, ok := <-w.TxMsgSub.Chan():
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+ return
+ }
+
+ ev, ok := obj.Data.(protocol.TxMsgEvent)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+ continue
+ }
+
+ switch ev.TxMsg.MsgType {
+ case protocol.MsgNewTx:
+ w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
+ case protocol.MsgRemoveTx:
+ w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
+ default:
+ log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
+ }
+ }
}
+}
- block, err := w.chain.GetBlockByHeight(0)
+func (w *Wallet) checkWalletInfo() error {
+ if w.Status.Version != currentVersion {
+ return errWalletVersionMismatch
+ } else if !w.Chain.BlockExist(&w.Status.BestHash) {
+ return errBestBlockNotFoundInCore
+ }
+
+ return nil
+}
+
+//LoadWalletInfo return stored wallet info and nil,
+//if error, return initial wallet info and err
+func (w *Wallet) LoadWalletInfo() error {
+ walletStatus, err := w.Store.GetWalletInfo()
+ if walletStatus == nil && err != ErrGetWalletStatusInfo {
+ return err
+ }
+
+ if walletStatus != nil {
+ w.Status = *walletStatus
+ err = w.checkWalletInfo()
+ if err == nil {
+ return nil
+ }
+
+ log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
+ w.Store.DeleteWalletTransactions()
+ w.Store.DeleteWalletUTXOs()
+ }
+
+ w.Status.Version = currentVersion
+ w.Status.WorkHash = bc.Hash{}
+ block, err := w.Chain.GetBlockByHeight(0)
if err != nil {
return err
}
+
return w.AttachBlock(block)
}
-func (w *Wallet) commitWalletInfo(batch db.Batch) error {
- rawWallet, err := json.Marshal(w.status)
- if err != nil {
- log.WithField("err", err).Error("save wallet info")
+func (w *Wallet) commitWalletInfo(store WalletStore) error {
+ if err := store.SetWalletInfo(&w.Status); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
return err
}
-
- batch.Set(walletKey, rawWallet)
- batch.Write()
return nil
}
w.rw.Lock()
defer w.rw.Unlock()
- if block.PreviousBlockHash != w.status.WorkHash {
+ if block.PreviousBlockHash != w.Status.WorkHash {
log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
return nil
}
blockHash := block.Hash()
- txStatus, err := w.chain.GetTransactionStatus(&blockHash)
+ txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
if err != nil {
return err
}
if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
+ log.WithField("err", err).Error("filter recovery txs")
+ w.RecoveryMgr.finished()
+ }
+
+ annotatedTxs := w.filterAccountTxs(block, txStatus)
+ if err := saveExternalAssetDefinition(block, w.Store); err != nil {
+ return err
+ }
+
+ w.annotateTxsAccount(annotatedTxs)
+
+ newStore := w.Store.InitBatch()
+ if err := w.indexTransactions(block, txStatus, annotatedTxs, newStore); err != nil {
return err
}
- storeBatch := w.DB.NewBatch()
- if err := w.indexTransactions(storeBatch, block, txStatus); err != nil {
+ w.attachUtxos(block, txStatus, newStore)
+ w.Status.WorkHeight = block.Height
+ w.Status.WorkHash = block.Hash()
+ if w.Status.WorkHeight >= w.Status.BestHeight {
+ w.Status.BestHeight = w.Status.WorkHeight
+ w.Status.BestHash = w.Status.WorkHash
+ }
+
+ if err := w.commitWalletInfo(newStore); err != nil {
return err
}
- w.attachUtxos(storeBatch, block, txStatus)
- w.status.WorkHeight = block.Height
- w.status.WorkHash = block.Hash()
- if w.status.WorkHeight >= w.status.BestHeight {
- w.status.BestHeight = w.status.WorkHeight
- w.status.BestHash = w.status.WorkHash
+ if err := newStore.CommitBatch(); err != nil {
+ return err
}
- return w.commitWalletInfo(storeBatch)
+
+ return nil
}
// DetachBlock detach a block and rollback state
defer w.rw.Unlock()
blockHash := block.Hash()
- txStatus, err := w.chain.GetTransactionStatus(&blockHash)
+ txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
if err != nil {
return err
}
- storeBatch := w.DB.NewBatch()
- w.detachUtxos(storeBatch, block, txStatus)
- w.deleteTransactions(storeBatch, w.status.BestHeight)
+ newStore := w.Store.InitBatch()
+
+ w.detachUtxos(block, txStatus, newStore)
+ newStore.DeleteTransactions(w.Status.BestHeight)
+
+ w.Status.BestHeight = block.Height - 1
+ w.Status.BestHash = block.PreviousBlockHash
- w.status.BestHeight = block.Height - 1
- w.status.BestHash = block.PreviousBlockHash
+ if w.Status.WorkHeight > w.Status.BestHeight {
+ w.Status.WorkHeight = w.Status.BestHeight
+ w.Status.WorkHash = w.Status.BestHash
+ }
+ if err := w.commitWalletInfo(newStore); err != nil {
+ return err
+ }
- if w.status.WorkHeight > w.status.BestHeight {
- w.status.WorkHeight = w.status.BestHeight
- w.status.WorkHash = w.status.BestHash
+ if err := newStore.CommitBatch(); err != nil {
+ return err
}
- return w.commitWalletInfo(storeBatch)
+ return nil
}
//WalletUpdate process every valid block and reverse every invalid block which need to rollback
func (w *Wallet) walletUpdater() {
for {
w.getRescanNotification()
- for !w.chain.InMainChain(w.status.BestHash) {
- block, err := w.chain.GetBlockByHash(&w.status.BestHash)
+ for !w.Chain.InMainChain(w.Status.BestHash) {
+ block, err := w.Chain.GetBlockByHash(&w.Status.BestHash)
if err != nil {
- log.WithField("err", err).Error("walletUpdater GetBlockByHash")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
return
}
if err := w.DetachBlock(block); err != nil {
- log.WithField("err", err).Error("walletUpdater detachBlock stop")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
return
}
}
- block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
+ block, _ := w.Chain.GetBlockByHeight(w.Status.WorkHeight + 1)
if block == nil {
w.walletBlockWaiter()
continue
}
if err := w.AttachBlock(block); err != nil {
- log.WithField("err", err).Error("walletUpdater AttachBlock stop")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
return
}
}
}
}
-// deleteAccountTxs deletes all txs in wallet
-func (w *Wallet) deleteAccountTxs() {
- storeBatch := w.DB.NewBatch()
-
- txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
- defer txIter.Release()
-
- for txIter.Next() {
- storeBatch.Delete(txIter.Key())
- }
-
- txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
- defer txIndexIter.Release()
-
- for txIndexIter.Next() {
- storeBatch.Delete(txIndexIter.Key())
- }
-
- storeBatch.Write()
-}
-
// DeleteAccount deletes account matching accountID, then rescan wallet
func (w *Wallet) DeleteAccount(accountID string) (err error) {
w.rw.Lock()
return err
}
- w.deleteAccountTxs()
+ w.Store.DeleteWalletTransactions()
w.RescanBlocks()
return nil
}
return err
}
- w.deleteAccountTxs()
+ w.Store.DeleteWalletTransactions()
w.RescanBlocks()
return nil
}
}
func (w *Wallet) setRescanStatus() {
- block, _ := w.chain.GetBlockByHeight(0)
- w.status.WorkHash = bc.Hash{}
+ block, _ := w.Chain.GetBlockByHeight(0)
+ w.Status.WorkHash = bc.Hash{}
w.AttachBlock(block)
}
func (w *Wallet) walletBlockWaiter() {
select {
- case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
+ case <-w.Chain.BlockWaiter(w.Status.WorkHeight + 1):
case <-w.rescanCh:
w.setRescanStatus()
}
w.rw.RLock()
defer w.rw.RUnlock()
- return w.status
+ return w.Status
}