OSDN Git Service

92a24b96626ec3c44736ab67daab4209ede86170
[bytom/vapor.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "encoding/json"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/vapor/account"
10         "github.com/vapor/asset"
11         "github.com/vapor/blockchain/pseudohsm"
12         dbm "github.com/vapor/database/leveldb"
13         "github.com/vapor/errors"
14         "github.com/vapor/event"
15         "github.com/vapor/protocol"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         //SINGLE single sign
22         SINGLE    = 1
23         logModule = "wallet"
24 )
25
26 var (
27         currentVersion = uint(1)
28         walletKey      = []byte("walletInfo")
29
30         errBestBlockNotFoundInCore = errors.New("best block not found in core")
31         errWalletVersionMismatch   = errors.New("wallet version mismatch")
32 )
33
34 //StatusInfo is base valid block info to handle orphan block rollback
35 type StatusInfo struct {
36         Version    uint
37         WorkHeight uint64
38         WorkHash   bc.Hash
39         BestHeight uint64
40         BestHash   bc.Hash
41 }
42
43 //Wallet is related to storing account unspent outputs
44 type Wallet struct {
45         DB              dbm.DB
46         rw              sync.RWMutex
47         status          StatusInfo
48         TxIndexFlag     bool
49         AccountMgr      *account.Manager
50         AssetReg        *asset.Registry
51         Hsm             *pseudohsm.HSM
52         chain           *protocol.Chain
53         RecoveryMgr     *recoveryManager
54         eventDispatcher *event.Dispatcher
55         txMsgSub        *event.Subscription
56
57         rescanCh chan struct{}
58 }
59
60 //NewWallet return a new wallet instance
61 func NewWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
62         w := &Wallet{
63                 DB:              walletDB,
64                 AccountMgr:      account,
65                 AssetReg:        asset,
66                 chain:           chain,
67                 Hsm:             hsm,
68                 RecoveryMgr:     newRecoveryManager(walletDB, account),
69                 eventDispatcher: dispatcher,
70                 rescanCh:        make(chan struct{}, 1),
71                 TxIndexFlag:     txIndexFlag,
72         }
73
74         if err := w.loadWalletInfo(); err != nil {
75                 return nil, err
76         }
77
78         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
79                 return nil, err
80         }
81
82         var err error
83         w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
84         if err != nil {
85                 return nil, err
86         }
87
88         go w.walletUpdater()
89         go w.delUnconfirmedTx()
90         go w.memPoolTxQueryLoop()
91         return w, nil
92 }
93
94 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
95 func (w *Wallet) memPoolTxQueryLoop() {
96         for {
97                 select {
98                 case obj, ok := <-w.txMsgSub.Chan():
99                         if !ok {
100                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
101                                 return
102                         }
103
104                         ev, ok := obj.Data.(protocol.TxMsgEvent)
105                         if !ok {
106                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
107                                 continue
108                         }
109
110                         switch ev.TxMsg.MsgType {
111                         case protocol.MsgNewTx:
112                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
113                         case protocol.MsgRemoveTx:
114                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
115                         default:
116                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
117                         }
118                 }
119         }
120 }
121
122 func (w *Wallet) checkWalletInfo() error {
123         if w.status.Version != currentVersion {
124                 return errWalletVersionMismatch
125         } else if !w.chain.BlockExist(&w.status.BestHash) {
126                 return errBestBlockNotFoundInCore
127         }
128
129         return nil
130 }
131
132 //loadWalletInfo return stored wallet info and nil,
133 //if error, return initial wallet info and err
134 func (w *Wallet) loadWalletInfo() error {
135         if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
136                 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
137                         return err
138                 }
139
140                 err := w.checkWalletInfo()
141                 if err == nil {
142                         return nil
143                 }
144
145                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
146                 w.deleteAccountTxs()
147                 w.deleteUtxos()
148         }
149
150         w.status.Version = currentVersion
151         w.status.WorkHash = bc.Hash{}
152         block, err := w.chain.GetBlockByHeight(0)
153         if err != nil {
154                 return err
155         }
156         return w.AttachBlock(block)
157 }
158
159 func (w *Wallet) commitWalletInfo(batch dbm.Batch) error {
160         rawWallet, err := json.Marshal(w.status)
161         if err != nil {
162                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
163                 return err
164         }
165
166         batch.Set(walletKey, rawWallet)
167         batch.Write()
168         return nil
169 }
170
171 // AttachBlock attach a new block
172 func (w *Wallet) AttachBlock(block *types.Block) error {
173         w.rw.Lock()
174         defer w.rw.Unlock()
175
176         if block.PreviousBlockHash != w.status.WorkHash {
177                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
178                 return nil
179         }
180
181         blockHash := block.Hash()
182         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
183         if err != nil {
184                 return err
185         }
186
187         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
188                 log.WithField("err", err).Error("filter recovery txs")
189                 w.RecoveryMgr.finished()
190         }
191
192         storeBatch := w.DB.NewBatch()
193         if err := w.indexTransactions(storeBatch, block, txStatus); err != nil {
194                 return err
195         }
196
197         w.attachUtxos(storeBatch, block, txStatus)
198         w.status.WorkHeight = block.Height
199         w.status.WorkHash = block.Hash()
200         if w.status.WorkHeight >= w.status.BestHeight {
201                 w.status.BestHeight = w.status.WorkHeight
202                 w.status.BestHash = w.status.WorkHash
203         }
204         return w.commitWalletInfo(storeBatch)
205 }
206
207 // DetachBlock detach a block and rollback state
208 func (w *Wallet) DetachBlock(block *types.Block) error {
209         w.rw.Lock()
210         defer w.rw.Unlock()
211
212         blockHash := block.Hash()
213         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
214         if err != nil {
215                 return err
216         }
217
218         storeBatch := w.DB.NewBatch()
219         w.detachUtxos(storeBatch, block, txStatus)
220         w.deleteTransactions(storeBatch, w.status.BestHeight)
221
222         w.status.BestHeight = block.Height - 1
223         w.status.BestHash = block.PreviousBlockHash
224
225         if w.status.WorkHeight > w.status.BestHeight {
226                 w.status.WorkHeight = w.status.BestHeight
227                 w.status.WorkHash = w.status.BestHash
228         }
229
230         return w.commitWalletInfo(storeBatch)
231 }
232
233 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
234 func (w *Wallet) walletUpdater() {
235         for {
236                 w.getRescanNotification()
237                 for !w.chain.InMainChain(w.status.BestHash) {
238                         block, err := w.chain.GetBlockByHash(&w.status.BestHash)
239                         if err != nil {
240                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
241                                 return
242                         }
243
244                         if err := w.DetachBlock(block); err != nil {
245                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
246                                 return
247                         }
248                 }
249
250                 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
251                 if block == nil {
252                         w.walletBlockWaiter()
253                         continue
254                 }
255
256                 if err := w.AttachBlock(block); err != nil {
257                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
258                         return
259                 }
260         }
261 }
262
263 //RescanBlocks provide a trigger to rescan blocks
264 func (w *Wallet) RescanBlocks() {
265         select {
266         case w.rescanCh <- struct{}{}:
267         default:
268                 return
269         }
270 }
271
272 // deleteAccountTxs deletes all txs in wallet
273 func (w *Wallet) deleteAccountTxs() {
274         storeBatch := w.DB.NewBatch()
275
276         txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
277         defer txIter.Release()
278
279         for txIter.Next() {
280                 storeBatch.Delete(txIter.Key())
281         }
282
283         txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
284         defer txIndexIter.Release()
285
286         for txIndexIter.Next() {
287                 storeBatch.Delete(txIndexIter.Key())
288         }
289
290         storeBatch.Write()
291 }
292
293 func (w *Wallet) deleteUtxos() {
294         storeBatch := w.DB.NewBatch()
295         ruIter := w.DB.IteratorPrefix([]byte(account.UTXOPreFix))
296         defer ruIter.Release()
297         for ruIter.Next() {
298                 storeBatch.Delete(ruIter.Key())
299         }
300
301         suIter := w.DB.IteratorPrefix([]byte(account.SUTXOPrefix))
302         defer suIter.Release()
303         for suIter.Next() {
304                 storeBatch.Delete(suIter.Key())
305         }
306         storeBatch.Write()
307 }
308
309 // DeleteAccount deletes account matching accountID, then rescan wallet
310 func (w *Wallet) DeleteAccount(accountID string) (err error) {
311         w.rw.Lock()
312         defer w.rw.Unlock()
313
314         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
315                 return err
316         }
317
318         w.deleteAccountTxs()
319         w.RescanBlocks()
320         return nil
321 }
322
323 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
324         w.rw.Lock()
325         defer w.rw.Unlock()
326
327         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
328                 return err
329         }
330
331         w.deleteAccountTxs()
332         w.RescanBlocks()
333         return nil
334 }
335
336 func (w *Wallet) getRescanNotification() {
337         select {
338         case <-w.rescanCh:
339                 w.setRescanStatus()
340         default:
341                 return
342         }
343 }
344
345 func (w *Wallet) setRescanStatus() {
346         block, _ := w.chain.GetBlockByHeight(0)
347         w.status.WorkHash = bc.Hash{}
348         w.AttachBlock(block)
349 }
350
351 func (w *Wallet) walletBlockWaiter() {
352         select {
353         case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
354         case <-w.rescanCh:
355                 w.setRescanStatus()
356         }
357 }
358
359 // GetWalletStatusInfo return current wallet StatusInfo
360 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
361         w.rw.RLock()
362         defer w.rw.RUnlock()
363
364         return w.status
365 }