OSDN Git Service

524623ba7f60f4ed4c4825e3ce05b09b0f78d870
[bytom/vapor.git] / toolbar / reward / synchron / block_keeper.go
1 package synchron
2
3 import (
4         "encoding/hex"
5
6         "github.com/jinzhu/gorm"
7         log "github.com/sirupsen/logrus"
8
9         "github.com/vapor/errors"
10         "github.com/vapor/protocol/bc"
11         "github.com/vapor/protocol/bc/types"
12         "github.com/vapor/toolbar/common"
13         "github.com/vapor/toolbar/common/service"
14         "github.com/vapor/toolbar/reward/config"
15         "github.com/vapor/toolbar/reward/database/orm"
16 )
17
18 type ChainKeeper struct {
19         cfg        *config.Chain
20         db         *gorm.DB
21         node       *service.Node
22         syncHeight uint64
23 }
24
25 func NewChainKeeper(db *gorm.DB, cfg *config.Config, syncHeight uint64) (*ChainKeeper, error) {
26         keeper := &ChainKeeper{
27                 cfg:        &cfg.Chain,
28                 db:         db,
29                 node:       service.NewNode(cfg.Chain.Upstream),
30                 syncHeight: syncHeight,
31         }
32
33         blockState := &orm.BlockState{}
34         if err := db.First(blockState).Error; err == gorm.ErrRecordNotFound {
35                 blockStr, _, err := keeper.node.GetBlockByHeight(0)
36                 if err != nil {
37                         return nil, errors.Wrap(err, "Failed to get genenis block")
38                 }
39                 block := &types.Block{}
40                 if err := block.UnmarshalText([]byte(blockStr)); err != nil {
41                         return nil, errors.Wrap(err, "unmarshal block")
42                 }
43                 if err := keeper.initBlockState(db, block); err != nil {
44                         return nil, errors.Wrap(err, "Failed to insert blockState")
45                 }
46         } else if err != nil {
47                 return nil, errors.Wrap(err, "Failed to get blockState")
48         }
49
50         return keeper, nil
51 }
52
53 func (c *ChainKeeper) Start() error {
54         for {
55                 blockState := &orm.BlockState{}
56                 if c.db.First(blockState).RecordNotFound() {
57                         return errors.New("The query blockState record is empty empty on process block")
58                 }
59
60                 if blockState.Height >= c.syncHeight {
61                         break
62                 }
63
64                 if err := c.syncBlock(blockState); err != nil {
65                         return err
66                 }
67         }
68         return nil
69 }
70
71 func (c *ChainKeeper) syncBlock(blockState *orm.BlockState) error {
72         height, err := c.node.GetBlockCount()
73         if err != nil {
74                 return err
75         }
76
77         if height == blockState.Height {
78                 return nil
79         }
80
81         nextBlockStr, txStatus, err := c.node.GetBlockByHeight(blockState.Height + 1)
82         if err != nil {
83                 return err
84         }
85
86         nextBlock := &types.Block{}
87         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
88                 return errors.New("Unmarshal nextBlock")
89         }
90
91         // Normal case, the previous hash of next block equals to the hash of current block,
92         // just sync to database directly.
93         if nextBlock.PreviousBlockHash.String() == blockState.BlockHash {
94                 return c.AttachBlock(nextBlock, txStatus)
95         }
96
97         log.WithField("block height", blockState.Height).Debug("the prev hash of remote is not equals the hash of current best block, must rollback")
98         currentBlockStr, txStatus, err := c.node.GetBlockByHash(blockState.BlockHash)
99         if err != nil {
100                 return err
101         }
102
103         currentBlock := &types.Block{}
104         if err := nextBlock.UnmarshalText([]byte(currentBlockStr)); err != nil {
105                 return errors.New("Unmarshal currentBlock")
106         }
107
108         return c.DetachBlock(currentBlock, txStatus)
109 }
110
111 func (c *ChainKeeper) AttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
112         ormDB := c.db.Begin()
113         for pos, tx := range block.Transactions {
114                 statusFail, err := txStatus.GetStatus(pos)
115                 if err != nil {
116                         return err
117                 }
118
119                 if statusFail {
120                         log.WithFields(log.Fields{"block height": block.Height, "statusFail": statusFail}).Debug("AttachBlock")
121                         continue
122                 }
123
124                 for _, input := range tx.Inputs {
125                         vetoInput, ok := input.TypedInput.(*types.VetoInput)
126                         if !ok {
127                                 continue
128                         }
129
130                         outputID, err := input.SpentOutputID()
131                         if err != nil {
132                                 return err
133                         }
134                         utxo := &orm.Utxo{
135                                 VoterAddress: common.GetAddressFromControlProgram(vetoInput.ControlProgram),
136                                 OutputID:     outputID.String(),
137                         }
138                         // update data
139                         db := ormDB.Model(&orm.Utxo{}).Where(utxo).Update("veto_height", block.Height)
140                         if err := db.Error; err != nil {
141                                 ormDB.Rollback()
142                                 return err
143                         }
144
145                         if db.RowsAffected != 1 {
146                                 ormDB.Rollback()
147                                 return ErrInconsistentDB
148                         }
149
150                 }
151
152                 for index, output := range tx.Outputs {
153                         voteOutput, ok := output.TypedOutput.(*types.VoteOutput)
154                         if !ok {
155                                 continue
156                         }
157                         pubkey := hex.EncodeToString(voteOutput.Vote)
158                         outputID := tx.OutputID(index)
159                         utxo := &orm.Utxo{
160                                 Xpub:         pubkey,
161                                 VoterAddress: common.GetAddressFromControlProgram(voteOutput.ControlProgram),
162                                 VoteHeight:   block.Height,
163                                 VoteNum:      voteOutput.Amount,
164                                 VetoHeight:   0,
165                                 OutputID:     outputID.String(),
166                         }
167                         // insert data
168                         if err := ormDB.Save(utxo).Error; err != nil {
169                                 ormDB.Rollback()
170                                 return err
171                         }
172                 }
173         }
174
175         if err := c.updateBlockState(ormDB, block); err != nil {
176                 ormDB.Rollback()
177                 return err
178         }
179
180         return ormDB.Commit().Error
181 }
182
183 func (c *ChainKeeper) DetachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
184         ormDB := c.db.Begin()
185
186         utxo := &orm.Utxo{
187                 VoteHeight: block.Height,
188         }
189         // insert data
190         if err := ormDB.Where(utxo).Delete(&orm.Utxo{}).Error; err != nil {
191                 ormDB.Rollback()
192                 return err
193         }
194
195         utxo = &orm.Utxo{
196                 VetoHeight: block.Height,
197         }
198
199         // update data
200         if err := ormDB.Where(utxo).Update("veto_height", 0).Error; err != nil {
201                 ormDB.Rollback()
202                 return err
203         }
204
205         preBlockStr, _, err := c.node.GetBlockByHeight(block.Height + 1)
206         if err != nil {
207                 return err
208         }
209
210         preBlock := &types.Block{}
211         if err := preBlock.UnmarshalText([]byte(preBlockStr)); err != nil {
212                 return errors.New("Unmarshal preBlock")
213         }
214
215         if err := c.updateBlockState(ormDB, preBlock); err != nil {
216                 ormDB.Rollback()
217                 return err
218         }
219
220         return ormDB.Commit().Error
221 }
222
223 func (c *ChainKeeper) initBlockState(db *gorm.DB, block *types.Block) error {
224         blockHash := block.Hash()
225         blockState := &orm.BlockState{
226                 Height:    block.Height,
227                 BlockHash: blockHash.String(),
228         }
229
230         return db.Save(blockState).Error
231 }
232
233 func (c *ChainKeeper) updateBlockState(db *gorm.DB, block *types.Block) error {
234         // update blockState
235         blockHash := block.Hash()
236         blockState := &orm.BlockState{
237                 Height:    block.Height,
238                 BlockHash: blockHash.String(),
239         }
240
241         u := db.Model(&orm.BlockState{}).Updates(blockState)
242
243         if err := u.Error; err != nil {
244                 return err
245         }
246
247         if u.RowsAffected != 1 {
248                 return ErrInconsistentDB
249         }
250         return nil
251 }