OSDN Git Service

update
[bytom/vapor.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "encoding/json"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/vapor/account"
10         "github.com/vapor/asset"
11         "github.com/vapor/blockchain/pseudohsm"
12         "github.com/vapor/errors"
13         "github.com/vapor/event"
14         "github.com/vapor/protocol"
15         "github.com/vapor/protocol/bc"
16         "github.com/vapor/protocol/bc/types"
17 )
18
19 const (
20         //SINGLE single sign
21         SINGLE    = 1
22         logModule = "wallet"
23 )
24
25 var (
26         currentVersion = uint(1)
27
28         errBestBlockNotFoundInCore = errors.New("best block not found in core")
29         errWalletVersionMismatch   = errors.New("wallet version mismatch")
30 )
31
32 //StatusInfo is base valid block info to handle orphan block rollback
33 type StatusInfo struct {
34         Version    uint
35         WorkHeight uint64
36         WorkHash   bc.Hash
37         BestHeight uint64
38         BestHash   bc.Hash
39 }
40
41 //Wallet is related to storing account unspent outputs
42 type Wallet struct {
43         store           WalletStore
44         rw              sync.RWMutex
45         status          StatusInfo
46         TxIndexFlag     bool
47         AccountMgr      *account.Manager
48         AssetReg        *asset.Registry
49         Hsm             *pseudohsm.HSM
50         chain           *protocol.Chain
51         RecoveryMgr     *recoveryManager
52         eventDispatcher *event.Dispatcher
53         txMsgSub        *event.Subscription
54
55         rescanCh chan struct{}
56 }
57
58 //NewWallet return a new wallet instance
59 func NewWallet(store WalletStore, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
60         w := &Wallet{
61                 store:           store,
62                 AccountMgr:      account,
63                 AssetReg:        asset,
64                 chain:           chain,
65                 Hsm:             hsm,
66                 RecoveryMgr:     newRecoveryManager(store, account),
67                 eventDispatcher: dispatcher,
68                 rescanCh:        make(chan struct{}, 1),
69                 TxIndexFlag:     txIndexFlag,
70         }
71
72         if err := w.loadWalletInfo(); err != nil {
73                 return nil, err
74         }
75
76         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
77                 return nil, err
78         }
79
80         var err error
81         w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
82         if err != nil {
83                 return nil, err
84         }
85
86         go w.walletUpdater()
87         go w.delUnconfirmedTx()
88         go w.memPoolTxQueryLoop()
89         return w, nil
90 }
91
92 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
93 func (w *Wallet) memPoolTxQueryLoop() {
94         for {
95                 select {
96                 case obj, ok := <-w.txMsgSub.Chan():
97                         if !ok {
98                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
99                                 return
100                         }
101
102                         ev, ok := obj.Data.(protocol.TxMsgEvent)
103                         if !ok {
104                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
105                                 continue
106                         }
107
108                         switch ev.TxMsg.MsgType {
109                         case protocol.MsgNewTx:
110                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
111                         case protocol.MsgRemoveTx:
112                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
113                         default:
114                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
115                         }
116                 }
117         }
118 }
119
120 func (w *Wallet) checkWalletInfo() error {
121         if w.status.Version != currentVersion {
122                 return errWalletVersionMismatch
123         } else if !w.chain.BlockExist(&w.status.BestHash) {
124                 return errBestBlockNotFoundInCore
125         }
126
127         return nil
128 }
129
130 //loadWalletInfo return stored wallet info and nil,
131 //if error, return initial wallet info and err
132 func (w *Wallet) loadWalletInfo() error {
133         if rawWallet := w.store.GetWalletInfo(); rawWallet != nil {
134                 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
135                         return err
136                 }
137
138                 err := w.checkWalletInfo()
139                 if err == nil {
140                         return nil
141                 }
142
143                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
144                 w.store.DeleteWalletTransactions()
145                 w.store.DeleteWalletUTXOs()
146         }
147
148         w.status.Version = currentVersion
149         w.status.WorkHash = bc.Hash{}
150         block, err := w.chain.GetBlockByHeight(0)
151         if err != nil {
152                 return err
153         }
154         return w.AttachBlock(block)
155 }
156
157 func (w *Wallet) commitWalletInfo() error {
158         rawWallet, err := json.Marshal(w.status)
159         if err != nil {
160                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
161                 return err
162         }
163
164         w.store.SetWalletInfo(rawWallet)
165         return nil
166 }
167
168 // AttachBlock attach a new block
169 func (w *Wallet) AttachBlock(block *types.Block) error {
170         w.rw.Lock()
171         defer w.rw.Unlock()
172
173         if block.PreviousBlockHash != w.status.WorkHash {
174                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
175                 return nil
176         }
177
178         blockHash := block.Hash()
179         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
180         if err != nil {
181                 return err
182         }
183
184         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
185                 log.WithField("err", err).Error("filter recovery txs")
186                 w.RecoveryMgr.finished()
187         }
188
189         annotatedTxs := w.filterAccountTxs(block, txStatus)
190         if err := saveExternalAssetDefinition(block, w.store); err != nil {
191                 return err
192         }
193         w.annotateTxsAccount(annotatedTxs)
194
195         if err := w.store.InitBatch(); err != nil {
196                 return err
197         }
198
199         if err := w.indexTransactions(block, txStatus, annotatedTxs); err != nil {
200                 return err
201         }
202
203         w.attachUtxos(block, txStatus)
204         w.status.WorkHeight = block.Height
205         w.status.WorkHash = block.Hash()
206         if w.status.WorkHeight >= w.status.BestHeight {
207                 w.status.BestHeight = w.status.WorkHeight
208                 w.status.BestHash = w.status.WorkHash
209         }
210
211         if err := w.commitWalletInfo(); err != nil {
212                 return err
213         }
214
215         if err := w.store.CommitBatch(); err != nil {
216                 return err
217         }
218
219         return nil
220 }
221
222 // DetachBlock detach a block and rollback state
223 func (w *Wallet) DetachBlock(block *types.Block) error {
224         w.rw.Lock()
225         defer w.rw.Unlock()
226
227         blockHash := block.Hash()
228         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
229         if err != nil {
230                 return err
231         }
232
233         if err := w.store.InitBatch(); err != nil {
234                 return err
235         }
236
237         w.detachUtxos(block, txStatus)
238         w.store.DeleteTransactions(w.status.BestHeight)
239
240         w.status.BestHeight = block.Height - 1
241         w.status.BestHash = block.PreviousBlockHash
242
243         if w.status.WorkHeight > w.status.BestHeight {
244                 w.status.WorkHeight = w.status.BestHeight
245                 w.status.WorkHash = w.status.BestHash
246         }
247         if err := w.commitWalletInfo(); err != nil {
248                 return err
249         }
250
251         if err := w.store.CommitBatch(); err != nil {
252                 return err
253         }
254
255         return nil
256 }
257
258 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
259 func (w *Wallet) walletUpdater() {
260         for {
261                 w.getRescanNotification()
262                 for !w.chain.InMainChain(w.status.BestHash) {
263                         block, err := w.chain.GetBlockByHash(&w.status.BestHash)
264                         if err != nil {
265                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
266                                 return
267                         }
268
269                         if err := w.DetachBlock(block); err != nil {
270                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
271                                 return
272                         }
273                 }
274
275                 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
276                 if block == nil {
277                         w.walletBlockWaiter()
278                         continue
279                 }
280
281                 if err := w.AttachBlock(block); err != nil {
282                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
283                         return
284                 }
285         }
286 }
287
288 //RescanBlocks provide a trigger to rescan blocks
289 func (w *Wallet) RescanBlocks() {
290         select {
291         case w.rescanCh <- struct{}{}:
292         default:
293                 return
294         }
295 }
296
297 // DeleteAccount deletes account matching accountID, then rescan wallet
298 func (w *Wallet) DeleteAccount(accountID string) (err error) {
299         w.rw.Lock()
300         defer w.rw.Unlock()
301
302         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
303                 return err
304         }
305
306         w.store.DeleteWalletTransactions()
307         w.RescanBlocks()
308         return nil
309 }
310
311 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
312         w.rw.Lock()
313         defer w.rw.Unlock()
314
315         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
316                 return err
317         }
318
319         w.store.DeleteWalletTransactions()
320         w.RescanBlocks()
321         return nil
322 }
323
324 func (w *Wallet) getRescanNotification() {
325         select {
326         case <-w.rescanCh:
327                 w.setRescanStatus()
328         default:
329                 return
330         }
331 }
332
333 func (w *Wallet) setRescanStatus() {
334         block, _ := w.chain.GetBlockByHeight(0)
335         w.status.WorkHash = bc.Hash{}
336         w.AttachBlock(block)
337 }
338
339 func (w *Wallet) walletBlockWaiter() {
340         select {
341         case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
342         case <-w.rescanCh:
343                 w.setRescanStatus()
344         }
345 }
346
347 // GetWalletStatusInfo return current wallet StatusInfo
348 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
349         w.rw.RLock()
350         defer w.rw.RUnlock()
351
352         return w.status
353 }