OSDN Git Service

versoin1.1.9 (#594)
[bytom/vapor.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/vapor/account"
9         "github.com/bytom/vapor/asset"
10         "github.com/bytom/vapor/blockchain/pseudohsm"
11         "github.com/bytom/vapor/errors"
12         "github.com/bytom/vapor/event"
13         "github.com/bytom/vapor/protocol"
14         "github.com/bytom/vapor/protocol/bc"
15         "github.com/bytom/vapor/protocol/bc/types"
16 )
17
18 const (
19         //SINGLE single sign
20         SINGLE    = 1
21         logModule = "wallet"
22 )
23
24 var (
25         currentVersion = uint(1)
26
27         errBestBlockNotFoundInCore = errors.New("best block not found in core")
28         errWalletVersionMismatch   = errors.New("wallet version mismatch")
29         ErrGetWalletStatusInfo     = errors.New("failed get wallet info")
30         ErrGetAsset                = errors.New("Failed to find asset definition")
31         ErrAccntTxIDNotFound       = errors.New("account TXID not found")
32         ErrGetStandardUTXO         = errors.New("failed get standard UTXO")
33 )
34
35 //StatusInfo is base valid block info to handle orphan block rollback
36 type StatusInfo struct {
37         Version    uint
38         WorkHeight uint64
39         WorkHash   bc.Hash
40         BestHeight uint64
41         BestHash   bc.Hash
42 }
43
44 //Wallet is related to storing account unspent outputs
45 type Wallet struct {
46         Store           WalletStore
47         rw              sync.RWMutex
48         Status          StatusInfo
49         TxIndexFlag     bool
50         AccountMgr      *account.Manager
51         AssetReg        *asset.Registry
52         Hsm             *pseudohsm.HSM
53         Chain           *protocol.Chain
54         RecoveryMgr     *recoveryManager
55         EventDispatcher *event.Dispatcher
56         TxMsgSub        *event.Subscription
57
58         rescanCh chan struct{}
59 }
60
61 //NewWallet return a new wallet instance
62 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
63         w := &Wallet{
64                 Store:           store,
65                 AccountMgr:      account,
66                 AssetReg:        asset,
67                 Chain:           chain,
68                 Hsm:             hsm,
69                 RecoveryMgr:     NewRecoveryManager(store, account),
70                 EventDispatcher: dispatcher,
71                 rescanCh:        make(chan struct{}, 1),
72                 TxIndexFlag:     txIndexFlag,
73         }
74
75         if err := w.LoadWalletInfo(); err != nil {
76                 return nil, err
77         }
78
79         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
80                 return nil, err
81         }
82
83         return w, nil
84 }
85
86 // Run go to run some wallet recorvery and clean tx thread
87 func (w *Wallet) Run() error {
88         var err error
89         w.TxMsgSub, err = w.EventDispatcher.Subscribe(protocol.TxMsgEvent{})
90         if err != nil {
91                 return err
92         }
93
94         go w.walletUpdater()
95         go w.delUnconfirmedTx()
96         go w.MemPoolTxQueryLoop()
97
98         return nil
99 }
100
101 // MemPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
102 func (w *Wallet) MemPoolTxQueryLoop() {
103         for {
104                 select {
105                 case obj, ok := <-w.TxMsgSub.Chan():
106                         if !ok {
107                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
108                                 return
109                         }
110
111                         ev, ok := obj.Data.(protocol.TxMsgEvent)
112                         if !ok {
113                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
114                                 continue
115                         }
116
117                         switch ev.TxMsg.MsgType {
118                         case protocol.MsgNewTx:
119                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
120                         case protocol.MsgRemoveTx:
121                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
122                         default:
123                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
124                         }
125                 }
126         }
127 }
128
129 func (w *Wallet) checkWalletInfo() error {
130         if w.Status.Version != currentVersion {
131                 return errWalletVersionMismatch
132         } else if !w.Chain.BlockExist(&w.Status.BestHash) {
133                 return errBestBlockNotFoundInCore
134         }
135
136         return nil
137 }
138
139 //LoadWalletInfo return stored wallet info and nil,
140 //if error, return initial wallet info and err
141 func (w *Wallet) LoadWalletInfo() error {
142         walletStatus, err := w.Store.GetWalletInfo()
143         if walletStatus == nil && err != ErrGetWalletStatusInfo {
144                 return err
145         }
146
147         if walletStatus != nil {
148                 w.Status = *walletStatus
149                 err = w.checkWalletInfo()
150                 if err == nil {
151                         return nil
152                 }
153
154                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
155                 w.Store.DeleteWalletTransactions()
156                 w.Store.DeleteWalletUTXOs()
157         }
158
159         w.Status.Version = currentVersion
160         w.Status.WorkHash = bc.Hash{}
161         block, err := w.Chain.GetBlockByHeight(0)
162         if err != nil {
163                 return err
164         }
165
166         return w.AttachBlock(block)
167 }
168
169 func (w *Wallet) commitWalletInfo(store WalletStore) error {
170         if err := store.SetWalletInfo(&w.Status); err != nil {
171                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
172                 return err
173         }
174         return nil
175 }
176
177 // AttachBlock attach a new block
178 func (w *Wallet) AttachBlock(block *types.Block) error {
179         w.rw.Lock()
180         defer w.rw.Unlock()
181
182         if block.PreviousBlockHash != w.Status.WorkHash {
183                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
184                 return nil
185         }
186
187         blockHash := block.Hash()
188         txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
189         if err != nil {
190                 return err
191         }
192
193         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
194                 log.WithField("err", err).Error("filter recovery txs")
195                 w.RecoveryMgr.finished()
196         }
197
198         annotatedTxs := w.filterAccountTxs(block, txStatus)
199         if err := saveExternalAssetDefinition(block, w.Store); err != nil {
200                 return err
201         }
202
203         w.annotateTxsAccount(annotatedTxs)
204
205         newStore := w.Store.InitBatch()
206         if err := w.indexTransactions(block, txStatus, annotatedTxs, newStore); err != nil {
207                 return err
208         }
209
210         w.attachUtxos(block, txStatus, newStore)
211         w.Status.WorkHeight = block.Height
212         w.Status.WorkHash = block.Hash()
213         if w.Status.WorkHeight >= w.Status.BestHeight {
214                 w.Status.BestHeight = w.Status.WorkHeight
215                 w.Status.BestHash = w.Status.WorkHash
216         }
217
218         if err := w.commitWalletInfo(newStore); err != nil {
219                 return err
220         }
221
222         if err := newStore.CommitBatch(); err != nil {
223                 return err
224         }
225
226         return nil
227 }
228
229 // DetachBlock detach a block and rollback state
230 func (w *Wallet) DetachBlock(block *types.Block) error {
231         w.rw.Lock()
232         defer w.rw.Unlock()
233
234         blockHash := block.Hash()
235         txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
236         if err != nil {
237                 return err
238         }
239
240         newStore := w.Store.InitBatch()
241
242         w.detachUtxos(block, txStatus, newStore)
243         newStore.DeleteTransactions(w.Status.BestHeight)
244
245         w.Status.BestHeight = block.Height - 1
246         w.Status.BestHash = block.PreviousBlockHash
247
248         if w.Status.WorkHeight > w.Status.BestHeight {
249                 w.Status.WorkHeight = w.Status.BestHeight
250                 w.Status.WorkHash = w.Status.BestHash
251         }
252         if err := w.commitWalletInfo(newStore); err != nil {
253                 return err
254         }
255
256         if err := newStore.CommitBatch(); err != nil {
257                 return err
258         }
259
260         return nil
261 }
262
263 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
264 func (w *Wallet) walletUpdater() {
265         for {
266                 w.getRescanNotification()
267                 for !w.Chain.InMainChain(w.Status.BestHash) {
268                         block, err := w.Chain.GetBlockByHash(&w.Status.BestHash)
269                         if err != nil {
270                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
271                                 return
272                         }
273
274                         if err := w.DetachBlock(block); err != nil {
275                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
276                                 return
277                         }
278                 }
279
280                 block, _ := w.Chain.GetBlockByHeight(w.Status.WorkHeight + 1)
281                 if block == nil {
282                         w.walletBlockWaiter()
283                         continue
284                 }
285
286                 if err := w.AttachBlock(block); err != nil {
287                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
288                         return
289                 }
290         }
291 }
292
293 //RescanBlocks provide a trigger to rescan blocks
294 func (w *Wallet) RescanBlocks() {
295         select {
296         case w.rescanCh <- struct{}{}:
297         default:
298                 return
299         }
300 }
301
302 // DeleteAccount deletes account matching accountID, then rescan wallet
303 func (w *Wallet) DeleteAccount(accountID string) (err error) {
304         w.rw.Lock()
305         defer w.rw.Unlock()
306
307         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
308                 return err
309         }
310
311         w.Store.DeleteWalletTransactions()
312         w.RescanBlocks()
313         return nil
314 }
315
316 // Rollback wallet to target height
317 func (w *Wallet) Rollback(targetHeight uint64) error {
318         for w.Status.WorkHeight > targetHeight {
319                 block, err := w.Chain.GetBlockByHash(&w.Status.WorkHash)
320                 if err != nil {
321                         return err
322                 }
323
324                 if err = w.DetachBlock(block); err != nil {
325                         return err
326                 }
327         }
328
329         return nil
330 }
331
332 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
333         w.rw.Lock()
334         defer w.rw.Unlock()
335
336         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
337                 return err
338         }
339
340         w.Store.DeleteWalletTransactions()
341         w.RescanBlocks()
342         return nil
343 }
344
345 func (w *Wallet) getRescanNotification() {
346         select {
347         case <-w.rescanCh:
348                 w.setRescanStatus()
349         default:
350                 return
351         }
352 }
353
354 func (w *Wallet) setRescanStatus() {
355         block, _ := w.Chain.GetBlockByHeight(0)
356         w.Status.WorkHash = bc.Hash{}
357         w.AttachBlock(block)
358 }
359
360 func (w *Wallet) walletBlockWaiter() {
361         select {
362         case <-w.Chain.BlockWaiter(w.Status.WorkHeight + 1):
363         case <-w.rescanCh:
364                 w.setRescanStatus()
365         }
366 }
367
368 // GetWalletStatusInfo return current wallet StatusInfo
369 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
370         w.rw.RLock()
371         defer w.rw.RUnlock()
372
373         return w.Status
374 }