OSDN Git Service

Merge pull request #1721 from Bytom/politic-dashboard
[bytom/bytom.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "encoding/json"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/bytom/account"
10         "github.com/bytom/asset"
11         "github.com/bytom/blockchain/pseudohsm"
12         dbm "github.com/bytom/database/leveldb"
13         "github.com/bytom/errors"
14         "github.com/bytom/event"
15         "github.com/bytom/protocol"
16         "github.com/bytom/protocol/bc"
17         "github.com/bytom/protocol/bc/types"
18 )
19
20 const (
21         //SINGLE single sign
22         SINGLE    = 1
23         logModule = "wallet"
24 )
25
26 var (
27         currentVersion = uint(1)
28         walletKey      = []byte("walletInfo")
29
30         errBestBlockNotFoundInCore = errors.New("best block not found in core")
31         errWalletVersionMismatch   = errors.New("wallet version mismatch")
32 )
33
34 //StatusInfo is base valid block info to handle orphan block rollback
35 type StatusInfo struct {
36         Version    uint
37         WorkHeight uint64
38         WorkHash   bc.Hash
39         BestHeight uint64
40         BestHash   bc.Hash
41 }
42
43 //Wallet is related to storing account unspent outputs
44 type Wallet struct {
45         DB              dbm.DB
46         rw              sync.RWMutex
47         status          StatusInfo
48         TxIndexFlag     bool
49         AccountMgr      *account.Manager
50         AssetReg        *asset.Registry
51         Hsm             *pseudohsm.HSM
52         chain           *protocol.Chain
53         RecoveryMgr     *recoveryManager
54         eventDispatcher *event.Dispatcher
55         txMsgSub        *event.Subscription
56
57         rescanCh chan struct{}
58 }
59
60 //NewWallet return a new wallet instance
61 func NewWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
62         w := &Wallet{
63                 DB:              walletDB,
64                 AccountMgr:      account,
65                 AssetReg:        asset,
66                 chain:           chain,
67                 Hsm:             hsm,
68                 RecoveryMgr:     newRecoveryManager(walletDB, account),
69                 eventDispatcher: dispatcher,
70                 rescanCh:        make(chan struct{}, 1),
71                 TxIndexFlag:     txIndexFlag,
72         }
73
74         if err := w.loadWalletInfo(); err != nil {
75                 return nil, err
76         }
77
78         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
79                 return nil, err
80         }
81
82         var err error
83         w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
84         if err != nil {
85                 return nil, err
86         }
87
88         go w.walletUpdater()
89         go w.delUnconfirmedTx()
90         go w.memPoolTxQueryLoop()
91         return w, nil
92 }
93
94 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
95 func (w *Wallet) memPoolTxQueryLoop() {
96         for {
97                 select {
98                 case obj, ok := <-w.txMsgSub.Chan():
99                         if !ok {
100                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
101                                 return
102                         }
103
104                         ev, ok := obj.Data.(protocol.TxMsgEvent)
105                         if !ok {
106                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
107                                 continue
108                         }
109
110                         switch ev.TxMsg.MsgType {
111                         case protocol.MsgNewTx:
112                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
113                         case protocol.MsgRemoveTx:
114                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
115                         default:
116                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
117                         }
118                 }
119         }
120 }
121
122 func (w *Wallet) checkWalletInfo() error {
123         if w.status.Version != currentVersion {
124                 return errWalletVersionMismatch
125         } else if !w.chain.BlockExist(&w.status.BestHash) {
126                 return errBestBlockNotFoundInCore
127         }
128
129         return nil
130 }
131
132 //loadWalletInfo return stored wallet info and nil,
133 //if error, return initial wallet info and err
134 func (w *Wallet) loadWalletInfo() error {
135         if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
136                 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
137                         return err
138                 }
139
140                 err := w.checkWalletInfo()
141                 if err == nil {
142                         return nil
143                 }
144
145                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
146                 w.deleteAccountTxs()
147                 w.deleteUtxos()
148         }
149
150         w.status.Version = currentVersion
151         block, err := w.chain.GetBlockByHeight(0)
152         if err != nil {
153                 return err
154         }
155         return w.AttachBlock(block)
156 }
157
158 func (w *Wallet) commitWalletInfo(batch dbm.Batch) error {
159         rawWallet, err := json.Marshal(w.status)
160         if err != nil {
161                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
162                 return err
163         }
164
165         batch.Set(walletKey, rawWallet)
166         batch.Write()
167         return nil
168 }
169
170 // AttachBlock attach a new block
171 func (w *Wallet) AttachBlock(block *types.Block) error {
172         w.rw.Lock()
173         defer w.rw.Unlock()
174
175         if block.PreviousBlockHash != w.status.WorkHash {
176                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
177                 return nil
178         }
179
180         blockHash := block.Hash()
181         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
182         if err != nil {
183                 return err
184         }
185
186         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
187                 return err
188         }
189
190         storeBatch := w.DB.NewBatch()
191         if err := w.indexTransactions(storeBatch, block, txStatus); err != nil {
192                 return err
193         }
194
195         w.attachUtxos(storeBatch, block, txStatus)
196         w.status.WorkHeight = block.Height
197         w.status.WorkHash = block.Hash()
198         if w.status.WorkHeight >= w.status.BestHeight {
199                 w.status.BestHeight = w.status.WorkHeight
200                 w.status.BestHash = w.status.WorkHash
201         }
202         return w.commitWalletInfo(storeBatch)
203 }
204
205 // DetachBlock detach a block and rollback state
206 func (w *Wallet) DetachBlock(block *types.Block) error {
207         w.rw.Lock()
208         defer w.rw.Unlock()
209
210         blockHash := block.Hash()
211         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
212         if err != nil {
213                 return err
214         }
215
216         storeBatch := w.DB.NewBatch()
217         w.detachUtxos(storeBatch, block, txStatus)
218         w.deleteTransactions(storeBatch, w.status.BestHeight)
219
220         w.status.BestHeight = block.Height - 1
221         w.status.BestHash = block.PreviousBlockHash
222
223         if w.status.WorkHeight > w.status.BestHeight {
224                 w.status.WorkHeight = w.status.BestHeight
225                 w.status.WorkHash = w.status.BestHash
226         }
227
228         return w.commitWalletInfo(storeBatch)
229 }
230
231 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
232 func (w *Wallet) walletUpdater() {
233         for {
234                 w.getRescanNotification()
235                 for !w.chain.InMainChain(w.status.BestHash) {
236                         block, err := w.chain.GetBlockByHash(&w.status.BestHash)
237                         if err != nil {
238                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
239                                 return
240                         }
241
242                         if err := w.DetachBlock(block); err != nil {
243                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
244                                 return
245                         }
246                 }
247
248                 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
249                 if block == nil {
250                         w.walletBlockWaiter()
251                         continue
252                 }
253
254                 if err := w.AttachBlock(block); err != nil {
255                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
256                         return
257                 }
258         }
259 }
260
261 //RescanBlocks provide a trigger to rescan blocks
262 func (w *Wallet) RescanBlocks() {
263         select {
264         case w.rescanCh <- struct{}{}:
265         default:
266                 return
267         }
268 }
269
270 // deleteAccountTxs deletes all txs in wallet
271 func (w *Wallet) deleteAccountTxs() {
272         storeBatch := w.DB.NewBatch()
273
274         txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
275         defer txIter.Release()
276
277         for txIter.Next() {
278                 storeBatch.Delete(txIter.Key())
279         }
280
281         txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
282         defer txIndexIter.Release()
283
284         for txIndexIter.Next() {
285                 storeBatch.Delete(txIndexIter.Key())
286         }
287
288         storeBatch.Write()
289 }
290
291 func (w *Wallet) deleteUtxos() {
292         storeBatch := w.DB.NewBatch()
293         ruIter := w.DB.IteratorPrefix([]byte(account.UTXOPreFix))
294         defer ruIter.Release()
295         for ruIter.Next() {
296                 storeBatch.Delete(ruIter.Key())
297         }
298
299         suIter := w.DB.IteratorPrefix([]byte(account.SUTXOPrefix))
300         defer suIter.Release()
301         for suIter.Next() {
302                 storeBatch.Delete(suIter.Key())
303         }
304         storeBatch.Write()
305 }
306
307 // DeleteAccount deletes account matching accountID, then rescan wallet
308 func (w *Wallet) DeleteAccount(accountID string) (err error) {
309         w.rw.Lock()
310         defer w.rw.Unlock()
311
312         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
313                 return err
314         }
315
316         w.deleteAccountTxs()
317         w.RescanBlocks()
318         return nil
319 }
320
321 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
322         w.rw.Lock()
323         defer w.rw.Unlock()
324
325         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
326                 return err
327         }
328
329         w.deleteAccountTxs()
330         w.RescanBlocks()
331         return nil
332 }
333
334 func (w *Wallet) getRescanNotification() {
335         select {
336         case <-w.rescanCh:
337                 w.setRescanStatus()
338         default:
339                 return
340         }
341 }
342
343 func (w *Wallet) setRescanStatus() {
344         block, _ := w.chain.GetBlockByHeight(0)
345         w.status.WorkHash = bc.Hash{}
346         w.AttachBlock(block)
347 }
348
349 func (w *Wallet) walletBlockWaiter() {
350         select {
351         case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
352         case <-w.rescanCh:
353                 w.setRescanStatus()
354         }
355 }
356
357 // GetWalletStatusInfo return current wallet StatusInfo
358 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
359         w.rw.RLock()
360         defer w.rw.RUnlock()
361
362         return w.status
363 }