7 log "github.com/sirupsen/logrus"
9 "github.com/bytom/account"
10 "github.com/bytom/asset"
11 "github.com/bytom/blockchain/pseudohsm"
12 dbm "github.com/bytom/database/leveldb"
13 "github.com/bytom/errors"
14 "github.com/bytom/event"
15 "github.com/bytom/protocol"
16 "github.com/bytom/protocol/bc"
17 "github.com/bytom/protocol/bc/types"
27 currentVersion = uint(1)
28 walletKey = []byte("walletInfo")
30 errBestBlockNotFoundInCore = errors.New("best block not found in core")
31 errWalletVersionMismatch = errors.New("wallet version mismatch")
34 // StatusInfo is the base valid block info used to handle orphan block rollback
35 type StatusInfo struct {
43 // Wallet manages the storage of account unspent outputs and transactions
49 AccountMgr *account.Manager
50 AssetReg *asset.Registry
53 RecoveryMgr *recoveryManager
54 eventDispatcher *event.Dispatcher
55 txMsgSub *event.Subscription
57 rescanCh chan struct{}
60 // NewWallet returns a new wallet instance
61 func NewWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
68 RecoveryMgr: newRecoveryManager(walletDB, account),
69 eventDispatcher: dispatcher,
70 rescanCh: make(chan struct{}, 1),
71 TxIndexFlag: txIndexFlag,
74 if err := w.loadWalletInfo(); err != nil {
78 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
83 w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
89 go w.delUnconfirmedTx()
90 go w.memPoolTxQueryLoop()
94 // memPoolTxQueryLoop continuously forwards tx pool messages (new and removed transactions) to the wallet.
95 func (w *Wallet) memPoolTxQueryLoop() {
98 case obj, ok := <-w.txMsgSub.Chan():
100 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
104 ev, ok := obj.Data.(protocol.TxMsgEvent)
106 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
110 switch ev.TxMsg.MsgType {
111 case protocol.MsgNewTx:
112 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
113 case protocol.MsgRemoveTx:
114 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
116 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
122 func (w *Wallet) checkWalletInfo() error {
123 if w.status.Version != currentVersion {
124 return errWalletVersionMismatch
125 } else if !w.chain.BlockExist(&w.status.BestHash) {
126 return errBestBlockNotFoundInCore
132 // loadWalletInfo loads the stored wallet info into w.status;
133 // if that is not possible, it falls back to the initial wallet info and returns any resulting error
134 func (w *Wallet) loadWalletInfo() error {
135 if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
136 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
140 err := w.checkWalletInfo()
145 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
150 w.status.Version = currentVersion
151 block, err := w.chain.GetBlockByHeight(0)
155 return w.AttachBlock(block)
158 func (w *Wallet) commitWalletInfo(batch dbm.Batch) error {
159 rawWallet, err := json.Marshal(w.status)
161 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
165 batch.Set(walletKey, rawWallet)
170 // AttachBlock attaches a new block to the wallet
171 func (w *Wallet) AttachBlock(block *types.Block) error {
175 if block.PreviousBlockHash != w.status.WorkHash {
176 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
180 blockHash := block.Hash()
181 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
186 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
190 storeBatch := w.DB.NewBatch()
191 if err := w.indexTransactions(storeBatch, block, txStatus); err != nil {
195 w.attachUtxos(storeBatch, block, txStatus)
196 w.status.WorkHeight = block.Height
197 w.status.WorkHash = block.Hash()
198 if w.status.WorkHeight >= w.status.BestHeight {
199 w.status.BestHeight = w.status.WorkHeight
200 w.status.BestHash = w.status.WorkHash
202 return w.commitWalletInfo(storeBatch)
205 // DetachBlock detaches a block and rolls back the wallet state
206 func (w *Wallet) DetachBlock(block *types.Block) error {
210 blockHash := block.Hash()
211 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
216 storeBatch := w.DB.NewBatch()
217 w.detachUtxos(storeBatch, block, txStatus)
218 w.deleteTransactions(storeBatch, w.status.BestHeight)
220 w.status.BestHeight = block.Height - 1
221 w.status.BestHash = block.PreviousBlockHash
223 if w.status.WorkHeight > w.status.BestHeight {
224 w.status.WorkHeight = w.status.BestHeight
225 w.status.WorkHash = w.status.BestHash
228 return w.commitWalletInfo(storeBatch)
231 // walletUpdater processes every valid block and reverses every invalid block that needs to be rolled back
232 func (w *Wallet) walletUpdater() {
234 w.getRescanNotification()
235 for !w.chain.InMainChain(w.status.BestHash) {
236 block, err := w.chain.GetBlockByHash(&w.status.BestHash)
238 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
242 if err := w.DetachBlock(block); err != nil {
243 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
248 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
250 w.walletBlockWaiter()
254 if err := w.AttachBlock(block); err != nil {
255 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
261 // RescanBlocks provides a trigger to rescan blocks
262 func (w *Wallet) RescanBlocks() {
264 case w.rescanCh <- struct{}{}:
270 // deleteAccountTxs deletes all txs in wallet
271 func (w *Wallet) deleteAccountTxs() {
272 storeBatch := w.DB.NewBatch()
274 txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
275 defer txIter.Release()
278 storeBatch.Delete(txIter.Key())
281 txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
282 defer txIndexIter.Release()
284 for txIndexIter.Next() {
285 storeBatch.Delete(txIndexIter.Key())
291 func (w *Wallet) deleteUtxos() {
292 storeBatch := w.DB.NewBatch()
293 ruIter := w.DB.IteratorPrefix([]byte(account.UTXOPreFix))
294 defer ruIter.Release()
296 storeBatch.Delete(ruIter.Key())
299 suIter := w.DB.IteratorPrefix([]byte(account.SUTXOPrefix))
300 defer suIter.Release()
302 storeBatch.Delete(suIter.Key())
307 // DeleteAccount deletes the account matching accountID, then rescans the wallet
308 func (w *Wallet) DeleteAccount(accountID string) (err error) {
312 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
321 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
325 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
334 func (w *Wallet) getRescanNotification() {
343 func (w *Wallet) setRescanStatus() {
344 block, _ := w.chain.GetBlockByHeight(0)
345 w.status.WorkHash = bc.Hash{}
349 func (w *Wallet) walletBlockWaiter() {
351 case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
357 // GetWalletStatusInfo returns the current wallet StatusInfo
358 func (w *Wallet) GetWalletStatusInfo() StatusInfo {