6 log "github.com/sirupsen/logrus"
8 "github.com/vapor/account"
9 "github.com/vapor/asset"
10 "github.com/vapor/blockchain/pseudohsm"
11 "github.com/vapor/errors"
12 "github.com/vapor/event"
13 "github.com/vapor/protocol"
14 "github.com/vapor/protocol/bc"
15 "github.com/vapor/protocol/bc/types"
// currentVersion is the wallet status version this code writes and expects;
// checkWalletInfo compares the stored Status.Version against it.
25 currentVersion = uint(1)
// errBestBlockNotFoundInCore is returned by checkWalletInfo when the chain
// reports that the wallet's best block hash does not exist.
27 errBestBlockNotFoundInCore = errors.New("best block not found in core")
// errWalletVersionMismatch is returned by checkWalletInfo when the stored
// wallet version differs from currentVersion.
28 errWalletVersionMismatch = errors.New("wallet version mismatch")
// ErrGetWalletStatusInfo is the sentinel LoadWalletInfo compares against the
// error returned by Store.GetWalletInfo.
29 ErrGetWalletStatusInfo = errors.New("failed get wallet info")
// ErrGetAsset reports that an asset definition could not be found.
30 ErrGetAsset = errors.New("Failed to find asset definition")
// ErrAccntTxIDNotFound reports that an account transaction ID was not found.
31 ErrAccntTxIDNotFound = errors.New("account TXID not found")
34 // StatusInfo is the base valid-block info used to handle orphan block rollback.
// The wallet code in this file reads and writes its Version, WorkHeight,
// WorkHash, BestHeight and BestHash fields.
35 type StatusInfo struct {
43 // Wallet is related to storing account unspent outputs.
// AccountMgr is used by DeleteAccount and UpdateAccountAlias to modify accounts.
49 AccountMgr *account.Manager
// AssetReg is the asset registry handle.
50 AssetReg *asset.Registry
// RecoveryMgr is created by NewWallet via NewRecoveryManager and filters
// recovery transactions in AttachBlock.
53 RecoveryMgr *recoveryManager
// EventDispatcher is the dispatcher NewWallet subscribes on for tx pool messages.
54 EventDispatcher *event.Dispatcher
// TxMsgSub is the protocol.TxMsgEvent subscription consumed by MemPoolTxQueryLoop.
55 TxMsgSub *event.Subscription
// rescanCh signals a rescan request; NewWallet allocates it with a buffer of one
// and RescanBlocks sends on it.
57 rescanCh chan struct{}
60 // NewWallet returns a new wallet instance.
// It builds the recovery manager from store and account, allocates the rescan
// channel, loads the wallet status and the recovery status, subscribes to
// protocol.TxMsgEvent on dispatcher, and starts the delUnconfirmedTx and
// MemPoolTxQueryLoop goroutines.
61 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
68 RecoveryMgr: NewRecoveryManager(store, account),
69 EventDispatcher: dispatcher,
// A buffer of one lets a single rescan request be queued.
70 rescanCh: make(chan struct{}, 1),
71 TxIndexFlag: txIndexFlag,
// Restore the persisted wallet status before anything else uses w.Status.
74 if err := w.LoadWalletInfo(); err != nil {
78 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
// Subscribe to tx pool messages; MemPoolTxQueryLoop reads from this subscription.
83 w.TxMsgSub, err = w.EventDispatcher.Subscribe(protocol.TxMsgEvent{})
// Background workers started for the new wallet.
89 go w.delUnconfirmedTx()
90 go w.MemPoolTxQueryLoop()
94 // MemPoolTxQueryLoop receives tx pool messages from the TxMsgSub subscription
// and applies them to the wallet: protocol.MsgNewTx adds an unconfirmed
// transaction, protocol.MsgRemoveTx removes one.
95 func (w *Wallet) MemPoolTxQueryLoop() {
98 case obj, ok := <-w.TxMsgSub.Chan():
// Logged when the receive reports the subscription channel as closed.
100 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
// The payload must be a protocol.TxMsgEvent; anything else is logged as an error.
104 ev, ok := obj.Data.(protocol.TxMsgEvent)
106 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
// Dispatch on the message type carried by the event.
110 switch ev.TxMsg.MsgType {
111 case protocol.MsgNewTx:
112 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
113 case protocol.MsgRemoveTx:
114 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
// Any other message type is only logged.
116 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
// checkWalletInfo validates the loaded wallet status. It returns
// errWalletVersionMismatch when the stored version is not currentVersion, and
// errBestBlockNotFoundInCore when the chain does not have the wallet's best block.
122 func (w *Wallet) checkWalletInfo() error {
123 if w.Status.Version != currentVersion {
124 return errWalletVersionMismatch
125 } else if !w.Chain.BlockExist(&w.Status.BestHash) {
126 return errBestBlockNotFoundInCore
132 // LoadWalletInfo loads the stored wallet status into w.Status and validates
133 // it with checkWalletInfo. It returns only an error.
// The reset path below deletes the wallet transactions and UTXOs, sets the
// version to currentVersion, clears the work hash and attaches the block at
// height 0.
// NOTE(review): the exact conditions that lead to the reset path are assumed
// to be "no stored status" or "status failed the check" — confirm.
134 func (w *Wallet) LoadWalletInfo() error {
135 walletStatus, err := w.Store.GetWalletInfo()
// A missing status is tolerated only when the error is ErrGetWalletStatusInfo.
136 if walletStatus == nil && err != ErrGetWalletStatusInfo {
140 if walletStatus != nil {
141 w.Status = *walletStatus
142 err = w.checkWalletInfo()
// Log the error, then drop the stored wallet transactions and UTXOs.
147 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
148 w.Store.DeleteWalletTransactions()
149 w.Store.DeleteWalletUTXOs()
// Start over: current version, zero work hash, then attach the height-0 block.
152 w.Status.Version = currentVersion
153 w.Status.WorkHash = bc.Hash{}
154 block, err := w.Chain.GetBlockByHeight(0)
159 return w.AttachBlock(block)
// commitWalletInfo writes the current wallet status (w.Status) to the given
// store with SetWalletInfo and logs an error if the write fails.
162 func (w *Wallet) commitWalletInfo(store WalletStore) error {
163 if err := store.SetWalletInfo(&w.Status); err != nil {
164 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
170 // AttachBlock attaches a new block to the wallet: it filters recovery and
// account transactions, saves external asset definitions, indexes the
// transactions and UTXOs into a batch obtained from InitBatch, advances the
// wallet status, and commits the batch.
171 func (w *Wallet) AttachBlock(block *types.Block) error {
// The block must extend the wallet's current work hash; otherwise it is skipped with a warning.
175 if block.PreviousBlockHash != w.Status.WorkHash {
176 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
180 blockHash := block.Hash()
181 txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
// A recovery filter failure is logged and the recovery manager is marked finished.
186 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
187 log.WithField("err", err).Error("filter recovery txs")
188 w.RecoveryMgr.finished()
191 annotatedTxs := w.filterAccountTxs(block, txStatus)
192 if err := saveExternalAssetDefinition(block, w.Store); err != nil {
196 w.annotateTxsAccount(annotatedTxs)
// Writes from here on go through the batch store and take effect on CommitBatch.
198 newStore := w.Store.InitBatch()
199 if err := w.indexTransactions(block, txStatus, annotatedTxs, newStore); err != nil {
203 w.attachUtxos(block, txStatus, newStore)
// Advance the work pointer to this block.
204 w.Status.WorkHeight = block.Height
205 w.Status.WorkHash = block.Hash()
// Move the best pointer along once the work height reaches or passes it.
206 if w.Status.WorkHeight >= w.Status.BestHeight {
207 w.Status.BestHeight = w.Status.WorkHeight
208 w.Status.BestHash = w.Status.WorkHash
// Save the updated status through the batch, then commit the batch.
211 if err := w.commitWalletInfo(newStore); err != nil {
215 if err := newStore.CommitBatch(); err != nil {
222 // DetachBlock detaches a block and rolls the wallet state back to the
// block's parent: it detaches the block's UTXOs, deletes the transactions at
// the current best height, moves the best pointer to the previous block and
// commits the changes through a batch.
223 func (w *Wallet) DetachBlock(block *types.Block) error {
227 blockHash := block.Hash()
228 txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
233 newStore := w.Store.InitBatch()
235 w.detachUtxos(block, txStatus, newStore)
// Uses BestHeight before it is lowered below, i.e. the height being detached.
236 newStore.DeleteTransactions(w.Status.BestHeight)
// The best pointer now refers to the detached block's parent.
238 w.Status.BestHeight = block.Height - 1
239 w.Status.BestHash = block.PreviousBlockHash
// Keep the work pointer from running ahead of the best pointer.
241 if w.Status.WorkHeight > w.Status.BestHeight {
242 w.Status.WorkHeight = w.Status.BestHeight
243 w.Status.WorkHash = w.Status.BestHash
// Save the updated status through the batch, then commit the batch.
245 if err := w.commitWalletInfo(newStore); err != nil {
249 if err := newStore.CommitBatch(); err != nil {
256 // walletUpdater processes every valid block and reverses every invalid block
// that needs to be rolled back. It detaches blocks while the wallet's best
// hash is not in the main chain, then attaches the block following the
// current work height, waiting via walletBlockWaiter when needed.
257 func (w *Wallet) walletUpdater() {
259 w.getRescanNotification()
// Roll back until the wallet's best block is on the main chain again.
260 for !w.Chain.InMainChain(w.Status.BestHash) {
261 block, err := w.Chain.GetBlockByHash(&w.Status.BestHash)
263 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
267 if err := w.DetachBlock(block); err != nil {
268 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
// The lookup error is discarded here.
// NOTE(review): presumably a nil block triggers the wait below — confirm.
273 block, _ := w.Chain.GetBlockByHeight(w.Status.WorkHeight + 1)
275 w.walletBlockWaiter()
279 if err := w.AttachBlock(block); err != nil {
280 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
286 // RescanBlocks provides a trigger to rescan blocks by sending a signal on
// the wallet's rescan channel.
287 func (w *Wallet) RescanBlocks() {
289 case w.rescanCh <- struct{}{}:
295 // DeleteAccount deletes the account matching accountID, then rescans the wallet.
// The account is removed through AccountMgr and the stored wallet
// transactions are deleted afterwards.
296 func (w *Wallet) DeleteAccount(accountID string) (err error) {
300 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
304 w.Store.DeleteWalletTransactions()
// UpdateAccountAlias changes the alias of the account identified by accountID
// to newAlias through AccountMgr, then deletes the stored wallet transactions.
309 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
313 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
317 w.Store.DeleteWalletTransactions()
// getRescanNotification is invoked by walletUpdater before it checks whether
// blocks must be detached.
// NOTE(review): presumably it checks rescanCh and resets the scan status when
// a rescan was requested — confirm.
322 func (w *Wallet) getRescanNotification() {
// setRescanStatus resets the wallet's scan position: it fetches the block at
// height 0 and clears the work hash to the zero hash.
331 func (w *Wallet) setRescanStatus() {
// The lookup error is discarded here.
332 block, _ := w.Chain.GetBlockByHeight(0)
333 w.Status.WorkHash = bc.Hash{}
// walletBlockWaiter waits on the chain's BlockWaiter channel for the height
// following the wallet's current work height.
337 func (w *Wallet) walletBlockWaiter() {
339 case <-w.Chain.BlockWaiter(w.Status.WorkHeight + 1):
345 // GetWalletStatusInfo return current wallet StatusInfo
346 func (w *Wallet) GetWalletStatusInfo() StatusInfo {