7 log "github.com/sirupsen/logrus"
9 "github.com/bytom/bytom/account"
10 "github.com/bytom/bytom/asset"
11 "github.com/bytom/bytom/blockchain/pseudohsm"
12 "github.com/bytom/bytom/contract"
13 dbm "github.com/bytom/bytom/database/leveldb"
14 "github.com/bytom/bytom/errors"
15 "github.com/bytom/bytom/event"
16 "github.com/bytom/bytom/protocol"
17 "github.com/bytom/bytom/protocol/bc"
18 "github.com/bytom/bytom/protocol/bc/types"
28 currentVersion = uint(1)
29 walletKey = []byte("walletInfo")
31 errBestBlockNotFoundInCore = errors.New("best block not found in core")
32 errWalletVersionMismatch = errors.New("wallet version mismatch")
35 //StatusInfo is base valid block info to handle orphan block rollback
36 type StatusInfo struct {
44 //Wallet is related to storing account unspent outputs
50 AccountMgr *account.Manager
51 AssetReg *asset.Registry
52 ContractReg *contract.Registry
55 RecoveryMgr *recoveryManager
56 eventDispatcher *event.Dispatcher
57 txMsgSub *event.Subscription
59 rescanCh chan struct{}
62 //NewWallet return a new wallet instance
63 func NewWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, contract *contract.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
68 ContractReg: contract,
71 RecoveryMgr: newRecoveryManager(walletDB, account),
72 eventDispatcher: dispatcher,
73 rescanCh: make(chan struct{}, 1),
74 TxIndexFlag: txIndexFlag,
77 if err := w.loadWalletInfo(); err != nil {
81 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
86 w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
92 go w.delUnconfirmedTx()
93 go w.memPoolTxQueryLoop()
97 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
98 func (w *Wallet) memPoolTxQueryLoop() {
101 case obj, ok := <-w.txMsgSub.Chan():
103 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
107 ev, ok := obj.Data.(protocol.TxMsgEvent)
109 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
113 switch ev.TxMsg.MsgType {
114 case protocol.MsgNewTx:
115 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
116 case protocol.MsgRemoveTx:
117 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
119 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
125 func (w *Wallet) checkWalletInfo() error {
126 if w.status.Version != currentVersion {
127 return errWalletVersionMismatch
128 } else if !w.chain.BlockExist(&w.status.BestHash) {
129 return errBestBlockNotFoundInCore
135 //loadWalletInfo return stored wallet info and nil,
136 //if error, return initial wallet info and err
137 func (w *Wallet) loadWalletInfo() error {
138 if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
139 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
143 err := w.checkWalletInfo()
148 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
153 w.status.Version = currentVersion
154 w.status.WorkHash = bc.Hash{}
155 block, err := w.chain.GetBlockByHeight(0)
159 return w.AttachBlock(block)
162 func (w *Wallet) commitWalletInfo(batch dbm.Batch) error {
163 rawWallet, err := json.Marshal(w.status)
165 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
169 batch.Set(walletKey, rawWallet)
174 // AttachBlock attach a new block
175 func (w *Wallet) AttachBlock(block *types.Block) error {
179 if block.PreviousBlockHash != w.status.WorkHash {
180 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
184 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
185 log.WithField("err", err).Error("filter recovery txs")
186 w.RecoveryMgr.finished()
189 storeBatch := w.DB.NewBatch()
190 if err := w.indexTransactions(storeBatch, block); err != nil {
194 w.attachUtxos(storeBatch, block)
195 w.status.WorkHeight = block.Height
196 w.status.WorkHash = block.Hash()
197 if w.status.WorkHeight >= w.status.BestHeight {
198 w.status.BestHeight = w.status.WorkHeight
199 w.status.BestHash = w.status.WorkHash
201 return w.commitWalletInfo(storeBatch)
204 // DetachBlock detach a block and rollback state
205 func (w *Wallet) DetachBlock(block *types.Block) error {
209 storeBatch := w.DB.NewBatch()
210 w.detachUtxos(storeBatch, block)
211 w.deleteTransactions(storeBatch, w.status.BestHeight)
213 w.status.BestHeight = block.Height - 1
214 w.status.BestHash = block.PreviousBlockHash
216 if w.status.WorkHeight > w.status.BestHeight {
217 w.status.WorkHeight = w.status.BestHeight
218 w.status.WorkHash = w.status.BestHash
221 return w.commitWalletInfo(storeBatch)
224 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
225 func (w *Wallet) walletUpdater() {
227 w.getRescanNotification()
228 for !w.chain.InMainChain(w.status.BestHash) {
229 block, err := w.chain.GetBlockByHash(&w.status.BestHash)
231 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
235 if err := w.DetachBlock(block); err != nil {
236 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
241 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
243 w.walletBlockWaiter()
247 if err := w.AttachBlock(block); err != nil {
248 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
254 //RescanBlocks provide a trigger to rescan blocks
255 func (w *Wallet) RescanBlocks() {
257 case w.rescanCh <- struct{}{}:
263 // deleteAccountTxs deletes all txs in wallet
264 func (w *Wallet) deleteAccountTxs() {
265 storeBatch := w.DB.NewBatch()
267 txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
268 defer txIter.Release()
271 storeBatch.Delete(txIter.Key())
274 txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
275 defer txIndexIter.Release()
277 for txIndexIter.Next() {
278 storeBatch.Delete(txIndexIter.Key())
284 func (w *Wallet) deleteUtxos() {
285 storeBatch := w.DB.NewBatch()
286 ruIter := w.DB.IteratorPrefix([]byte(account.UTXOPreFix))
287 defer ruIter.Release()
289 storeBatch.Delete(ruIter.Key())
292 suIter := w.DB.IteratorPrefix([]byte(account.SUTXOPrefix))
293 defer suIter.Release()
295 storeBatch.Delete(suIter.Key())
300 // DeleteAccount deletes account matching accountID, then rescan wallet
301 func (w *Wallet) DeleteAccount(accountID string) (err error) {
305 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
314 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
318 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
327 func (w *Wallet) getRescanNotification() {
336 func (w *Wallet) setRescanStatus() {
337 block, _ := w.chain.GetBlockByHeight(0)
338 w.status.WorkHash = bc.Hash{}
342 func (w *Wallet) walletBlockWaiter() {
344 case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
350 // GetWalletStatusInfo return current wallet StatusInfo
351 func (w *Wallet) GetWalletStatusInfo() StatusInfo {