OSDN Git Service

writer close
[bytom/vapor.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/account"
9         "github.com/vapor/asset"
10         "github.com/vapor/blockchain/pseudohsm"
11         "github.com/vapor/errors"
12         "github.com/vapor/event"
13         "github.com/vapor/protocol"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16 )
17
18 const (
19         //SINGLE single sign
20         SINGLE    = 1
21         logModule = "wallet"
22 )
23
24 var (
25         currentVersion = uint(1)
26
27         errBestBlockNotFoundInCore = errors.New("best block not found in core")
28         errWalletVersionMismatch   = errors.New("wallet version mismatch")
29         ErrGetWalletStatusInfo     = errors.New("failed get wallet info")
30         ErrGetAsset                = errors.New("Failed to find asset definition")
31         ErrAccntTxIDNotFound       = errors.New("account TXID not found")
32         ErrGetStandardUTXO         = errors.New("failed get standard UTXO")
33 )
34
35 //StatusInfo is base valid block info to handle orphan block rollback
36 type StatusInfo struct {
37         Version    uint
38         WorkHeight uint64
39         WorkHash   bc.Hash
40         BestHeight uint64
41         BestHash   bc.Hash
42 }
43
44 //Wallet is related to storing account unspent outputs
45 type Wallet struct {
46         Store           WalletStore
47         rw              sync.RWMutex
48         Status          StatusInfo
49         TxIndexFlag     bool
50         AccountMgr      *account.Manager
51         AssetReg        *asset.Registry
52         Hsm             *pseudohsm.HSM
53         Chain           *protocol.Chain
54         RecoveryMgr     *recoveryManager
55         EventDispatcher *event.Dispatcher
56         TxMsgSub        *event.Subscription
57
58         rescanCh chan struct{}
59 }
60
61 //NewWallet return a new wallet instance
62 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
63         w := &Wallet{
64                 Store:           store,
65                 AccountMgr:      account,
66                 AssetReg:        asset,
67                 Chain:           chain,
68                 Hsm:             hsm,
69                 RecoveryMgr:     NewRecoveryManager(store, account),
70                 EventDispatcher: dispatcher,
71                 rescanCh:        make(chan struct{}, 1),
72                 TxIndexFlag:     txIndexFlag,
73         }
74
75         if err := w.LoadWalletInfo(); err != nil {
76                 return nil, err
77         }
78
79         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
80                 return nil, err
81         }
82
83         var err error
84         w.TxMsgSub, err = w.EventDispatcher.Subscribe(protocol.TxMsgEvent{})
85         if err != nil {
86                 return nil, err
87         }
88
89         go w.walletUpdater()
90         go w.delUnconfirmedTx()
91         go w.MemPoolTxQueryLoop()
92         return w, nil
93 }
94
95 // MemPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
96 func (w *Wallet) MemPoolTxQueryLoop() {
97         for {
98                 select {
99                 case obj, ok := <-w.TxMsgSub.Chan():
100                         if !ok {
101                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
102                                 return
103                         }
104
105                         ev, ok := obj.Data.(protocol.TxMsgEvent)
106                         if !ok {
107                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
108                                 continue
109                         }
110
111                         switch ev.TxMsg.MsgType {
112                         case protocol.MsgNewTx:
113                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
114                         case protocol.MsgRemoveTx:
115                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
116                         default:
117                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
118                         }
119                 }
120         }
121 }
122
123 func (w *Wallet) checkWalletInfo() error {
124         if w.Status.Version != currentVersion {
125                 return errWalletVersionMismatch
126         } else if !w.Chain.BlockExist(&w.Status.BestHash) {
127                 return errBestBlockNotFoundInCore
128         }
129
130         return nil
131 }
132
133 //LoadWalletInfo return stored wallet info and nil,
134 //if error, return initial wallet info and err
135 func (w *Wallet) LoadWalletInfo() error {
136         walletStatus, err := w.Store.GetWalletInfo()
137         if walletStatus == nil && err != ErrGetWalletStatusInfo {
138                 return err
139         }
140
141         if walletStatus != nil {
142                 w.Status = *walletStatus
143                 err = w.checkWalletInfo()
144                 if err == nil {
145                         return nil
146                 }
147
148                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
149                 w.Store.DeleteWalletTransactions()
150                 w.Store.DeleteWalletUTXOs()
151         }
152
153         w.Status.Version = currentVersion
154         w.Status.WorkHash = bc.Hash{}
155         block, err := w.Chain.GetBlockByHeight(0)
156         if err != nil {
157                 return err
158         }
159
160         return w.AttachBlock(block)
161 }
162
163 func (w *Wallet) commitWalletInfo(store WalletStore) error {
164         if err := store.SetWalletInfo(&w.Status); err != nil {
165                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
166                 return err
167         }
168         return nil
169 }
170
171 // AttachBlock attach a new block
172 func (w *Wallet) AttachBlock(block *types.Block) error {
173         w.rw.Lock()
174         defer w.rw.Unlock()
175
176         if block.PreviousBlockHash != w.Status.WorkHash {
177                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
178                 return nil
179         }
180
181         blockHash := block.Hash()
182         txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
183         if err != nil {
184                 return err
185         }
186
187         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
188                 log.WithField("err", err).Error("filter recovery txs")
189                 w.RecoveryMgr.finished()
190         }
191
192         annotatedTxs := w.filterAccountTxs(block, txStatus)
193         if err := saveExternalAssetDefinition(block, w.Store); err != nil {
194                 return err
195         }
196
197         w.annotateTxsAccount(annotatedTxs)
198
199         newStore := w.Store.InitBatch()
200         if err := w.indexTransactions(block, txStatus, annotatedTxs, newStore); err != nil {
201                 return err
202         }
203
204         w.attachUtxos(block, txStatus, newStore)
205         w.Status.WorkHeight = block.Height
206         w.Status.WorkHash = block.Hash()
207         if w.Status.WorkHeight >= w.Status.BestHeight {
208                 w.Status.BestHeight = w.Status.WorkHeight
209                 w.Status.BestHash = w.Status.WorkHash
210         }
211
212         if err := w.commitWalletInfo(newStore); err != nil {
213                 return err
214         }
215
216         if err := newStore.CommitBatch(); err != nil {
217                 return err
218         }
219
220         return nil
221 }
222
223 // DetachBlock detach a block and rollback state
224 func (w *Wallet) DetachBlock(block *types.Block) error {
225         w.rw.Lock()
226         defer w.rw.Unlock()
227
228         blockHash := block.Hash()
229         txStatus, err := w.Chain.GetTransactionStatus(&blockHash)
230         if err != nil {
231                 return err
232         }
233
234         newStore := w.Store.InitBatch()
235
236         w.detachUtxos(block, txStatus, newStore)
237         newStore.DeleteTransactions(w.Status.BestHeight)
238
239         w.Status.BestHeight = block.Height - 1
240         w.Status.BestHash = block.PreviousBlockHash
241
242         if w.Status.WorkHeight > w.Status.BestHeight {
243                 w.Status.WorkHeight = w.Status.BestHeight
244                 w.Status.WorkHash = w.Status.BestHash
245         }
246         if err := w.commitWalletInfo(newStore); err != nil {
247                 return err
248         }
249
250         if err := newStore.CommitBatch(); err != nil {
251                 return err
252         }
253
254         return nil
255 }
256
257 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
258 func (w *Wallet) walletUpdater() {
259         for {
260                 w.getRescanNotification()
261                 for !w.Chain.InMainChain(w.Status.BestHash) {
262                         block, err := w.Chain.GetBlockByHash(&w.Status.BestHash)
263                         if err != nil {
264                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
265                                 return
266                         }
267
268                         if err := w.DetachBlock(block); err != nil {
269                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
270                                 return
271                         }
272                 }
273
274                 block, _ := w.Chain.GetBlockByHeight(w.Status.WorkHeight + 1)
275                 if block == nil {
276                         w.walletBlockWaiter()
277                         continue
278                 }
279
280                 if err := w.AttachBlock(block); err != nil {
281                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
282                         return
283                 }
284         }
285 }
286
287 //RescanBlocks provide a trigger to rescan blocks
288 func (w *Wallet) RescanBlocks() {
289         select {
290         case w.rescanCh <- struct{}{}:
291         default:
292                 return
293         }
294 }
295
296 // DeleteAccount deletes account matching accountID, then rescan wallet
297 func (w *Wallet) DeleteAccount(accountID string) (err error) {
298         w.rw.Lock()
299         defer w.rw.Unlock()
300
301         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
302                 return err
303         }
304
305         w.Store.DeleteWalletTransactions()
306         w.RescanBlocks()
307         return nil
308 }
309
310 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
311         w.rw.Lock()
312         defer w.rw.Unlock()
313
314         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
315                 return err
316         }
317
318         w.Store.DeleteWalletTransactions()
319         w.RescanBlocks()
320         return nil
321 }
322
323 func (w *Wallet) getRescanNotification() {
324         select {
325         case <-w.rescanCh:
326                 w.setRescanStatus()
327         default:
328                 return
329         }
330 }
331
332 func (w *Wallet) setRescanStatus() {
333         block, _ := w.Chain.GetBlockByHeight(0)
334         w.Status.WorkHash = bc.Hash{}
335         w.AttachBlock(block)
336 }
337
338 func (w *Wallet) walletBlockWaiter() {
339         select {
340         case <-w.Chain.BlockWaiter(w.Status.WorkHeight + 1):
341         case <-w.rescanCh:
342                 w.setRescanStatus()
343         }
344 }
345
346 // GetWalletStatusInfo return current wallet StatusInfo
347 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
348         w.rw.RLock()
349         defer w.rw.RUnlock()
350
351         return w.Status
352 }