OSDN Git Service

delete some black utxo (#2129)
[bytom/bytom.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "encoding/json"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/bytom/bytom/account"
10         "github.com/bytom/bytom/asset"
11         "github.com/bytom/bytom/blockchain/pseudohsm"
12         "github.com/bytom/bytom/contract"
13         dbm "github.com/bytom/bytom/database/leveldb"
14         "github.com/bytom/bytom/errors"
15         "github.com/bytom/bytom/event"
16         "github.com/bytom/bytom/protocol"
17         "github.com/bytom/bytom/protocol/bc"
18         "github.com/bytom/bytom/protocol/bc/types"
19 )
20
// SINGLE is the constant 1; the original comment labels it "single sign".
const SINGLE = 1

// logModule is the value this package puts in the "module" field of its
// structured log entries.
const logModule = "wallet"
26
var (
	// currentVersion is the StatusInfo version this code expects; a stored
	// status carrying another version fails checkWalletInfo.
	currentVersion uint = 1

	// walletKey is the database key under which the wallet status is stored.
	walletKey = []byte("walletInfo")
)

// Errors reported by checkWalletInfo.
var (
	errBestBlockNotFoundInCore = errors.New("best block not found in core")
	errWalletVersionMismatch   = errors.New("wallet version mismatch")
)
34
35 //StatusInfo is base valid block info to handle orphan block rollback
36 type StatusInfo struct {
37         Version    uint
38         WorkHeight uint64
39         WorkHash   bc.Hash
40         BestHeight uint64
41         BestHash   bc.Hash
42 }
43
44 //Wallet is related to storing account unspent outputs
45 type Wallet struct {
46         DB              dbm.DB
47         rw              sync.RWMutex
48         status          StatusInfo
49         TxIndexFlag     bool
50         AccountMgr      *account.Manager
51         AssetReg        *asset.Registry
52         ContractReg     *contract.Registry
53         Hsm             *pseudohsm.HSM
54         chain           *protocol.Chain
55         RecoveryMgr     *recoveryManager
56         eventDispatcher *event.Dispatcher
57         txMsgSub        *event.Subscription
58
59         rescanCh chan struct{}
60 }
61
62 //NewWallet return a new wallet instance
63 func NewWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, contract *contract.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
64         w := &Wallet{
65                 DB:              walletDB,
66                 AccountMgr:      account,
67                 AssetReg:        asset,
68                 ContractReg:     contract,
69                 chain:           chain,
70                 Hsm:             hsm,
71                 RecoveryMgr:     newRecoveryManager(walletDB, account),
72                 eventDispatcher: dispatcher,
73                 rescanCh:        make(chan struct{}, 1),
74                 TxIndexFlag:     txIndexFlag,
75         }
76
77         if err := w.loadWalletInfo(); err != nil {
78                 return nil, err
79         }
80
81         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
82                 return nil, err
83         }
84
85         var err error
86         w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
87         if err != nil {
88                 return nil, err
89         }
90
91         go w.walletUpdater()
92         go w.delUnconfirmedTx()
93         go w.memPoolTxQueryLoop()
94         return w, nil
95 }
96
97 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
98 func (w *Wallet) memPoolTxQueryLoop() {
99         for {
100                 select {
101                 case obj, ok := <-w.txMsgSub.Chan():
102                         if !ok {
103                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
104                                 return
105                         }
106
107                         ev, ok := obj.Data.(protocol.TxMsgEvent)
108                         if !ok {
109                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
110                                 continue
111                         }
112
113                         switch ev.TxMsg.MsgType {
114                         case protocol.MsgNewTx:
115                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
116                         case protocol.MsgRemoveTx:
117                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
118                         default:
119                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
120                         }
121                 }
122         }
123 }
124
125 func (w *Wallet) checkWalletInfo() error {
126         if w.status.Version != currentVersion {
127                 return errWalletVersionMismatch
128         } else if !w.chain.BlockExist(&w.status.BestHash) {
129                 return errBestBlockNotFoundInCore
130         }
131
132         return nil
133 }
134
135 //loadWalletInfo return stored wallet info and nil,
136 //if error, return initial wallet info and err
137 func (w *Wallet) loadWalletInfo() error {
138         if rawWallet := w.DB.Get(walletKey); rawWallet != nil {
139                 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
140                         return err
141                 }
142
143                 err := w.checkWalletInfo()
144                 if err == nil {
145                         return nil
146                 }
147
148                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
149                 w.deleteAccountTxs()
150                 w.deleteUtxos()
151         }
152
153         w.status.Version = currentVersion
154         w.status.WorkHash = bc.Hash{}
155         block, err := w.chain.GetBlockByHeight(0)
156         if err != nil {
157                 return err
158         }
159         return w.AttachBlock(block)
160 }
161
162 func (w *Wallet) commitWalletInfo(batch dbm.Batch) error {
163         rawWallet, err := json.Marshal(w.status)
164         if err != nil {
165                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
166                 return err
167         }
168
169         batch.Set(walletKey, rawWallet)
170         batch.Write()
171         return nil
172 }
173
174 // AttachBlock attach a new block
175 func (w *Wallet) AttachBlock(block *types.Block) error {
176         w.rw.Lock()
177         defer w.rw.Unlock()
178
179         if block.PreviousBlockHash != w.status.WorkHash {
180                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
181                 return nil
182         }
183
184         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
185                 log.WithField("err", err).Error("filter recovery txs")
186                 w.RecoveryMgr.finished()
187         }
188
189         storeBatch := w.DB.NewBatch()
190         if err := w.indexTransactions(storeBatch, block); err != nil {
191                 return err
192         }
193
194         w.attachUtxos(storeBatch, block)
195         w.status.WorkHeight = block.Height
196         w.status.WorkHash = block.Hash()
197         if w.status.WorkHeight >= w.status.BestHeight {
198                 w.status.BestHeight = w.status.WorkHeight
199                 w.status.BestHash = w.status.WorkHash
200         }
201         return w.commitWalletInfo(storeBatch)
202 }
203
204 // DetachBlock detach a block and rollback state
205 func (w *Wallet) DetachBlock(block *types.Block) error {
206         w.rw.Lock()
207         defer w.rw.Unlock()
208
209         storeBatch := w.DB.NewBatch()
210         w.detachUtxos(storeBatch, block)
211         w.deleteTransactions(storeBatch, w.status.BestHeight)
212
213         w.status.BestHeight = block.Height - 1
214         w.status.BestHash = block.PreviousBlockHash
215
216         if w.status.WorkHeight > w.status.BestHeight {
217                 w.status.WorkHeight = w.status.BestHeight
218                 w.status.WorkHash = w.status.BestHash
219         }
220
221         return w.commitWalletInfo(storeBatch)
222 }
223
224 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
225 func (w *Wallet) walletUpdater() {
226         for {
227                 w.getRescanNotification()
228                 for !w.chain.InMainChain(w.status.BestHash) {
229                         block, err := w.chain.GetBlockByHash(&w.status.BestHash)
230                         if err != nil {
231                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
232                                 return
233                         }
234
235                         if err := w.DetachBlock(block); err != nil {
236                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
237                                 return
238                         }
239                 }
240
241                 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
242                 if block == nil {
243                         w.walletBlockWaiter()
244                         continue
245                 }
246
247                 if err := w.AttachBlock(block); err != nil {
248                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
249                         return
250                 }
251         }
252 }
253
254 //RescanBlocks provide a trigger to rescan blocks
255 func (w *Wallet) RescanBlocks() {
256         select {
257         case w.rescanCh <- struct{}{}:
258         default:
259                 return
260         }
261 }
262
263 // deleteAccountTxs deletes all txs in wallet
264 func (w *Wallet) deleteAccountTxs() {
265         storeBatch := w.DB.NewBatch()
266
267         txIter := w.DB.IteratorPrefix([]byte(TxPrefix))
268         defer txIter.Release()
269
270         for txIter.Next() {
271                 storeBatch.Delete(txIter.Key())
272         }
273
274         txIndexIter := w.DB.IteratorPrefix([]byte(TxIndexPrefix))
275         defer txIndexIter.Release()
276
277         for txIndexIter.Next() {
278                 storeBatch.Delete(txIndexIter.Key())
279         }
280
281         storeBatch.Write()
282 }
283
284 func (w *Wallet) deleteUtxos() {
285         storeBatch := w.DB.NewBatch()
286         ruIter := w.DB.IteratorPrefix([]byte(account.UTXOPreFix))
287         defer ruIter.Release()
288         for ruIter.Next() {
289                 storeBatch.Delete(ruIter.Key())
290         }
291
292         suIter := w.DB.IteratorPrefix([]byte(account.SUTXOPrefix))
293         defer suIter.Release()
294         for suIter.Next() {
295                 storeBatch.Delete(suIter.Key())
296         }
297         storeBatch.Write()
298 }
299
300 // DeleteAccount deletes account matching accountID, then rescan wallet
301 func (w *Wallet) DeleteAccount(accountID string) (err error) {
302         w.rw.Lock()
303         defer w.rw.Unlock()
304
305         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
306                 return err
307         }
308
309         w.deleteAccountTxs()
310         w.RescanBlocks()
311         return nil
312 }
313
314 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
315         w.rw.Lock()
316         defer w.rw.Unlock()
317
318         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
319                 return err
320         }
321
322         w.deleteAccountTxs()
323         w.RescanBlocks()
324         return nil
325 }
326
327 func (w *Wallet) getRescanNotification() {
328         select {
329         case <-w.rescanCh:
330                 w.setRescanStatus()
331         default:
332                 return
333         }
334 }
335
336 func (w *Wallet) setRescanStatus() {
337         block, _ := w.chain.GetBlockByHeight(0)
338         w.status.WorkHash = bc.Hash{}
339         w.AttachBlock(block)
340 }
341
342 func (w *Wallet) walletBlockWaiter() {
343         select {
344         case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
345         case <-w.rescanCh:
346                 w.setRescanStatus()
347         }
348 }
349
350 // GetWalletStatusInfo return current wallet StatusInfo
351 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
352         w.rw.RLock()
353         defer w.rw.RUnlock()
354
355         return w.status
356 }