OSDN Git Service

Del signature from cache when added to the block (#258)
[bytom/vapor.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/account"
9         "github.com/vapor/asset"
10         "github.com/vapor/blockchain/pseudohsm"
11         "github.com/vapor/errors"
12         "github.com/vapor/event"
13         "github.com/vapor/protocol"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16 )
17
18 const (
19         //SINGLE single sign
20         SINGLE    = 1
21         logModule = "wallet"
22 )
23
24 var (
25         currentVersion = uint(1)
26
27         errBestBlockNotFoundInCore = errors.New("best block not found in core")
28         errWalletVersionMismatch   = errors.New("wallet version mismatch")
29         ErrGetWalletStatusInfo     = errors.New("failed get wallet info")
30         ErrGetAsset                = errors.New("Failed to find asset definition")
31         ErrAccntTxIDNotFound       = errors.New("account TXID not found")
32 )
33
34 //StatusInfo is base valid block info to handle orphan block rollback
35 type StatusInfo struct {
36         Version    uint
37         WorkHeight uint64
38         WorkHash   bc.Hash
39         BestHeight uint64
40         BestHash   bc.Hash
41 }
42
43 //Wallet is related to storing account unspent outputs
44 type Wallet struct {
45         Store           WalletStore
46         rw              sync.RWMutex
47         Status          StatusInfo
48         TxIndexFlag     bool
49         AccountMgr      *account.Manager
50         AssetReg        *asset.Registry
51         Hsm             *pseudohsm.HSM
52         Chain           *protocol.Chain
53         RecoveryMgr     *recoveryManager
54         EventDispatcher *event.Dispatcher
55         TxMsgSub        *event.Subscription
56
57         rescanCh chan struct{}
58 }
59
60 //NewWallet return a new wallet instance
61 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
62         w := &Wallet{
63                 Store:           store,
64                 AccountMgr:      account,
65                 AssetReg:        asset,
66                 Chain:           chain,
67                 Hsm:             hsm,
68                 RecoveryMgr:     NewRecoveryManager(store, account),
69                 EventDispatcher: dispatcher,
70                 rescanCh:        make(chan struct{}, 1),
71                 TxIndexFlag:     txIndexFlag,
72         }
73
74         if err := w.LoadWalletInfo(); err != nil {
75                 return nil, err
76         }
77
78         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
79                 return nil, err
80         }
81
82         var err error
83         w.TxMsgSub, err = w.EventDispatcher.Subscribe(protocol.TxMsgEvent{})
84         if err != nil {
85                 return nil, err
86         }
87
88         go w.walletUpdater()
89         go w.delUnconfirmedTx()
90         go w.MemPoolTxQueryLoop()
91         return w, nil
92 }
93
94 // MemPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
95 func (w *Wallet) MemPoolTxQueryLoop() {
96         for {
97                 select {
98                 case obj, ok := <-w.TxMsgSub.Chan():
99                         if !ok {
100                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
101                                 return
102                         }
103
104                         ev, ok := obj.Data.(protocol.TxMsgEvent)
105                         if !ok {
106                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
107                                 continue
108                         }
109
110                         switch ev.TxMsg.MsgType {
111                         case protocol.MsgNewTx:
112                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
113                         case protocol.MsgRemoveTx:
114                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
115                         default:
116                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
117                         }
118                 }
119         }
120 }
121
122 func (w *Wallet) checkWalletInfo() error {
123         if w.Status.Version != currentVersion {
124                 return errWalletVersionMismatch
125         } else if !w.Chain.BlockExist(&w.Status.BestHash) {
126                 return errBestBlockNotFoundInCore
127         }
128
129         return nil
130 }
131
132 //LoadWalletInfo return stored wallet info and nil,
133 //if error, return initial wallet info and err
134 func (w *Wallet) LoadWalletInfo() error {
135         walletStatus, err := w.Store.GetWalletInfo()
136         if walletStatus == nil && err != ErrGetWalletStatusInfo {
137                 return err
138         }
139
140         if walletStatus != nil {
141                 w.Status = *walletStatus
142                 err = w.checkWalletInfo()
143                 if err == nil {
144                         return nil
145                 }
146
147                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
148                 w.Store.DeleteWalletTransactions()
149                 w.Store.DeleteWalletUTXOs()
150         }
151
152         w.Status.Version = currentVersion
153         w.Status.WorkHash = bc.Hash{}
154         block, err := w.Chain.GetBlockByHeight(0)
155         if err != nil {
156                 return err
157         }
158
159         return w.AttachBlock(block)
160 }
161
162 func (w *Wallet) commitWalletInfo(store WalletStore) error {
163         if err := store.SetWalletInfo(&w.Status); err != nil {
164                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
165                 return err
166         }
167         return nil
168 }
169
170 // AttachBlock attach a new block
171 func (w *Wallet) AttachBlock(block *types.Block) error {
172         w.rw.Lock()
173         defer w.rw.Unlock()
174
175         if block.PreviousBlockHash != w.Status.WorkHash {
176                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
177                 return nil
178         }
179
180         blockHash := block.Hash()
181         txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
182         if err != nil {
183                 return err
184         }
185
186         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
187                 log.WithField("err", err).Error("filter recovery txs")
188                 w.RecoveryMgr.finished()
189         }
190
191         annotatedTxs := w.filterAccountTxs(block, txStatus)
192         if err := saveExternalAssetDefinition(block, w.Store); err != nil {
193                 return err
194         }
195
196         w.annotateTxsAccount(annotatedTxs)
197
198         newStore := w.Store.InitBatch()
199         if err := w.indexTransactions(block, txStatus, annotatedTxs, newStore); err != nil {
200                 return err
201         }
202
203         w.attachUtxos(block, txStatus, newStore)
204         w.Status.WorkHeight = block.Height
205         w.Status.WorkHash = block.Hash()
206         if w.Status.WorkHeight >= w.Status.BestHeight {
207                 w.Status.BestHeight = w.Status.WorkHeight
208                 w.Status.BestHash = w.Status.WorkHash
209         }
210
211         if err := w.commitWalletInfo(newStore); err != nil {
212                 return err
213         }
214
215         if err := newStore.CommitBatch(); err != nil {
216                 return err
217         }
218
219         return nil
220 }
221
222 // DetachBlock detach a block and rollback state
223 func (w *Wallet) DetachBlock(block *types.Block) error {
224         w.rw.Lock()
225         defer w.rw.Unlock()
226
227         blockHash := block.Hash()
228         txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
229         if err != nil {
230                 return err
231         }
232
233         newStore := w.Store.InitBatch()
234
235         w.detachUtxos(block, txStatus, newStore)
236         newStore.DeleteTransactions(w.Status.BestHeight)
237
238         w.Status.BestHeight = block.Height - 1
239         w.Status.BestHash = block.PreviousBlockHash
240
241         if w.Status.WorkHeight > w.Status.BestHeight {
242                 w.Status.WorkHeight = w.Status.BestHeight
243                 w.Status.WorkHash = w.Status.BestHash
244         }
245         if err := w.commitWalletInfo(newStore); err != nil {
246                 return err
247         }
248
249         if err := newStore.CommitBatch(); err != nil {
250                 return err
251         }
252
253         return nil
254 }
255
256 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
257 func (w *Wallet) walletUpdater() {
258         for {
259                 w.getRescanNotification()
260                 for !w.Chain.InMainChain(w.Status.BestHash) {
261                         block, err := w.Chain.GetBlockByHash(&w.Status.BestHash)
262                         if err != nil {
263                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
264                                 return
265                         }
266
267                         if err := w.DetachBlock(block); err != nil {
268                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
269                                 return
270                         }
271                 }
272
273                 block, _ := w.Chain.GetBlockByHeight(w.Status.WorkHeight + 1)
274                 if block == nil {
275                         w.walletBlockWaiter()
276                         continue
277                 }
278
279                 if err := w.AttachBlock(block); err != nil {
280                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
281                         return
282                 }
283         }
284 }
285
286 //RescanBlocks provide a trigger to rescan blocks
287 func (w *Wallet) RescanBlocks() {
288         select {
289         case w.rescanCh <- struct{}{}:
290         default:
291                 return
292         }
293 }
294
295 // DeleteAccount deletes account matching accountID, then rescan wallet
296 func (w *Wallet) DeleteAccount(accountID string) (err error) {
297         w.rw.Lock()
298         defer w.rw.Unlock()
299
300         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
301                 return err
302         }
303
304         w.Store.DeleteWalletTransactions()
305         w.RescanBlocks()
306         return nil
307 }
308
309 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
310         w.rw.Lock()
311         defer w.rw.Unlock()
312
313         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
314                 return err
315         }
316
317         w.Store.DeleteWalletTransactions()
318         w.RescanBlocks()
319         return nil
320 }
321
322 func (w *Wallet) getRescanNotification() {
323         select {
324         case <-w.rescanCh:
325                 w.setRescanStatus()
326         default:
327                 return
328         }
329 }
330
331 func (w *Wallet) setRescanStatus() {
332         block, _ := w.Chain.GetBlockByHeight(0)
333         w.Status.WorkHash = bc.Hash{}
334         w.AttachBlock(block)
335 }
336
337 func (w *Wallet) walletBlockWaiter() {
338         select {
339         case <-w.Chain.BlockWaiter(w.Status.WorkHeight + 1):
340         case <-w.rescanCh:
341                 w.setRescanStatus()
342         }
343 }
344
345 // GetWalletStatusInfo return current wallet StatusInfo
346 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
347         w.rw.RLock()
348         defer w.rw.RUnlock()
349
350         return w.Status
351 }