OSDN Git Service

rename w.status
[bytom/vapor.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/account"
9         "github.com/vapor/asset"
10         "github.com/vapor/blockchain/pseudohsm"
11         "github.com/vapor/errors"
12         "github.com/vapor/event"
13         "github.com/vapor/protocol"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16 )
17
18 const (
19         //SINGLE single sign
20         SINGLE    = 1
21         logModule = "wallet"
22 )
23
24 var (
25         currentVersion = uint(1)
26
27         errBestBlockNotFoundInCore = errors.New("best block not found in core")
28         errWalletVersionMismatch   = errors.New("wallet version mismatch")
29         ErrGetWalletStatusInfo     = errors.New("failed get wallet info")
30 )
31
32 //StatusInfo is base valid block info to handle orphan block rollback
33 type StatusInfo struct {
34         Version    uint
35         WorkHeight uint64
36         WorkHash   bc.Hash
37         BestHeight uint64
38         BestHash   bc.Hash
39 }
40
41 //Wallet is related to storing account unspent outputs
42 type Wallet struct {
43         store           WalletStore
44         rw              sync.RWMutex
45         Status          StatusInfo
46         TxIndexFlag     bool
47         AccountMgr      *account.Manager
48         AssetReg        *asset.Registry
49         Hsm             *pseudohsm.HSM
50         chain           *protocol.Chain
51         RecoveryMgr     *recoveryManager
52         eventDispatcher *event.Dispatcher
53         txMsgSub        *event.Subscription
54
55         rescanCh chan struct{}
56 }
57
58 //NewWallet return a new wallet instance
59 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
60         w := &Wallet{
61                 store:           store,
62                 AccountMgr:      account,
63                 AssetReg:        asset,
64                 chain:           chain,
65                 Hsm:             hsm,
66                 RecoveryMgr:     newRecoveryManager(store, account),
67                 eventDispatcher: dispatcher,
68                 rescanCh:        make(chan struct{}, 1),
69                 TxIndexFlag:     txIndexFlag,
70         }
71
72         if err := w.LoadWalletInfo(); err != nil {
73                 return nil, err
74         }
75
76         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
77                 return nil, err
78         }
79
80         var err error
81         w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
82         if err != nil {
83                 return nil, err
84         }
85
86         go w.walletUpdater()
87         go w.delUnconfirmedTx()
88         go w.memPoolTxQueryLoop()
89         return w, nil
90 }
91
92 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
93 func (w *Wallet) memPoolTxQueryLoop() {
94         for {
95                 select {
96                 case obj, ok := <-w.txMsgSub.Chan():
97                         if !ok {
98                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
99                                 return
100                         }
101
102                         ev, ok := obj.Data.(protocol.TxMsgEvent)
103                         if !ok {
104                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
105                                 continue
106                         }
107
108                         switch ev.TxMsg.MsgType {
109                         case protocol.MsgNewTx:
110                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
111                         case protocol.MsgRemoveTx:
112                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
113                         default:
114                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
115                         }
116                 }
117         }
118 }
119
120 func (w *Wallet) checkWalletInfo() error {
121         if w.Status.Version != currentVersion {
122                 return errWalletVersionMismatch
123         } else if !w.chain.BlockExist(&w.Status.BestHash) {
124                 return errBestBlockNotFoundInCore
125         }
126
127         return nil
128 }
129
130 //LoadWalletInfo return stored wallet info and nil,
131 //if error, return initial wallet info and err
132 func (w *Wallet) LoadWalletInfo() error {
133         walletStatus, err := w.store.GetWalletInfo()
134         if walletStatus == nil && err != ErrGetWalletStatusInfo {
135                 return err
136         }
137         if walletStatus != nil {
138                 w.Status = *walletStatus
139                 err = w.checkWalletInfo()
140                 if err == nil {
141                         return nil
142                 }
143                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
144                 w.store.DeleteWalletTransactions()
145                 w.store.DeleteWalletUTXOs()
146         }
147
148         w.Status.Version = currentVersion
149         w.Status.WorkHash = bc.Hash{}
150         block, err := w.chain.GetBlockByHeight(0)
151         if err != nil {
152                 return err
153         }
154         return w.AttachBlock(block)
155 }
156
157 func (w *Wallet) commitWalletInfo() error {
158         if err := w.store.SetWalletInfo(&w.Status); err != nil {
159                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
160                 return err
161         }
162         return nil
163 }
164
165 // AttachBlock attach a new block
166 func (w *Wallet) AttachBlock(block *types.Block) error {
167         w.rw.Lock()
168         defer w.rw.Unlock()
169
170         if block.PreviousBlockHash != w.Status.WorkHash {
171                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
172                 return nil
173         }
174
175         blockHash := block.Hash()
176         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
177         if err != nil {
178                 return err
179         }
180
181         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
182                 log.WithField("err", err).Error("filter recovery txs")
183                 w.RecoveryMgr.finished()
184         }
185
186         annotatedTxs := w.filterAccountTxs(block, txStatus)
187         if err := saveExternalAssetDefinition(block, w.store); err != nil {
188                 return err
189         }
190         w.annotateTxsAccount(annotatedTxs)
191
192         if err := w.store.InitBatch(); err != nil {
193                 return err
194         }
195
196         if err := w.indexTransactions(block, txStatus, annotatedTxs); err != nil {
197                 return err
198         }
199
200         w.attachUtxos(block, txStatus)
201         w.Status.WorkHeight = block.Height
202         w.Status.WorkHash = block.Hash()
203         if w.Status.WorkHeight >= w.Status.BestHeight {
204                 w.Status.BestHeight = w.Status.WorkHeight
205                 w.Status.BestHash = w.Status.WorkHash
206         }
207
208         if err := w.commitWalletInfo(); err != nil {
209                 return err
210         }
211
212         if err := w.store.CommitBatch(); err != nil {
213                 return err
214         }
215
216         return nil
217 }
218
219 // DetachBlock detach a block and rollback state
220 func (w *Wallet) DetachBlock(block *types.Block) error {
221         w.rw.Lock()
222         defer w.rw.Unlock()
223
224         blockHash := block.Hash()
225         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
226         if err != nil {
227                 return err
228         }
229
230         if err := w.store.InitBatch(); err != nil {
231                 return err
232         }
233
234         w.detachUtxos(block, txStatus)
235         w.store.DeleteTransactions(w.Status.BestHeight)
236
237         w.Status.BestHeight = block.Height - 1
238         w.Status.BestHash = block.PreviousBlockHash
239
240         if w.Status.WorkHeight > w.Status.BestHeight {
241                 w.Status.WorkHeight = w.Status.BestHeight
242                 w.Status.WorkHash = w.Status.BestHash
243         }
244         if err := w.commitWalletInfo(); err != nil {
245                 return err
246         }
247
248         if err := w.store.CommitBatch(); err != nil {
249                 return err
250         }
251
252         return nil
253 }
254
255 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
256 func (w *Wallet) walletUpdater() {
257         for {
258                 w.getRescanNotification()
259                 for !w.chain.InMainChain(w.Status.BestHash) {
260                         block, err := w.chain.GetBlockByHash(&w.Status.BestHash)
261                         if err != nil {
262                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
263                                 return
264                         }
265
266                         if err := w.DetachBlock(block); err != nil {
267                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
268                                 return
269                         }
270                 }
271
272                 block, _ := w.chain.GetBlockByHeight(w.Status.WorkHeight + 1)
273                 if block == nil {
274                         w.walletBlockWaiter()
275                         continue
276                 }
277
278                 if err := w.AttachBlock(block); err != nil {
279                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
280                         return
281                 }
282         }
283 }
284
285 //RescanBlocks provide a trigger to rescan blocks
286 func (w *Wallet) RescanBlocks() {
287         select {
288         case w.rescanCh <- struct{}{}:
289         default:
290                 return
291         }
292 }
293
294 // DeleteAccount deletes account matching accountID, then rescan wallet
295 func (w *Wallet) DeleteAccount(accountID string) (err error) {
296         w.rw.Lock()
297         defer w.rw.Unlock()
298
299         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
300                 return err
301         }
302
303         w.store.DeleteWalletTransactions()
304         w.RescanBlocks()
305         return nil
306 }
307
308 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
309         w.rw.Lock()
310         defer w.rw.Unlock()
311
312         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
313                 return err
314         }
315
316         w.store.DeleteWalletTransactions()
317         w.RescanBlocks()
318         return nil
319 }
320
321 func (w *Wallet) getRescanNotification() {
322         select {
323         case <-w.rescanCh:
324                 w.setRescanStatus()
325         default:
326                 return
327         }
328 }
329
330 func (w *Wallet) setRescanStatus() {
331         block, _ := w.chain.GetBlockByHeight(0)
332         w.Status.WorkHash = bc.Hash{}
333         w.AttachBlock(block)
334 }
335
336 func (w *Wallet) walletBlockWaiter() {
337         select {
338         case <-w.chain.BlockWaiter(w.Status.WorkHeight + 1):
339         case <-w.rescanCh:
340                 w.setRescanStatus()
341         }
342 }
343
344 // GetWalletStatusInfo return current wallet StatusInfo
345 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
346         w.rw.RLock()
347         defer w.rw.RUnlock()
348
349         return w.Status
350 }