OSDN Git Service

Dev db expandability (#44)
[bytom/vapor.git] / database / sql_store.go
1 package database
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/tmlibs/common"
8
9         dbm "github.com/vapor/database/db"
10         "github.com/vapor/database/orm"
11         "github.com/vapor/database/storage"
12         "github.com/vapor/errors"
13         "github.com/vapor/protocol"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16         "github.com/vapor/protocol/state"
17 )
18
19 const logModuleSQL = "SQLdb"
20
21 func loadBlockSQLStoreStateJSON(db dbm.SQLDB) *protocol.BlockStoreState {
22         bsj := orm.BlockStoreState{
23                 StoreKey: string(blockStoreKey),
24         }
25
26         SQLDB := db.Db()
27         if err := SQLDB.Where(&bsj).First(&bsj).Error; err != nil {
28                 return nil
29         }
30
31         hash := &bc.Hash{}
32         if err := hash.UnmarshalText([]byte(bsj.Hash)); err != nil {
33                 common.PanicCrisis(common.Fmt("Could not unmarshalText bytes: %s", bsj.Hash))
34         }
35
36         return &protocol.BlockStoreState{Height: bsj.Height, Hash: hash}
37 }
38
39 // A SQLStore encapsulates storage for blockchain validation.
40 // It satisfies the interface protocol.Store, and provides additional
41 // methods for querying current data.
42 type SQLStore struct {
43         db    dbm.SQLDB
44         cache blockCache
45 }
46
47 // GetBlockFromSQLDB return the block by given hash
48 func GetBlockFromSQLDB(db dbm.SQLDB, hash *bc.Hash) *types.Block {
49         blockHeader := &orm.BlockHeader{BlockHash: hash.String()}
50         if err := db.Db().Where(blockHeader).Find(blockHeader).Error; err != nil {
51                 return nil
52         }
53
54         txs := []*orm.Transaction{}
55         if err := db.Db().Where(&orm.Transaction{BlockHeaderID: blockHeader.ID}).Order("tx_index asc").Find(&txs).Error; err != nil {
56                 return nil
57         }
58
59         block, err := toBlock(blockHeader, txs)
60         if err != nil {
61                 return nil
62         }
63
64         return block
65 }
66
67 func toBlock(header *orm.BlockHeader, txs []*orm.Transaction) (*types.Block, error) {
68
69         blockHeader, err := header.ToTypesBlockHeader()
70         if err != nil {
71                 return nil, err
72         }
73
74         var transactions []*types.Tx
75
76         for _, tx := range txs {
77                 transaction, err := tx.UnmarshalText()
78                 if err != nil {
79                         return nil, err
80                 }
81                 transactions = append(transactions, transaction)
82         }
83
84         block := &types.Block{
85                 BlockHeader:  *blockHeader,
86                 Transactions: transactions,
87         }
88
89         return block, nil
90 }
91
92 // NewSQLStore creates and returns a new Store object.
93 func NewSQLStore(db dbm.SQLDB) *SQLStore {
94         cache := newBlockCache(func(hash *bc.Hash) *types.Block {
95                 return GetBlockFromSQLDB(db, hash)
96         })
97         return &SQLStore{
98                 db:    db,
99                 cache: cache,
100         }
101 }
102
103 // GetUtxo will search the utxo in db
104 func (s *SQLStore) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
105         return getUtxoFromSQLDB(s.db, hash)
106 }
107
108 // BlockExist check if the block is stored in disk
109 func (s *SQLStore) BlockExist(hash *bc.Hash) bool {
110         block, err := s.cache.lookup(hash)
111         return err == nil && block != nil
112 }
113
114 // GetBlock return the block by given hash
115 func (s *SQLStore) GetBlock(hash *bc.Hash) (*types.Block, error) {
116         return s.cache.lookup(hash)
117 }
118
119 // GetTransactionsUtxo will return all the utxo that related to the input txs
120 func (s *SQLStore) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
121         return getTransactionsUtxoFromSQLDB(s.db, view, txs)
122 }
123
124 // GetTransactionStatus will return the utxo that related to the block hash
125 func (s *SQLStore) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, error) {
126         ts := &bc.TransactionStatus{}
127         query := s.db.Db().Model(&orm.Transaction{}).Joins("join block_headers on block_headers.id = transactions.block_header_id").Where("block_headers.block_hash = ?", hash.String())
128         rows, err := query.Select("transactions.status_fail, block_headers.version").Order("transactions.tx_index asc").Rows()
129         if err != nil {
130                 return nil, err
131         }
132
133         for rows.Next() {
134                 var (
135                         statusFail bool
136                         version    uint64
137                 )
138                 if err := rows.Scan(&statusFail, &version); err != nil {
139                         return nil, err
140                 }
141
142                 ts.Version = version
143                 ts.VerifyStatus = append(ts.VerifyStatus, &bc.TxVerifyResult{StatusFail: statusFail})
144
145         }
146         return ts, nil
147 }
148
149 // GetStoreStatus return the BlockStoreStateJSON
150 func (s *SQLStore) GetStoreStatus() *protocol.BlockStoreState {
151         return loadBlockSQLStoreStateJSON(s.db)
152 }
153
154 func (s *SQLStore) LoadBlockIndex(stateBestHeight uint64) (*state.BlockIndex, error) {
155         startTime := time.Now()
156         blockIndex := state.NewBlockIndex()
157
158         var lastNode *state.BlockNode
159         rows, err := s.db.Db().Model(&orm.BlockHeader{}).Order("height").Rows()
160         if err != nil {
161                 return nil, err
162         }
163         defer rows.Close()
164
165         for rows.Next() {
166                 header := orm.BlockHeader{}
167                 if err := rows.Scan(&header.ID, &header.BlockHash, &header.Height, &header.Version, &header.PreviousBlockHash, &header.Timestamp, &header.TransactionsMerkleRoot, &header.TransactionStatusHash); err != nil {
168                         return nil, err
169                 }
170                 if header.Height > stateBestHeight {
171                         break
172                 }
173
174                 typesBlockHeader, err := header.ToTypesBlockHeader()
175                 if err != nil {
176                         return nil, err
177                 }
178
179                 previousBlockHash := typesBlockHeader.PreviousBlockHash
180
181                 var parent *state.BlockNode
182                 if lastNode == nil || lastNode.Hash == previousBlockHash {
183                         parent = lastNode
184                 } else {
185                         parent = blockIndex.GetNode(&previousBlockHash)
186                 }
187
188                 node, err := state.NewBlockNode(typesBlockHeader, parent)
189                 if err != nil {
190                         return nil, err
191                 }
192
193                 blockIndex.AddNode(node)
194                 lastNode = node
195         }
196
197         log.WithFields(log.Fields{
198                 "module":   logModule,
199                 "height":   stateBestHeight,
200                 "duration": time.Since(startTime),
201         }).Debug("initialize load history block index from database")
202         return blockIndex, nil
203 }
204
205 // SaveBlock persists a new block in the protocol.
206 func (s *SQLStore) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error {
207         startTime := time.Now()
208
209         blockHash := block.Hash()
210         SQLDB := s.db.Db()
211         tx := SQLDB.Begin()
212
213         // Save block header details
214         blockHeader := &orm.BlockHeader{
215                 Height:                 block.Height,
216                 BlockHash:              blockHash.String(),
217                 Version:                block.Version,
218                 PreviousBlockHash:      block.PreviousBlockHash.String(),
219                 Timestamp:              block.Timestamp,
220                 TransactionsMerkleRoot: block.TransactionsMerkleRoot.String(),
221                 TransactionStatusHash:  block.TransactionStatusHash.String(),
222         }
223
224         if err := tx.Create(blockHeader).Error; err != nil {
225                 tx.Rollback()
226                 return err
227         }
228
229         // Save tx
230         for index, transaction := range block.Transactions {
231                 rawTx, err := transaction.MarshalText()
232                 if err != nil {
233                         return err
234                 }
235                 ormTransaction := &orm.Transaction{
236                         BlockHeaderID: blockHeader.ID,
237                         TxIndex:       uint64(index),
238                         RawData:       string(rawTx),
239                         StatusFail:    ts.VerifyStatus[index].StatusFail,
240                 }
241                 if err := tx.Create(ormTransaction).Error; err != nil {
242                         tx.Rollback()
243                         return err
244                 }
245         }
246
247         if err := tx.Commit().Error; err != nil {
248                 tx.Rollback()
249                 return errors.Wrap(err, "commit transaction")
250         }
251
252         log.WithFields(log.Fields{
253                 "module":   logModule,
254                 "height":   block.Height,
255                 "hash":     blockHash.String(),
256                 "duration": time.Since(startTime),
257         }).Info("block saved on disk")
258         return nil
259 }
260
261 // SaveChainStatus save the core's newest status && delete old status
262 func (s *SQLStore) SaveChainStatus(node *state.BlockNode, view *state.UtxoViewpoint) error {
263         SQLDB := s.db.Db()
264         tx := SQLDB.Begin()
265
266         if err := saveUtxoViewToSQLDB(tx, view); err != nil {
267                 tx.Rollback()
268                 return err
269         }
270
271         state := &orm.BlockStoreState{
272                 StoreKey: string(blockStoreKey),
273                 Height:   node.Height,
274                 Hash:     node.Hash.String(),
275         }
276
277         db := tx.Model(&orm.BlockStoreState{}).Update(state)
278
279         if err := db.Error; err != nil {
280                 tx.Rollback()
281                 return err
282         }
283
284         if db.RowsAffected == 0 {
285                 if err := tx.Save(state).Error; err != nil {
286                         tx.Rollback()
287                         return err
288                 }
289         }
290
291         if err := tx.Commit().Error; err != nil {
292                 tx.Rollback()
293                 return errors.Wrap(err, "commit transaction")
294         }
295
296         return nil
297 }
298
299 func (s *SQLStore) IsWithdrawSpent(hash *bc.Hash) bool {
300         data := &orm.ClaimTxState{
301                 TxHash: hash.String(),
302         }
303         count := 0
304         if err := s.db.Db().Where(data).First(data).Count(&count).Error; err != nil {
305                 return false
306         }
307
308         return count > 0
309 }
310
311 func (s *SQLStore) SetWithdrawSpent(hash *bc.Hash) error {
312         data := &orm.ClaimTxState{
313                 TxHash: hash.String(),
314         }
315         if err := s.db.Db().Create(data).Error; err != nil {
316                 return err
317         }
318         return nil
319 }