OSDN Git Service

update GetUTXOs
[bytom/vapor.git] / wallet / wallet.go
1 package wallet
2
3 import (
4         "encoding/json"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/vapor/account"
10         "github.com/vapor/asset"
11         "github.com/vapor/blockchain/pseudohsm"
12         "github.com/vapor/errors"
13         "github.com/vapor/event"
14         "github.com/vapor/protocol"
15         "github.com/vapor/protocol/bc"
16         "github.com/vapor/protocol/bc/types"
17 )
18
19 const (
20         //SINGLE single sign
21         SINGLE    = 1
22         logModule = "wallet"
23 )
24
25 var (
26         currentVersion = uint(1)
27
28         errBestBlockNotFoundInCore = errors.New("best block not found in core")
29         errWalletVersionMismatch   = errors.New("wallet version mismatch")
30 )
31
32 //StatusInfo is base valid block info to handle orphan block rollback
33 type StatusInfo struct {
34         Version    uint
35         WorkHeight uint64
36         WorkHash   bc.Hash
37         BestHeight uint64
38         BestHash   bc.Hash
39 }
40
41 //Wallet is related to storing account unspent outputs
42 type Wallet struct {
43         store           WalletStorer
44         rw              sync.RWMutex
45         status          StatusInfo
46         TxIndexFlag     bool
47         AccountMgr      *account.Manager
48         AssetReg        *asset.Registry
49         Hsm             *pseudohsm.HSM
50         chain           *protocol.Chain
51         RecoveryMgr     *recoveryManager
52         eventDispatcher *event.Dispatcher
53         txMsgSub        *event.Subscription
54
55         rescanCh chan struct{}
56 }
57
58 //NewWallet return a new wallet instance
59 func NewWallet(store WalletStorer, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher, txIndexFlag bool) (*Wallet, error) {
60         w := &Wallet{
61                 store:           store,
62                 AccountMgr:      account,
63                 AssetReg:        asset,
64                 chain:           chain,
65                 Hsm:             hsm,
66                 RecoveryMgr:     newRecoveryManager(store, account),
67                 eventDispatcher: dispatcher,
68                 rescanCh:        make(chan struct{}, 1),
69                 TxIndexFlag:     txIndexFlag,
70         }
71
72         if err := w.loadWalletInfo(); err != nil {
73                 return nil, err
74         }
75
76         if err := w.RecoveryMgr.LoadStatusInfo(); err != nil {
77                 return nil, err
78         }
79
80         var err error
81         w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
82         if err != nil {
83                 return nil, err
84         }
85
86         go w.walletUpdater()
87         go w.delUnconfirmedTx()
88         go w.memPoolTxQueryLoop()
89         return w, nil
90 }
91
92 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
93 func (w *Wallet) memPoolTxQueryLoop() {
94         for {
95                 select {
96                 case obj, ok := <-w.txMsgSub.Chan():
97                         if !ok {
98                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
99                                 return
100                         }
101
102                         ev, ok := obj.Data.(protocol.TxMsgEvent)
103                         if !ok {
104                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
105                                 continue
106                         }
107
108                         switch ev.TxMsg.MsgType {
109                         case protocol.MsgNewTx:
110                                 w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
111                         case protocol.MsgRemoveTx:
112                                 w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
113                         default:
114                                 log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
115                         }
116                 }
117         }
118 }
119
120 func (w *Wallet) checkWalletInfo() error {
121         if w.status.Version != currentVersion {
122                 return errWalletVersionMismatch
123         } else if !w.chain.BlockExist(&w.status.BestHash) {
124                 return errBestBlockNotFoundInCore
125         }
126
127         return nil
128 }
129
130 //loadWalletInfo return stored wallet info and nil,
131 //if error, return initial wallet info and err
132 func (w *Wallet) loadWalletInfo() error {
133         if rawWallet := w.store.GetWalletInfo(); rawWallet != nil {
134                 if err := json.Unmarshal(rawWallet, &w.status); err != nil {
135                         return err
136                 }
137
138                 err := w.checkWalletInfo()
139                 if err == nil {
140                         return nil
141                 }
142
143                 log.WithFields(log.Fields{"module": logModule}).Warn(err.Error())
144                 w.store.DeleteWalletTransactions()
145                 w.store.DeleteWalletUTXOs()
146         }
147
148         w.status.Version = currentVersion
149         w.status.WorkHash = bc.Hash{}
150         block, err := w.chain.GetBlockByHeight(0)
151         if err != nil {
152                 return err
153         }
154         return w.AttachBlock(block)
155 }
156
157 func (w *Wallet) commitWalletInfo() error {
158         rawWallet, err := json.Marshal(w.status)
159         if err != nil {
160                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save wallet info")
161                 return err
162         }
163
164         w.store.SetWalletInfo(rawWallet)
165         return nil
166 }
167
168 // AttachBlock attach a new block
169 func (w *Wallet) AttachBlock(block *types.Block) error {
170         w.rw.Lock()
171         defer w.rw.Unlock()
172
173         if block.PreviousBlockHash != w.status.WorkHash {
174                 log.Warn("wallet skip attachBlock due to status hash not equal to previous hash")
175                 return nil
176         }
177
178         blockHash := block.Hash()
179         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
180         if err != nil {
181                 return err
182         }
183
184         if err := w.RecoveryMgr.FilterRecoveryTxs(block); err != nil {
185                 log.WithField("err", err).Error("filter recovery txs")
186                 w.RecoveryMgr.finished()
187         }
188
189         annotatedTxs := w.filterAccountTxs(block, txStatus)
190         saveExternalAssetDefinition(block, w.store)
191         annotateTxsAccount(annotatedTxs, w.store)
192
193         w.store.InitBatch()
194         defer w.store.CommitBatch()
195
196         if err := w.indexTransactions(block, txStatus, annotatedTxs); err != nil {
197                 return err
198         }
199
200         w.attachUtxos(block, txStatus)
201         w.status.WorkHeight = block.Height
202         w.status.WorkHash = block.Hash()
203         if w.status.WorkHeight >= w.status.BestHeight {
204                 w.status.BestHeight = w.status.WorkHeight
205                 w.status.BestHash = w.status.WorkHash
206         }
207
208         return w.commitWalletInfo()
209 }
210
211 // DetachBlock detach a block and rollback state
212 func (w *Wallet) DetachBlock(block *types.Block) error {
213         w.rw.Lock()
214         defer w.rw.Unlock()
215
216         blockHash := block.Hash()
217         txStatus, err := w.chain.GetTransactionStatus(&blockHash)
218         if err != nil {
219                 return err
220         }
221
222         w.store.InitBatch()
223         defer w.store.CommitBatch()
224
225         w.detachUtxos(block, txStatus)
226         w.store.DeleteTransactions(w.status.BestHeight)
227
228         w.status.BestHeight = block.Height - 1
229         w.status.BestHash = block.PreviousBlockHash
230
231         if w.status.WorkHeight > w.status.BestHeight {
232                 w.status.WorkHeight = w.status.BestHeight
233                 w.status.WorkHash = w.status.BestHash
234         }
235
236         return w.commitWalletInfo()
237 }
238
239 //WalletUpdate process every valid block and reverse every invalid block which need to rollback
240 func (w *Wallet) walletUpdater() {
241         for {
242                 w.getRescanNotification()
243                 for !w.chain.InMainChain(w.status.BestHash) {
244                         block, err := w.chain.GetBlockByHash(&w.status.BestHash)
245                         if err != nil {
246                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater GetBlockByHash")
247                                 return
248                         }
249
250                         if err := w.DetachBlock(block); err != nil {
251                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater detachBlock stop")
252                                 return
253                         }
254                 }
255
256                 block, _ := w.chain.GetBlockByHeight(w.status.WorkHeight + 1)
257                 if block == nil {
258                         w.walletBlockWaiter()
259                         continue
260                 }
261
262                 if err := w.AttachBlock(block); err != nil {
263                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("walletUpdater AttachBlock stop")
264                         return
265                 }
266         }
267 }
268
269 //RescanBlocks provide a trigger to rescan blocks
270 func (w *Wallet) RescanBlocks() {
271         select {
272         case w.rescanCh <- struct{}{}:
273         default:
274                 return
275         }
276 }
277
278 // DeleteAccount deletes account matching accountID, then rescan wallet
279 func (w *Wallet) DeleteAccount(accountID string) (err error) {
280         w.rw.Lock()
281         defer w.rw.Unlock()
282
283         if err := w.AccountMgr.DeleteAccount(accountID); err != nil {
284                 return err
285         }
286
287         w.store.DeleteWalletTransactions()
288         w.RescanBlocks()
289         return nil
290 }
291
292 func (w *Wallet) UpdateAccountAlias(accountID string, newAlias string) (err error) {
293         w.rw.Lock()
294         defer w.rw.Unlock()
295
296         if err := w.AccountMgr.UpdateAccountAlias(accountID, newAlias); err != nil {
297                 return err
298         }
299
300         w.store.DeleteWalletTransactions()
301         w.RescanBlocks()
302         return nil
303 }
304
305 func (w *Wallet) getRescanNotification() {
306         select {
307         case <-w.rescanCh:
308                 w.setRescanStatus()
309         default:
310                 return
311         }
312 }
313
314 func (w *Wallet) setRescanStatus() {
315         block, _ := w.chain.GetBlockByHeight(0)
316         w.status.WorkHash = bc.Hash{}
317         w.AttachBlock(block)
318 }
319
320 func (w *Wallet) walletBlockWaiter() {
321         select {
322         case <-w.chain.BlockWaiter(w.status.WorkHeight + 1):
323         case <-w.rescanCh:
324                 w.setRescanStatus()
325         }
326 }
327
328 // GetWalletStatusInfo return current wallet StatusInfo
329 func (w *Wallet) GetWalletStatusInfo() StatusInfo {
330         w.rw.RLock()
331         defer w.rw.RUnlock()
332
333         return w.status
334 }