7 log "github.com/sirupsen/logrus"
9 "github.com/vapor/account"
10 "github.com/vapor/asset"
11 "github.com/vapor/blockchain/pseudohsm"
12 "github.com/vapor/errors"
13 "github.com/vapor/event"
14 "github.com/vapor/protocol"
15 "github.com/vapor/protocol/bc"
16 "github.com/vapor/protocol/bc/types"
26 currentVersion = uint(1)
28 errBestBlockNotFoundInCore = errors.New("best block not found in core")
29 errWalletVersionMismatch = errors.New("wallet version mismatch")
32 //StatusInfo is the base valid block info used to handle orphan block rollback
33 type StatusInfo struct {
41 //Wallet is related to storing account unspent outputs
47 AccountMgr *account.Manager
48 AssetReg *asset.Registry
51 RecoveryMgr *recoveryManager
52 eventDispatcher *event.Dispatcher
53 txMsgSub *event.Subscription
55 rescanCh chan struct{}
58 //NewWallet returns a new wallet instance
59 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
66 RecoveryMgr: newRecoveryManager(store, account),
67 eventDispatcher: dispatcher,
68 rescanCh: make(chan struct{}, 1),
69 TxIndexFlag: txIndexFlag,
72 if err := w.loadWalletInfo(); err != nil {
76 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
81 w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
87 go w.delUnconfirmedTx()
88 go w.memPoolTxQueryLoop()
92 // memPoolTxQueryLoop continuously passes transactions accepted by the mempool to the wallet.
93 func (w *Wallet) memPoolTxQueryLoop() {
96 case obj, ok := <-w.txMsgSub.Chan():
98 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
102 ev, ok := obj.Data.(protocol.TxMsgEvent)
104 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
108 switch ev.TxMsg.MsgType {
109 case protocol.MsgNewTx:
110 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
111 case protocol.MsgRemoveTx:
112 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
114 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
120 func (w *Wallet) checkWalletInfo() error {
121 if w.status.Version != currentVersion {
122 return errWalletVersionMismatch
123 } else if !w.chain.BlockExist(&w.status.BestHash) {
124 return errBestBlockNotFoundInCore
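130 //loadWalletInfo loads the stored wallet info into w.status;
131 //if that does not succeed, it resets the status to the initial wallet info (starting again from the block at height 0) or returns the error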
132 func (w *Wallet) loadWalletInfo() error {
133 if rawWallet := w.store.GetWalletInfo(); rawWallet != nil {
134 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
138 err := w.checkWalletInfo()
143 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
144 w.store.DeleteWalletTransactions()
145 w.store.DeleteWalletUTXOs()
148 w.status.Version = currentVersion
149 w.status.WorkHash = bc.Hash{}
150 block, err := w.chain.GetBlockByHeight(0)
154 return w.AttachBlock(block)
157 func (w *Wallet) commitWalletInfo() error {
158 rawWallet, err := json.Marshal(w.status)
160 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
164 w.store.SetWalletInfo(rawWallet)
168 // AttachBlock attaches a new block to the wallet
169 func (w *Wallet) AttachBlock(block *types.Block) error {
173 if block.PreviousBlockHash != w.status.WorkHash {
174 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
178 blockHash := block.Hash()
179 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
184 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
185 log.WithField("err", err).Error("filter recovery txs")
186 w.RecoveryMgr.finished()
189 annotatedTxs := w.filterAccountTxs(block, txStatus)
190 if err := saveExternalAssetDefinition(block, w.store); err != nil {
193 w.annotateTxsAccount(annotatedTxs)
195 if err := w.store.InitBatch(); err != nil {
199 if err := w.indexTransactions(block, txStatus, annotatedTxs); err != nil {
203 w.attachUtxos(block, txStatus)
204 w.status.WorkHeight = block.Height
205 w.status.WorkHash = block.Hash()
206 if w.status.WorkHeight >= w.status.BestHeight {
207 w.status.BestHeight = w.status.WorkHeight
208 w.status.BestHash = w.status.WorkHash
211 if err := w.commitWalletInfo(); err != nil {
215 if err := w.store.CommitBatch(); err != nil {
222 // DetachBlock detaches a block and rolls back the wallet state
223 func (w *Wallet) DetachBlock(block *types.Block) error {
227 blockHash := block.Hash()
228 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
233 if err := w.store.InitBatch(); err != nil {
237 w.detachUtxos(block, txStatus)
238 w.store.DeleteTransactions(w.status.BestHeight)
240 w.status.BestHeight = block.Height - 1
241 w.status.BestHash = block.PreviousBlockHash
243 if w.status.WorkHeight > w.status.BestHeight {
244 w.status.WorkHeight = w.status.BestHeight
245 w.status.WorkHash = w.status.BestHash
247 if err := w.commitWalletInfo(); err != nil {
251 if err := w.store.CommitBatch(); err != nil {
258 //walletUpdater processes every valid block and reverses every invalid block that needs to be rolled back
259 func (w *Wallet) walletUpdater() {
261 w.getRescanNotification()
262 for !w.chain.InMainChain(w.status.BestHash) {
263 block, err := w.chain.GetBlockByHash(&w.status.BestHash)
265 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
269 if err := w.DetachBlock(block); err != nil {
270 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
275 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
277 w.walletBlockWaiter()
281 if err := w.AttachBlock(block); err != nil {
282 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
288 //RescanBlocks provides a trigger to rescan blocks
289 func (w *Wallet) RescanBlocks() {
291 case w.rescanCh <- struct{}{}:
297 // DeleteAccount deletes the account matching accountID, then rescans the wallet
298 func (w *Wallet) DeleteAccount(accountID string) (err error) {
302 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
306 w.store.DeleteWalletTransactions()
311 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
315 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
319 w.store.DeleteWalletTransactions()
324 func (w *Wallet) getRescanNotification() {
333 func (w *Wallet) setRescanStatus() {
334 block, _ := w.chain.GetBlockByHeight(0)
335 w.status.WorkHash = bc.Hash{}
339 func (w *Wallet) walletBlockWaiter() {
341 case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
347 // GetWalletStatusInfo returns the current wallet StatusInfo
348 func (w *Wallet) GetWalletStatusInfo() StatusInfo {