7 log "github.com/sirupsen/logrus"
9 "github.com/vapor/account"
10 "github.com/vapor/asset"
11 "github.com/vapor/blockchain/pseudohsm"
12 dbm "github.com/vapor/database/leveldb"
13 "github.com/vapor/errors"
14 "github.com/vapor/event"
15 "github.com/vapor/protocol"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
27 currentVersion = uint(1)
28 walletKey = []byte("walletInfo")
30 errBestBlockNotFoundInCore = errors.New("best block not found in core")
31 errWalletVersionMismatch = errors.New("wallet version mismatch")
34 // StatusInfo is the basic valid-block info used to handle orphan block rollback.
35 type StatusInfo struct {
43 //Wallet is related to storing account unspent outputs
49 AccountMgr *account.Manager
50 AssetReg *asset.Registry
53 RecoveryMgr *recoveryManager
54 eventDispatcher *event.Dispatcher
55 txMsgSub *event.Subscription
57 rescanCh chan struct{}
60 // NewWallet returns a new wallet instance.
61 func NewWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
68 RecoveryMgr: newRecoveryManager(walletDB, account),
69 eventDispatcher: dispatcher,
70 rescanCh: make(chan struct{}, 1),
71 TxIndexFlag: txIndexFlag,
74 if err := w.loadWalletInfo(); err != nil {
78 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
83 w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
89 go w.delUnconfirmedTx()
90 go w.memPoolTxQueryLoop()
94 // memPoolTxQueryLoop continuously passes transactions accepted by the mempool to the wallet.
95 func (w *Wallet) memPoolTxQueryLoop() {
98 case obj, ok := <-w.txMsgSub.Chan():
100 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
104 ev, ok := obj.Data.(protocol.TxMsgEvent)
106 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
110 switch ev.TxMsg.MsgType {
111 case protocol.MsgNewTx:
112 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
113 case protocol.MsgRemoveTx:
114 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
116 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
122 func (w *Wallet) checkWalletInfo() error {
123 if w.status.Version != currentVersion {
124 return errWalletVersionMismatch
125 } else if !w.chain.BlockExist(&w.status.BestHash) {
126 return errBestBlockNotFoundInCore
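132 // loadWalletInfo reads the stored wallet info from the DB into w.status and returns an error on failure.
133 // NOTE(review): when no usable info is stored it appears to reinitialize the status and attach the block at height 0 — confirm.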
134 func (w *Wallet) loadWalletInfo() error {
135 if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
136 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
140 err := w.checkWalletInfo()
145 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
150 w.status.Version = currentVersion
151 w.status.WorkHash = bc.Hash{}
152 block, err := w.chain.GetBlockByHeight(0)
156 return w.AttachBlock(block)
159 func (w *Wallet) commitWalletInfo(batch dbm.Batch) error {
160 rawWallet, err := json.Marshal(w.status)
162 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
166 batch.Set(walletKey, rawWallet)
171 // AttachBlock attaches a new block to the wallet.
172 func (w *Wallet) AttachBlock(block *types.Block) error {
176 if block.PreviousBlockHash != w.status.WorkHash {
177 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
181 blockHash := block.Hash()
182 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
187 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
188 log.WithField("err", err).Error("filter recovery txs")
189 w.RecoveryMgr.finished()
192 storeBatch := w.DB.NewBatch()
193 if err := w.indexTransactions(storeBatch, block, txStatus); err != nil {
197 w.attachUtxos(storeBatch, block, txStatus)
198 w.status.WorkHeight = block.Height
199 w.status.WorkHash = block.Hash()
200 if w.status.WorkHeight >= w.status.BestHeight {
201 w.status.BestHeight = w.status.WorkHeight
202 w.status.BestHash = w.status.WorkHash
204 return w.commitWalletInfo(storeBatch)
207 // DetachBlock detaches a block and rolls back the wallet state.
208 func (w *Wallet) DetachBlock(block *types.Block) error {
212 blockHash := block.Hash()
213 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
218 storeBatch := w.DB.NewBatch()
219 w.detachUtxos(storeBatch, block, txStatus)
220 w.deleteTransactions(storeBatch, w.status.BestHeight)
222 w.status.BestHeight = block.Height - 1
223 w.status.BestHash = block.PreviousBlockHash
225 if w.status.WorkHeight > w.status.BestHeight {
226 w.status.WorkHeight = w.status.BestHeight
227 w.status.WorkHash = w.status.BestHash
230 return w.commitWalletInfo(storeBatch)
233 // walletUpdater processes every valid block and reverts every invalid block that needs to be rolled back.
234 func (w *Wallet) walletUpdater() {
236 w.getRescanNotification()
237 for !w.chain.InMainChain(w.status.BestHash) {
238 block, err := w.chain.GetBlockByHash(&w.status.BestHash)
240 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
244 if err := w.DetachBlock(block); err != nil {
245 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
250 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
252 w.walletBlockWaiter()
256 if err := w.AttachBlock(block); err != nil {
257 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
263 // RescanBlocks provides a trigger to rescan blocks.
264 func (w *Wallet) RescanBlocks() {
266 case w.rescanCh <- struct{}{}:
272 // deleteAccountTxs deletes all txs in wallet
273 func (w *Wallet) deleteAccountTxs() {
274 storeBatch := w.DB.NewBatch()
276 txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
277 defer txIter.Release()
280 storeBatch.Delete(txIter.Key())
283 txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
284 defer txIndexIter.Release()
286 for txIndexIter.Next() {
287 storeBatch.Delete(txIndexIter.Key())
293 func (w *Wallet) deleteUtxos() {
294 storeBatch := w.DB.NewBatch()
295 ruIter := w.DB.IteratorPrefix([]byte(account.UTXOPreFix))
296 defer ruIter.Release()
298 storeBatch.Delete(ruIter.Key())
301 suIter := w.DB.IteratorPrefix([]byte(account.SUTXOPrefix))
302 defer suIter.Release()
304 storeBatch.Delete(suIter.Key())
309 // DeleteAccount deletes the account matching accountID, then rescans the wallet.
310 func (w *Wallet) DeleteAccount(accountID string) (err error) {
314 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
323 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
327 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
336 func (w *Wallet) getRescanNotification() {
345 func (w *Wallet) setRescanStatus() {
346 block, _ := w.chain.GetBlockByHeight(0)
347 w.status.WorkHash = bc.Hash{}
351 func (w *Wallet) walletBlockWaiter() {
353 case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
359 // GetWalletStatusInfo returns the current wallet StatusInfo.
360 func (w *Wallet) GetWalletStatusInfo() StatusInfo {