6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/vapor/account"
9 "github.com/bytom/vapor/asset"
10 "github.com/bytom/vapor/blockchain/pseudohsm"
11 "github.com/bytom/vapor/errors"
12 "github.com/bytom/vapor/event"
13 "github.com/bytom/vapor/protocol"
14 "github.com/bytom/vapor/protocol/bc"
15 "github.com/bytom/vapor/protocol/bc/types"
// currentVersion is the wallet status version; checkWalletInfo compares the
// stored Status.Version against it.
25 currentVersion = uint(1)
// errBestBlockNotFoundInCore is returned by checkWalletInfo when the chain
// does not contain the wallet's best block hash.
27 errBestBlockNotFoundInCore = errors.New("best block not found in core")
// errWalletVersionMismatch is returned by checkWalletInfo when the stored
// wallet version differs from currentVersion.
28 errWalletVersionMismatch = errors.New("wallet version mismatch")
// ErrGetWalletStatusInfo is the sentinel LoadWalletInfo compares against the
// error returned by Store.GetWalletInfo when no status was returned.
29 ErrGetWalletStatusInfo = errors.New("failed get wallet info")
// ErrGetAsset reports that an asset definition could not be found.
30 ErrGetAsset = errors.New("Failed to find asset definition")
// ErrAccntTxIDNotFound reports that an account transaction ID was not found.
31 ErrAccntTxIDNotFound = errors.New("account TXID not found")
// ErrGetStandardUTXO reports a failure to get a standard UTXO.
32 ErrGetStandardUTXO = errors.New("failed get standard UTXO")
35 // StatusInfo holds the base valid-block information used to handle orphan block rollback.
// NOTE(review): methods in this file read and write Version, WorkHeight, WorkHash,
// BestHeight and BestHash through Wallet.Status — presumably fields of this type; confirm.
36 type StatusInfo struct {
44 // Wallet is related to storing account unspent outputs.
// AccountMgr is used by DeleteAccount and UpdateAccountAlias.
50 AccountMgr *account.Manager
51 AssetReg *asset.Registry
// RecoveryMgr is created in NewWallet from the wallet store and the account manager.
54 RecoveryMgr *recoveryManager
// EventDispatcher is the dispatcher Run subscribes to for protocol.TxMsgEvent.
55 EventDispatcher *event.Dispatcher
// TxMsgSub is the subscription assigned by Run and read by MemPoolTxQueryLoop.
56 TxMsgSub *event.Subscription
// rescanCh carries rescan requests: NewWallet creates it with a buffer of one
// and RescanBlocks sends on it.
58 rescanCh chan struct{}
61 // NewWallet returns a new wallet instance.
// The wallet gets a recovery manager built from store and account, the given
// event dispatcher, a rescan channel with a buffer of one and the txIndexFlag
// setting. The stored wallet status and the recovery manager status are then
// loaded.
62 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
69 RecoveryMgr: NewRecoveryManager(store, account),
70 EventDispatcher: dispatcher,
// Buffered with capacity one; RescanBlocks is the sender.
71 rescanCh: make(chan struct{}, 1),
72 TxIndexFlag: txIndexFlag,
// Load (and validate) the persisted wallet status.
75 if err := w.LoadWalletInfo(); err != nil {
// Load the recovery manager's persisted status.
79 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
86 // Run starts the wallet's background goroutines for recovery and unconfirmed-transaction cleanup.
// It subscribes to protocol.TxMsgEvent on the event dispatcher and launches
// delUnconfirmedTx and MemPoolTxQueryLoop, each in its own goroutine.
87 func (w *Wallet) Run() error {
// The subscription is stored on the wallet; MemPoolTxQueryLoop reads from it.
89 w.TxMsgSub, err = w.EventDispatcher.Subscribe(protocol.TxMsgEvent{})
95 go w.delUnconfirmedTx()
96 go w.MemPoolTxQueryLoop()
101 // MemPoolTxQueryLoop constantly passes transactions accepted by the mempool to the wallet.
// It receives events from the TxMsgSub subscription channel and hands the
// carried TxDesc to AddUnconfirmedTx or RemoveUnconfirmedTx depending on the
// message type.
102 func (w *Wallet) MemPoolTxQueryLoop() {
105 case obj, ok := <-w.TxMsgSub.Chan():
// Reported when the subscription channel has been closed.
107 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
// The event payload is expected to be a protocol.TxMsgEvent.
111 ev, ok := obj.Data.(protocol.TxMsgEvent)
113 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
117 switch ev.TxMsg.MsgType {
118 case protocol.MsgNewTx:
119 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
120 case protocol.MsgRemoveTx:
121 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
// Warning for a message type other than the two handled above.
123 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
// checkWalletInfo validates the loaded wallet status against the running node.
// It returns errWalletVersionMismatch when Status.Version differs from
// currentVersion, and errBestBlockNotFoundInCore when the chain does not have
// the block identified by Status.BestHash.
129 func (w *Wallet) checkWalletInfo() error {
130 if w.Status.Version != currentVersion {
131 return errWalletVersionMismatch
132 } else if !w.Chain.BlockExist(&w.Status.BestHash) {
133 return errBestBlockNotFoundInCore
139 // LoadWalletInfo loads the stored wallet status into w.Status and validates it with checkWalletInfo.
140 // The reset path below clears the stored wallet transactions and UTXOs, resets the status and attaches the block at height 0.
141 func (w *Wallet) LoadWalletInfo() error {
142 walletStatus, err := w.Store.GetWalletInfo()
// A missing status is only tolerated when the store reported ErrGetWalletStatusInfo.
143 if walletStatus == nil && err != ErrGetWalletStatusInfo {
// A stored status exists: adopt it and check it against the current version and chain.
147 if walletStatus != nil {
148 w.Status = *walletStatus
149 err = w.checkWalletInfo()
154 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
// Clear the stored wallet transactions and UTXOs before restarting from height 0.
155 w.Store.DeleteWalletTransactions()
156 w.Store.DeleteWalletUTXOs()
// Reset the status: current version and a zero work hash.
159 w.Status.Version = currentVersion
160 w.Status.WorkHash = bc.Hash{}
161 block, err := w.Chain.GetBlockByHeight(0)
// Attaching the block at height 0 restarts the wallet's view of the chain.
166 return w.AttachBlock(block)
// commitWalletInfo writes the current wallet status (w.Status) through the
// given store and logs an error when SetWalletInfo fails.
169 func (w *Wallet) commitWalletInfo(store WalletStore) error {
170 if err := store.SetWalletInfo(&w.Status); err != nil {
171 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
177 // AttachBlock attaches a new block to the wallet.
// The block is expected to follow the wallet's current work hash. Its
// transactions are filtered for recovery and for the wallet's accounts,
// external asset definitions are saved, and transactions, UTXOs and the updated
// status are written through a store batch that is committed at the end.
178 func (w *Wallet) AttachBlock(block *types.Block) error {
// The block must build on the wallet's current work hash; otherwise it is skipped with a warning.
182 if block.PreviousBlockHash != w.Status.WorkHash {
183 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
187 blockHash := block.Hash()
188 txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
// A failure while filtering recovery transactions is logged and the recovery manager is marked finished.
193 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
194 log.WithField("err", err).Error("filter recovery txs")
195 w.RecoveryMgr.finished()
198 annotatedTxs := w.filterAccountTxs(block, txStatus)
199 if err := saveExternalAssetDefinition(block, w.Store); err != nil {
203 w.annotateTxsAccount(annotatedTxs)
// Transaction index, UTXO and status writes below all go through this batch.
205 newStore := w.Store.InitBatch()
206 if err := w.indexTransactions(block, txStatus, annotatedTxs, newStore); err != nil {
210 w.attachUtxos(block, txStatus, newStore)
// Advance the work position to this block.
211 w.Status.WorkHeight = block.Height
212 w.Status.WorkHash = block.Hash()
// The best position follows the work position once the work height reaches it.
213 if w.Status.WorkHeight >= w.Status.BestHeight {
214 w.Status.BestHeight = w.Status.WorkHeight
215 w.Status.BestHash = w.Status.WorkHash
218 if err := w.commitWalletInfo(newStore); err != nil {
222 if err := newStore.CommitBatch(); err != nil {
229 // DetachBlock detaches a block and rolls the wallet state back to its parent.
// UTXO changes, transaction deletion and the updated status are written through
// a store batch that is committed at the end.
230 func (w *Wallet) DetachBlock(block *types.Block) error {
234 blockHash := block.Hash()
235 txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
240 newStore := w.Store.InitBatch()
242 w.detachUtxos(block, txStatus, newStore)
// Delete the transactions recorded at the current best height.
243 newStore.DeleteTransactions(w.Status.BestHeight)
// The best position moves back to the detached block's parent.
245 w.Status.BestHeight = block.Height - 1
246 w.Status.BestHash = block.PreviousBlockHash
// The work position must not stay ahead of the best position.
248 if w.Status.WorkHeight > w.Status.BestHeight {
249 w.Status.WorkHeight = w.Status.BestHeight
250 w.Status.WorkHash = w.Status.BestHash
252 if err := w.commitWalletInfo(newStore); err != nil {
256 if err := newStore.CommitBatch(); err != nil {
263 // walletUpdater processes every valid block and reverses every invalid block which needs to be rolled back.
264 func (w *Wallet) walletUpdater() {
266 w.getRescanNotification()
// Detach blocks while the wallet's best block is not on the main chain.
267 for !w.Chain.InMainChain(w.Status.BestHash) {
268 block, err := w.Chain.GetBlockByHash(&w.Status.BestHash)
270 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
274 if err := w.DetachBlock(block); err != nil {
275 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
// Fetch the block after the current work height; the returned error is discarded.
280 block, _ := w.Chain.GetBlockByHeight(w.Status.WorkHeight + 1)
// NOTE(review): presumably reached when that block is not available yet — confirm against the guarding condition.
282 w.walletBlockWaiter()
286 if err := w.AttachBlock(block); err != nil {
287 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
293 // RescanBlocks provides a trigger to rescan blocks.
// The trigger is an empty struct sent on rescanCh.
294 func (w *Wallet) RescanBlocks() {
296 case w.rescanCh <- struct{}{}:
302 // DeleteAccount deletes the account matching accountID, then rescans the wallet.
// The account is removed through the account manager, after which the stored
// wallet transactions are deleted.
303 func (w *Wallet) DeleteAccount(accountID string) (err error) {
307 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
311 w.Store.DeleteWalletTransactions()
316 // Rollback rolls the wallet back to the target height.
// While the work height is above targetHeight, the block at the current work
// hash is fetched from the chain and detached.
317 func (w *Wallet) Rollback(targetHeight uint64) error {
318 for w.Status.WorkHeight > targetHeight {
319 block, err := w.Chain.GetBlockByHash(&w.Status.WorkHash)
324 if err = w.DetachBlock(block); err != nil {
// UpdateAccountAlias changes the alias of the account identified by accountID
// to newAlias through the account manager, then deletes the stored wallet
// transactions.
332 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
336 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
340 w.Store.DeleteWalletTransactions()
// getRescanNotification is called by walletUpdater before it processes blocks.
// NOTE(review): presumably it checks rescanCh for a pending rescan request — confirm.
345 func (w *Wallet) getRescanNotification() {
// setRescanStatus fetches the block at height 0 and resets the wallet's work
// hash to the zero hash.
354 func (w *Wallet) setRescanStatus() {
// The error from GetBlockByHeight is discarded here.
355 block, _ := w.Chain.GetBlockByHeight(0)
356 w.Status.WorkHash = bc.Hash{}
// walletBlockWaiter selects on the chain's waiter for the block following the
// wallet's current work height.
360 func (w *Wallet) walletBlockWaiter() {
362 case <-w.Chain.BlockWaiter(w.Status.WorkHeight + 1):
368 // GetWalletStatusInfo returns the current wallet StatusInfo.
369 func (w *Wallet) GetWalletStatusInfo() StatusInfo {