6 log "github.com/sirupsen/logrus"
8 "github.com/vapor/account"
9 "github.com/vapor/asset"
10 "github.com/vapor/blockchain/pseudohsm"
11 "github.com/vapor/errors"
12 "github.com/vapor/event"
13 "github.com/vapor/protocol"
14 "github.com/vapor/protocol/bc"
15 "github.com/vapor/protocol/bc/types"
25 currentVersion = uint(1)
27 errBestBlockNotFoundInCore = errors.New("best block not found in core")
28 errWalletVersionMismatch = errors.New("wallet version mismatch")
29 ErrGetWalletStatusInfo = errors.New("failed get wallet info")
32 // StatusInfo is the base valid block info used to handle orphan block rollback.
33 type StatusInfo struct {
41 //Wallet is related to storing account unspent outputs
47 AccountMgr *account.Manager
48 AssetReg *asset.Registry
51 RecoveryMgr *recoveryManager
52 eventDispatcher *event.Dispatcher
53 txMsgSub *event.Subscription
55 rescanCh chan struct{}
58 // NewWallet returns a new wallet instance.
59 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
66 RecoveryMgr: newRecoveryManager(store, account),
67 eventDispatcher: dispatcher,
68 rescanCh: make(chan struct{}, 1),
69 TxIndexFlag: txIndexFlag,
72 if err := w.LoadWalletInfo(); err != nil {
76 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
81 w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
87 go w.delUnconfirmedTx()
88 go w.memPoolTxQueryLoop()
92 // memPoolTxQueryLoop constantly passes tx pool messages (new and removed transactions) to the wallet.
93 func (w *Wallet) memPoolTxQueryLoop() {
96 case obj, ok := <-w.txMsgSub.Chan():
98 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
102 ev, ok := obj.Data.(protocol.TxMsgEvent)
104 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
108 switch ev.TxMsg.MsgType {
109 case protocol.MsgNewTx:
110 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
111 case protocol.MsgRemoveTx:
112 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
114 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
120 func (w *Wallet) checkWalletInfo() error {
121 if w.Status.Version != currentVersion {
122 return errWalletVersionMismatch
123 } else if !w.chain.BlockExist(&w.Status.BestHash) {
124 return errBestBlockNotFoundInCore
130 // LoadWalletInfo loads the stored wallet info into the wallet status,
131 // falling back to the initial wallet info; it returns an error on failure.
132 func (w *Wallet) LoadWalletInfo() error {
133 walletStatus, err := w.store.GetWalletInfo()
134 if walletStatus == nil && err != ErrGetWalletStatusInfo {
137 if walletStatus != nil {
138 w.Status = *walletStatus
139 err = w.checkWalletInfo()
143 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
144 w.store.DeleteWalletTransactions()
145 w.store.DeleteWalletUTXOs()
148 w.Status.Version = currentVersion
149 w.Status.WorkHash = bc.Hash{}
150 block, err := w.chain.GetBlockByHeight(0)
154 return w.AttachBlock(block)
157 func (w *Wallet) commitWalletInfo() error {
158 if err := w.store.SetWalletInfo(&w.Status); err != nil {
159 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
165 // AttachBlock attaches a new block to the wallet.
166 func (w *Wallet) AttachBlock(block *types.Block) error {
170 if block.PreviousBlockHash != w.Status.WorkHash {
171 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
175 blockHash := block.Hash()
176 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
181 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
182 log.WithField("err", err).Error("filter recovery txs")
183 w.RecoveryMgr.finished()
186 annotatedTxs := w.filterAccountTxs(block, txStatus)
187 if err := saveExternalAssetDefinition(block, w.store); err != nil {
190 w.annotateTxsAccount(annotatedTxs)
192 if err := w.store.InitBatch(); err != nil {
196 if err := w.indexTransactions(block, txStatus, annotatedTxs); err != nil {
200 w.attachUtxos(block, txStatus)
201 w.Status.WorkHeight = block.Height
202 w.Status.WorkHash = block.Hash()
203 if w.Status.WorkHeight >= w.Status.BestHeight {
204 w.Status.BestHeight = w.Status.WorkHeight
205 w.Status.BestHash = w.Status.WorkHash
208 if err := w.commitWalletInfo(); err != nil {
212 if err := w.store.CommitBatch(); err != nil {
219 // DetachBlock detaches a block and rolls back the wallet state.
220 func (w *Wallet) DetachBlock(block *types.Block) error {
224 blockHash := block.Hash()
225 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
230 if err := w.store.InitBatch(); err != nil {
234 w.detachUtxos(block, txStatus)
235 w.store.DeleteTransactions(w.Status.BestHeight)
237 w.Status.BestHeight = block.Height - 1
238 w.Status.BestHash = block.PreviousBlockHash
240 if w.Status.WorkHeight > w.Status.BestHeight {
241 w.Status.WorkHeight = w.Status.BestHeight
242 w.Status.WorkHash = w.Status.BestHash
244 if err := w.commitWalletInfo(); err != nil {
248 if err := w.store.CommitBatch(); err != nil {
255 // walletUpdater processes every valid block and reverts every invalid block that needs to be rolled back.
256 func (w *Wallet) walletUpdater() {
258 w.getRescanNotification()
259 for !w.chain.InMainChain(w.Status.BestHash) {
260 block, err := w.chain.GetBlockByHash(&w.Status.BestHash)
262 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
266 if err := w.DetachBlock(block); err != nil {
267 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
272 block, _ := w.chain.GetBlockByHeight(w.Status.WorkHeight + 1)
274 w.walletBlockWaiter()
278 if err := w.AttachBlock(block); err != nil {
279 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
285 // RescanBlocks provides a trigger to rescan blocks.
286 func (w *Wallet) RescanBlocks() {
288 case w.rescanCh <- struct{}{}:
294 // DeleteAccount deletes the account matching accountID, then rescans the wallet.
295 func (w *Wallet) DeleteAccount(accountID string) (err error) {
299 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
303 w.store.DeleteWalletTransactions()
308 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
312 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
316 w.store.DeleteWalletTransactions()
321 func (w *Wallet) getRescanNotification() {
330 func (w *Wallet) setRescanStatus() {
331 block, _ := w.chain.GetBlockByHeight(0)
332 w.Status.WorkHash = bc.Hash{}
336 func (w *Wallet) walletBlockWaiter() {
338 case <-w.chain.BlockWaiter(w.Status.WorkHeight + 1):
344 // GetWalletStatusInfo returns the current wallet StatusInfo.
345 func (w *Wallet) GetWalletStatusInfo() StatusInfo {