7 log "github.com/sirupsen/logrus"
9 "github.com/vapor/account"
10 "github.com/vapor/asset"
11 "github.com/vapor/blockchain/pseudohsm"
12 "github.com/vapor/errors"
13 "github.com/vapor/event"
14 "github.com/vapor/protocol"
15 "github.com/vapor/protocol/bc"
16 "github.com/vapor/protocol/bc/types"
26 currentVersion = uint(1)
28 errBestBlockNotFoundInCore = errors.New("best block not found in core")
29 errWalletVersionMismatch = errors.New("wallet version mismatch")
32 //StatusInfo is base valid block info to handle orphan block rollback
33 type StatusInfo struct {
41 //Wallet is related to storing account unspent outputs
47 AccountMgr *account.Manager
48 AssetReg *asset.Registry
51 RecoveryMgr *recoveryManager
52 eventDispatcher *event.Dispatcher
53 txMsgSub *event.Subscription
55 rescanCh chan struct{}
58 //NewWallet return a new wallet instance
59 func NewWallet(store WalletStorer, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
66 RecoveryMgr: newRecoveryManager(store, account),
67 eventDispatcher: dispatcher,
68 rescanCh: make(chan struct{}, 1),
69 TxIndexFlag: txIndexFlag,
72 if err := w.loadWalletInfo(); err != nil {
76 if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
81 w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
87 go w.delUnconfirmedTx()
88 go w.memPoolTxQueryLoop()
92 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
93 func (w *Wallet) memPoolTxQueryLoop() {
96 case obj, ok := <-w.txMsgSub.Chan():
98 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
102 ev, ok := obj.Data.(protocol.TxMsgEvent)
104 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
108 switch ev.TxMsg.MsgType {
109 case protocol.MsgNewTx:
110 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
111 case protocol.MsgRemoveTx:
112 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
114 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
120 func (w *Wallet) checkWalletInfo() error {
121 if w.status.Version != currentVersion {
122 return errWalletVersionMismatch
123 } else if !w.chain.BlockExist(&w.status.BestHash) {
124 return errBestBlockNotFoundInCore
130 //loadWalletInfo return stored wallet info and nil,
131 //if error, return initial wallet info and err
132 func (w *Wallet) loadWalletInfo() error {
133 if rawWallet := w.store.GetWalletInfo(); rawWallet != nil {
134 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
138 err := w.checkWalletInfo()
143 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
144 w.store.DeleteWalletTransactions()
145 w.store.DeleteWalletUTXOs()
148 w.status.Version = currentVersion
149 w.status.WorkHash = bc.Hash{}
150 block, err := w.chain.GetBlockByHeight(0)
154 return w.AttachBlock(block)
157 func (w *Wallet) commitWalletInfo() error {
158 rawWallet, err := json.Marshal(w.status)
160 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
164 w.store.SetWalletInfo(rawWallet)
168 // AttachBlock attach a new block
169 func (w *Wallet) AttachBlock(block *types.Block) error {
173 if block.PreviousBlockHash != w.status.WorkHash {
174 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
178 blockHash := block.Hash()
179 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
184 if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
185 log.WithField("err", err).Error("filter recovery txs")
186 w.RecoveryMgr.finished()
189 annotatedTxs := w.filterAccountTxs(block, txStatus)
190 saveExternalAssetDefinition(block, w.store)
191 annotateTxsAccount(annotatedTxs, w.store)
194 defer w.store.CommitBatch()
196 if err := w.indexTransactions(block, txStatus, annotatedTxs); err != nil {
200 w.attachUtxos(block, txStatus)
201 w.status.WorkHeight = block.Height
202 w.status.WorkHash = block.Hash()
203 if w.status.WorkHeight >= w.status.BestHeight {
204 w.status.BestHeight = w.status.WorkHeight
205 w.status.BestHash = w.status.WorkHash
208 return w.commitWalletInfo()
211 // DetachBlock detach a block and rollback state
212 func (w *Wallet) DetachBlock(block *types.Block) error {
216 blockHash := block.Hash()
217 txStatus, err := w.chain.GetTransactionStatus(&blockHash)
223 defer w.store.CommitBatch()
225 w.detachUtxos(block, txStatus)
226 w.store.DeleteTransactions(w.status.BestHeight)
228 w.status.BestHeight = block.Height - 1
229 w.status.BestHash = block.PreviousBlockHash
231 if w.status.WorkHeight > w.status.BestHeight {
232 w.status.WorkHeight = w.status.BestHeight
233 w.status.WorkHash = w.status.BestHash
236 return w.commitWalletInfo()
239 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
240 func (w *Wallet) walletUpdater() {
242 w.getRescanNotification()
243 for !w.chain.InMainChain(w.status.BestHash) {
244 block, err := w.chain.GetBlockByHash(&w.status.BestHash)
246 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
250 if err := w.DetachBlock(block); err != nil {
251 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
256 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
258 w.walletBlockWaiter()
262 if err := w.AttachBlock(block); err != nil {
263 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
269 //RescanBlocks provide a trigger to rescan blocks
270 func (w *Wallet) RescanBlocks() {
272 case w.rescanCh <- struct{}{}:
278 // DeleteAccount deletes account matching accountID, then rescan wallet
279 func (w *Wallet) DeleteAccount(accountID string) (err error) {
283 if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
287 w.store.DeleteWalletTransactions()
292 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
296 if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
300 w.store.DeleteWalletTransactions()
305 func (w *Wallet) getRescanNotification() {
314 func (w *Wallet) setRescanStatus() {
315 block, _ := w.chain.GetBlockByHeight(0)
316 w.status.WorkHash = bc.Hash{}
320 func (w *Wallet) walletBlockWaiter() {
322 case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
328 // GetWalletStatusInfo return current wallet StatusInfo
329 func (w *Wallet) GetWalletStatusInfo() StatusInfo {