OSDN Git Service

Mov late node sync test (#529)
[bytom/vapor.git] / application / mov / database / mov_store.go
1 package database
2
3 import (
4         "encoding/binary"
5         "encoding/json"
6         "errors"
7         "math"
8
9         "github.com/bytom/vapor/application/mov/common"
10         dbm "github.com/bytom/vapor/database/leveldb"
11         "github.com/bytom/vapor/protocol/bc"
12         "github.com/bytom/vapor/protocol/bc/types"
13 )
14
15 // ErrNotInitDBState represent the database state of mov store is not initialized
16 var ErrNotInitDBState = errors.New("database state of mov store is not initialized")
17
18 // MovStore is the interface for mov's persistent storage
19 type MovStore interface {
20         Clear()
21         GetMovDatabaseState() (*common.MovDatabaseState, error)
22         InitDBState(height uint64, hash *bc.Hash) error
23         ListOrders(orderAfter *common.Order) ([]*common.Order, error)
24         ListTradePairsWithStart(fromAssetIDAfter, toAssetIDAfter *bc.AssetID) ([]*common.TradePair, error)
25         ProcessOrders(addOrders []*common.Order, delOrders []*common.Order, blockHeader *types.BlockHeader) error
26 }
27
28 const (
29         order byte = iota + 1
30         tradePair
31         matchStatus
32
33         fromAssetIDPos = 0
34         toAssetIDPos   = 1
35         assetIDLen     = 32
36         rateByteLen    = 8
37
38         tradePairsNum = 32
39         ordersNum     = 128
40 )
41
42 var (
43         movStore         = []byte("MOV:")
44         ordersPrefix     = append(movStore, order)
45         tradePairsPrefix = append(movStore, tradePair)
46         bestMatchStore   = append(movStore, matchStatus)
47 )
48
49 type orderData struct {
50         Utxo             *common.MovUtxo
51         RatioNumerator   int64
52         RatioDenominator int64
53 }
54
55 func calcOrderKey(fromAssetID, toAssetID *bc.AssetID, utxoHash *bc.Hash, rate float64) []byte {
56         buf := make([]byte, 8)
57         binary.BigEndian.PutUint64(buf, math.Float64bits(rate))
58         key := append(ordersPrefix, fromAssetID.Bytes()...)
59         key = append(key, toAssetID.Bytes()...)
60         key = append(key, buf...)
61         return append(key, utxoHash.Bytes()...)
62 }
63
64 func calcTradePairKey(fromAssetID, toAssetID *bc.AssetID) []byte {
65         key := append(tradePairsPrefix, fromAssetID.Bytes()...)
66         return append(key, toAssetID.Bytes()...)
67 }
68
69 func getAssetIDFromTradePairKey(key []byte, posIndex int) *bc.AssetID {
70         b := [32]byte{}
71         pos := len(tradePairsPrefix) + assetIDLen*posIndex
72         copy(b[:], key[pos:pos+assetIDLen])
73         assetID := bc.NewAssetID(b)
74         return &assetID
75 }
76
77 func getRateFromOrderKey(key []byte) float64 {
78         ratePos := len(ordersPrefix) + assetIDLen*2
79         return math.Float64frombits(binary.BigEndian.Uint64(key[ratePos : ratePos+rateByteLen]))
80 }
81
82 type tradePairData struct {
83         Count int
84 }
85
86 // LevelDBMovStore is the LevelDB implementation for MovStore
87 type LevelDBMovStore struct {
88         db dbm.DB
89 }
90
91 // NewLevelDBMovStore create a new LevelDBMovStore object
92 func NewLevelDBMovStore(db dbm.DB) *LevelDBMovStore {
93         return &LevelDBMovStore{db: db}
94 }
95
96 // Clear will clear all the data of store
97 func (m *LevelDBMovStore) Clear() {
98         batch := m.db.NewBatch()
99
100         iter := m.db.Iterator()
101         defer iter.Release()
102
103         for iter.Next() {
104                 batch.Delete(iter.Key())
105         }
106         batch.Write()
107 }
108
109 // GetMovDatabaseState return the current DB's image status
110 func (m *LevelDBMovStore) GetMovDatabaseState() (*common.MovDatabaseState, error) {
111         if value := m.db.Get(bestMatchStore); value != nil {
112                 state := &common.MovDatabaseState{}
113                 return state, json.Unmarshal(value, state)
114         }
115
116         return nil, ErrNotInitDBState
117 }
118
119 // InitDBState set the DB's image status
120 func (m *LevelDBMovStore) InitDBState(height uint64, hash *bc.Hash) error {
121         state := &common.MovDatabaseState{Height: height, Hash: hash}
122         value, err := json.Marshal(state)
123         if err != nil {
124                 return err
125         }
126
127         m.db.Set(bestMatchStore, value)
128         return nil
129 }
130
131 // ListOrders return n orders after the input order
132 func (m *LevelDBMovStore) ListOrders(orderAfter *common.Order) ([]*common.Order, error) {
133         if orderAfter.FromAssetID == nil || orderAfter.ToAssetID == nil {
134                 return nil, errors.New("assetID is nil")
135         }
136
137         orderPrefix := append(ordersPrefix, orderAfter.FromAssetID.Bytes()...)
138         orderPrefix = append(orderPrefix, orderAfter.ToAssetID.Bytes()...)
139
140         var startKey []byte
141         if orderAfter.Rate() > 0 {
142                 startKey = calcOrderKey(orderAfter.FromAssetID, orderAfter.ToAssetID, orderAfter.UTXOHash(), orderAfter.Rate())
143         }
144
145         itr := m.db.IteratorPrefixWithStart(orderPrefix, startKey, false)
146         defer itr.Release()
147
148         var orders []*common.Order
149         for txNum := 0; txNum < ordersNum && itr.Next(); txNum++ {
150                 orderData := &orderData{}
151                 if err := json.Unmarshal(itr.Value(), orderData); err != nil {
152                         return nil, err
153                 }
154
155                 orders = append(orders, &common.Order{
156                         FromAssetID:      orderAfter.FromAssetID,
157                         ToAssetID:        orderAfter.ToAssetID,
158                         Utxo:             orderData.Utxo,
159                         RatioNumerator:   orderData.RatioNumerator,
160                         RatioDenominator: orderData.RatioDenominator,
161                 })
162         }
163         return orders, nil
164 }
165
166 // ListTradePairsWithStart return n trade pairs after the input trade pair
167 func (m *LevelDBMovStore) ListTradePairsWithStart(fromAssetIDAfter, toAssetIDAfter *bc.AssetID) ([]*common.TradePair, error) {
168         var startKey []byte
169         if fromAssetIDAfter != nil && toAssetIDAfter != nil {
170                 startKey = calcTradePairKey(fromAssetIDAfter, toAssetIDAfter)
171         }
172
173         itr := m.db.IteratorPrefixWithStart(tradePairsPrefix, startKey, false)
174         defer itr.Release()
175
176         var tradePairs []*common.TradePair
177         for txNum := 0; txNum < tradePairsNum && itr.Next(); txNum++ {
178                 key := itr.Key()
179                 fromAssetID := getAssetIDFromTradePairKey(key, fromAssetIDPos)
180                 toAssetID := getAssetIDFromTradePairKey(key, toAssetIDPos)
181
182                 tradePairData := &tradePairData{}
183                 if err := json.Unmarshal(itr.Value(), tradePairData); err != nil {
184                         return nil, err
185                 }
186
187                 tradePairs = append(tradePairs, &common.TradePair{FromAssetID: fromAssetID, ToAssetID: toAssetID, Count: tradePairData.Count})
188         }
189         return tradePairs, nil
190 }
191
192 // ProcessOrders update the DB's image by add new orders, delete the used order
193 func (m *LevelDBMovStore) ProcessOrders(addOrders []*common.Order, delOrders []*common.Order, blockHeader *types.BlockHeader) error {
194         if err := m.checkMovDatabaseState(blockHeader); err != nil {
195                 return err
196         }
197
198         batch := m.db.NewBatch()
199         tradePairsCnt := make(map[string]*common.TradePair)
200         if err := m.addOrders(batch, addOrders, tradePairsCnt); err != nil {
201                 return err
202         }
203
204         m.deleteOrders(batch, delOrders, tradePairsCnt)
205         if err := m.updateTradePairs(batch, tradePairsCnt); err != nil {
206                 return err
207         }
208
209         state, err := m.calcNextDatabaseState(blockHeader)
210         if err != nil {
211                 return err
212         }
213
214         if err := m.saveMovDatabaseState(batch, state); err != nil {
215                 return err
216         }
217
218         batch.Write()
219         return nil
220 }
221
222 func (m *LevelDBMovStore) addOrders(batch dbm.Batch, orders []*common.Order, tradePairsCnt map[string]*common.TradePair) error {
223         for _, order := range orders {
224                 orderData := &orderData{
225                         Utxo:             order.Utxo,
226                         RatioNumerator:   order.RatioNumerator,
227                         RatioDenominator: order.RatioDenominator,
228                 }
229                 data, err := json.Marshal(orderData)
230                 if err != nil {
231                         return err
232                 }
233
234                 key := calcOrderKey(order.FromAssetID, order.ToAssetID, order.UTXOHash(), order.Rate())
235                 batch.Set(key, data)
236
237                 tradePair := &common.TradePair{
238                         FromAssetID: order.FromAssetID,
239                         ToAssetID:   order.ToAssetID,
240                 }
241                 if _, ok := tradePairsCnt[tradePair.Key()]; !ok {
242                         tradePairsCnt[tradePair.Key()] = tradePair
243                 }
244                 tradePairsCnt[tradePair.Key()].Count++
245         }
246         return nil
247 }
248
249 func (m *LevelDBMovStore) calcNextDatabaseState(blockHeader *types.BlockHeader) (*common.MovDatabaseState, error) {
250         hash := blockHeader.Hash()
251         height := blockHeader.Height
252
253         state, err := m.GetMovDatabaseState()
254         if err != nil {
255                 return nil, err
256         }
257
258         if *state.Hash == hash {
259                 hash = blockHeader.PreviousBlockHash
260                 height = blockHeader.Height - 1
261         }
262
263         return &common.MovDatabaseState{Height: height, Hash: &hash}, nil
264 }
265
266 func (m *LevelDBMovStore) checkMovDatabaseState(header *types.BlockHeader) error {
267         state, err := m.GetMovDatabaseState()
268         if err != nil {
269                 return err
270         }
271
272         if (*state.Hash == header.PreviousBlockHash && (state.Height+1) == header.Height) || *state.Hash == header.Hash() {
273                 return nil
274         }
275
276         return errors.New("the status of the block is inconsistent with that of mov-database")
277 }
278
279 func (m *LevelDBMovStore) deleteOrders(batch dbm.Batch, orders []*common.Order, tradePairsCnt map[string]*common.TradePair) {
280         for _, order := range orders {
281                 key := calcOrderKey(order.FromAssetID, order.ToAssetID, order.UTXOHash(), order.Rate())
282                 batch.Delete(key)
283
284                 tradePair := &common.TradePair{
285                         FromAssetID: order.FromAssetID,
286                         ToAssetID:   order.ToAssetID,
287                 }
288                 if _, ok := tradePairsCnt[tradePair.Key()]; !ok {
289                         tradePairsCnt[tradePair.Key()] = tradePair
290                 }
291                 tradePairsCnt[tradePair.Key()].Count--
292         }
293 }
294
295 func (m *LevelDBMovStore) saveMovDatabaseState(batch dbm.Batch, state *common.MovDatabaseState) error {
296         value, err := json.Marshal(state)
297         if err != nil {
298                 return err
299         }
300
301         batch.Set(bestMatchStore, value)
302         return nil
303 }
304
305 func (m *LevelDBMovStore) updateTradePairs(batch dbm.Batch, tradePairs map[string]*common.TradePair) error {
306         for _, v := range tradePairs {
307                 key := calcTradePairKey(v.FromAssetID, v.ToAssetID)
308                 tradePairData := &tradePairData{}
309                 if value := m.db.Get(key); value != nil {
310                         if err := json.Unmarshal(value, tradePairData); err != nil {
311                                 return err
312                         }
313                 }
314
315                 if tradePairData.Count += v.Count; tradePairData.Count < 0 {
316                         return errors.New("negative trade count")
317                 }
318
319                 if tradePairData.Count > 0 {
320                         value, err := json.Marshal(tradePairData)
321                         if err != nil {
322                                 return err
323                         }
324
325                         batch.Set(key, value)
326                 } else {
327                         batch.Delete(key)
328                 }
329         }
330         return nil
331 }