7 log "github.com/sirupsen/logrus"
8 "github.com/tendermint/tmlibs/db"
10 "github.com/bytom/account"
11 "github.com/bytom/asset"
12 "github.com/bytom/blockchain/pseudohsm"
13 "github.com/bytom/event"
14 "github.com/bytom/protocol"
15 "github.com/bytom/protocol/bc"
16 "github.com/bytom/protocol/bc/types"
// walletKey is the database key under which the wallet status is stored:
// loadWalletInfo reads it from w.DB and commitWalletInfo sets it on a batch.
25 var walletKey = []byte("walletInfo")
27 // StatusInfo is the base valid-block info used to handle orphan block rollback.
// NOTE(review): the field list is not visible in this excerpt — confirm it
// against the full file before documenting individual fields.
28 type StatusInfo struct {
35 // Wallet is related to storing account unspent outputs.
// NOTE(review): only some of the struct's fields are visible in this excerpt.

// AccountMgr is the account manager; DeleteAccount and UpdateAccountAlias
// delegate to it.
40 AccountMgr *account.Manager
// AssetReg is the asset registry.
// NOTE(review): presumably set from NewWallet's asset argument — the
// assignment is not visible here; confirm.
41 AssetReg *asset.Registry
// RecoveryMgr is built in NewWallet via newRecoveryManager(walletDB, account)
// and is consulted by AttachBlock through FilterRecoveryTxs.
44 RecoveryMgr *recoveryManager
// eventDispatcher is the dispatcher passed to NewWallet; it is used there to
// subscribe to protocol.TxMsgEvent.
45 eventDispatcher *event.Dispatcher
// txMsgSub is the TxMsgEvent subscription created in NewWallet and read by
// memPoolTxQueryLoop.
46 txMsgSub *event.Subscription
// rescanCh is created with a buffer of 1 in NewWallet; RescanBlocks sends an
// empty struct on it.
48 rescanCh chan struct{}
51 // NewWallet returns a new wallet instance.
//
// Steps visible in this excerpt: it builds the recovery manager from walletDB
// and account, stores the dispatcher, creates the rescan channel, loads the
// persisted wallet info and the recovery status, subscribes to
// protocol.TxMsgEvent, and starts the delUnconfirmedTx and memPoolTxQueryLoop
// goroutines.
// NOTE(review): the remaining field assignments and the bodies of the error
// checks are not visible here; presumably each failure is returned to the
// caller — confirm against the full file.
52 func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher) (*Wallet, error) {
59 RecoveryMgr: newRecoveryManager(walletDB, account),
60 eventDispatcher: dispatcher,
// Capacity 1 lets a single rescan request be queued.
61 rescanCh: make(chan struct{}, 1),
// Restore the wallet status from the database.
64 if err := w.loadWalletInfo(); err != nil {
// Restore the recovery manager's status.
68 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
// Subscribe to transaction messages; memPoolTxQueryLoop reads this subscription.
73 w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
// Background workers started with the wallet.
79 go w.delUnconfirmedTx()
80 go w.memPoolTxQueryLoop()
84 // memPoolTxQueryLoop constantly passes transactions accepted by the mempool
// to the wallet. It reads events from w.txMsgSub and dispatches on the
// message type.
// NOTE(review): the loop/select headers and the statements after each log
// call are not visible in this excerpt.
85 func (w *Wallet) memPoolTxQueryLoop() {
88 case obj, ok := <-w.txMsgSub.Chan():
// Logged when the receive reports the subscription channel as closed.
90 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
// The event payload is expected to be a protocol.TxMsgEvent.
94 ev, ok := obj.Data.(protocol.TxMsgEvent)
96 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
100 switch ev.TxMsg.MsgType {
// A new mempool transaction is recorded as unconfirmed in the wallet.
101 case protocol.MsgNewTx:
102 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
// A removed mempool transaction is dropped from the wallet's unconfirmed set.
103 case protocol.MsgRemoveTx:
104 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
// Presumably the default branch for any other message type — the case label
// is not visible here.
106 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
112 // loadWalletInfo restores w.status from the JSON record stored under
113 // walletKey and returns the result of decoding it.
// When no record exists, it fetches the block at height 0 and returns the
// result of attaching it via AttachBlock.
// NOTE(review): the handling of err from GetBlockByHeight is not visible in
// this excerpt; presumably it is returned — confirm.
114 func (w *Wallet) loadWalletInfo() error {
115 if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
116 return json.Unmarshal(rawWallet, &w.status)
// No stored status: start from the block at height 0.
119 block, err := w.chain.GetBlockByHeight(0)
123 return w.AttachBlock(block)
// commitWalletInfo JSON-encodes w.status and sets it on batch under
// walletKey. A marshalling failure is logged as "save wallet info".
// NOTE(review): the returns and whatever follows batch.Set (presumably
// writing the batch) are not visible in this excerpt — confirm.
126 func (w *Wallet) commitWalletInfo(batch db.Batch) error {
127 rawWallet, err := json.Marshal(w.status)
129 log.WithField("err", err).Error("save wallet info")
133 batch.Set(walletKey, rawWallet)
138 // AttachBlock attaches a new block to the wallet.
//
// A block whose PreviousBlockHash differs from w.status.WorkHash is skipped
// with a warning. Otherwise the block's transaction status is fetched, the
// recovery manager filters the block, transactions are indexed and UTXOs are
// attached on one batch, the wallet status is advanced, and the status is
// saved through commitWalletInfo.
// NOTE(review): the value returned on the skip path and the bodies of the
// error checks are not visible in this excerpt.
139 func (w *Wallet) AttachBlock(block *types.Block) error {
// Only a block that extends the current work hash is attached.
143 if block.PreviousBlockHash != w.status.WorkHash {
144 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
148 blockHash := block.Hash()
149 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
154 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
// Index writes, UTXO writes and the status update share one batch.
158 storeBatch := w.DB.NewBatch()
159 if err := w.indexTransactions(storeBatch, block, txStatus); err != nil {
163 w.attachUtxos(storeBatch, block, txStatus)
// Advance the work pointer to this block.
164 w.status.WorkHeight = block.Height
165 w.status.WorkHash = block.Hash()
// The best pointer follows once the work pointer reaches or passes it.
166 if w.status.WorkHeight >= w.status.BestHeight {
167 w.status.BestHeight = w.status.WorkHeight
168 w.status.BestHash = w.status.WorkHash
170 return w.commitWalletInfo(storeBatch)
173 // DetachBlock detaches a block and rolls back the wallet state.
//
// It fetches the block's transaction status, detaches the block's UTXOs and
// deletes transactions on one batch, moves the best pointer to the block's
// parent, pulls the work pointer back if it is ahead of the best pointer, and
// saves the status through commitWalletInfo.
// NOTE(review): handling of err from GetTransactionStatus is not visible in
// this excerpt.
174 func (w *Wallet) DetachBlock(block *types.Block) error {
178 blockHash := block.Hash()
179 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
184 storeBatch := w.DB.NewBatch()
185 w.detachUtxos(storeBatch, block, txStatus)
// Passes the current BestHeight (not block.Height) to deleteTransactions.
186 w.deleteTransactions(storeBatch, w.status.BestHeight)
// The best pointer moves back to the detached block's parent.
188 w.status.BestHeight = block.Height - 1
189 w.status.BestHash = block.PreviousBlockHash
// The work pointer is not left ahead of the best pointer.
191 if w.status.WorkHeight > w.status.BestHeight {
192 w.status.WorkHeight = w.status.BestHeight
193 w.status.WorkHash = w.status.BestHash
196 return w.commitWalletInfo(storeBatch)
199 // walletUpdater processes every valid block and reverses every invalid
// block which needs to be rolled back.
// NOTE(review): the enclosing loop header and the statements following each
// log call are not visible in this excerpt.
200 func (w *Wallet) walletUpdater() {
202 w.getRescanNotification()
// While the wallet's best block is not in the main chain, load that block
// and detach it.
203 for !w.chain.InMainChain(w.status.BestHash) {
204 block, err := w.chain.GetBlockByHash(&w.status.BestHash)
206 log.WithField("err", err).Error("walletUpdater GetBlockByHash")
210 if err := w.DetachBlock(block); err != nil {
211 log.WithField("err", err).Error("walletUpdater detachBlock stop")
// Fetch the next block after the work pointer; the error is discarded here.
// NOTE(review): presumably a nil block means "not available yet" and leads
// to walletBlockWaiter — the condition is not visible; confirm.
216 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
218 w.walletBlockWaiter()
222 if err := w.AttachBlock(block); err != nil {
223 log.WithField("err", err).Error("walletUpdater AttachBlock stop")
229 // RescanBlocks provides a trigger to rescan blocks by sending an empty
// struct on w.rescanCh.
// NOTE(review): the select header and any other cases are not visible in this
// excerpt; presumably a default case makes the send non-blocking — confirm.
230 func (w *Wallet) RescanBlocks() {
232 case w.rescanCh <- struct{}{}:
238 // deleteAccountTxs deletes all txs in wallet: every key under TxPrefix and
// every key under TxIndexPrefix is marked for deletion on a single batch.
// NOTE(review): the header of the first loop and the step that writes the
// batch are not visible in this excerpt.
239 func (w *Wallet) deleteAccountTxs() {
240 storeBatch := w.DB.NewBatch()
// Keys under TxPrefix.
242 txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
243 defer txIter.Release()
246 storeBatch.Delete(txIter.Key())
// Keys under TxIndexPrefix.
249 txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
250 defer txIndexIter.Release()
252 for txIndexIter.Next() {
253 storeBatch.Delete(txIndexIter.Key())
259 // DeleteAccount deletes the account matching accountID, then rescans the
// wallet. The deletion itself is delegated to w.AccountMgr.DeleteAccount.
// NOTE(review): the rescan steps and the error return are not visible in this
// excerpt.
260 func (w *Wallet) DeleteAccount(accountID string) (err error) {
264 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
// UpdateAccountAlias changes the alias of the account identified by accountID
// to newAlias by delegating to w.AccountMgr.UpdateAccountAlias.
// NOTE(review): whatever follows the delegate call is not visible in this
// excerpt.
273 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
277 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
// getRescanNotification is the first call visible in walletUpdater.
// NOTE(review): the body is not visible in this excerpt; presumably it checks
// w.rescanCh for a pending RescanBlocks request — confirm.
286 func (w *Wallet) getRescanNotification() {
// setRescanStatus fetches the block at height 0 and resets w.status.WorkHash
// to the zero bc.Hash.
// NOTE(review): the statement that uses block is not visible in this excerpt.
295 func (w *Wallet) setRescanStatus() {
// The error from GetBlockByHeight is discarded here.
296 block, _ := w.chain.GetBlockByHeight(0)
297 w.status.WorkHash = bc.Hash{}
// walletBlockWaiter receives from the channel returned by
// w.chain.BlockWaiter for height w.status.WorkHeight + 1.
// NOTE(review): the select header and any other cases are not visible in this
// excerpt.
301 func (w *Wallet) walletBlockWaiter() {
303 case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
309 // GetWalletStatusInfo returns the current wallet StatusInfo.
310 func (w *Wallet) GetWalletStatusInfo() StatusInfo {