OSDN Git Service

6a3bda38743821cd18a5f5724c84cd778b81551f
[bytom/vapor.git] / toolbar / reward / synchron / block_keeper.go
1 package synchron
2
3 import (
4         "encoding/hex"
5         "time"
6
7         "github.com/jinzhu/gorm"
8         log "github.com/sirupsen/logrus"
9
10         "github.com/vapor/errors"
11         "github.com/vapor/protocol/bc"
12         "github.com/vapor/protocol/bc/types"
13         "github.com/vapor/toolbar/common"
14         "github.com/vapor/toolbar/common/service"
15         "github.com/vapor/toolbar/reward/config"
16         "github.com/vapor/toolbar/reward/database/orm"
17 )
18
19 type ChainKeeper struct {
20         cfg  *config.Chain
21         db   *gorm.DB
22         node *service.Node
23 }
24
25 func NewChainKeeper(db *gorm.DB, cfg *config.Config) *ChainKeeper {
26         return &ChainKeeper{
27                 cfg:  &cfg.Chain,
28                 db:   db,
29                 node: service.NewNode(cfg.Chain.Upstream),
30         }
31 }
32
33 func (c *ChainKeeper) Run() {
34         ticker := time.NewTicker(time.Duration(c.cfg.SyncSeconds) * time.Second)
35         for ; true; <-ticker.C {
36                 for {
37                         isUpdate, err := c.syncBlock()
38                         if err != nil {
39                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
40                                 break
41                         }
42
43                         if !isUpdate {
44                                 break
45                         }
46                 }
47         }
48 }
49
50 func (c *ChainKeeper) syncBlock() (bool, error) {
51         blockState := &orm.BlockState{}
52         if err := c.db.First(blockState).Error; err != nil {
53                 return false, errors.Wrap(err, "query chain")
54         }
55
56         height, err := c.node.GetBlockCount()
57         if err != nil {
58                 return false, err
59         }
60
61         if height == blockState.Height {
62                 return false, nil
63         }
64
65         nextBlockStr, txStatus, err := c.node.GetBlockByHeight(blockState.Height + 1)
66         if err != nil {
67                 return false, err
68         }
69
70         nextBlock := &types.Block{}
71         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
72                 return false, errors.New("Unmarshal nextBlock")
73         }
74
75         // Normal case, the previous hash of next block equals to the hash of current block,
76         // just sync to database directly.
77         if nextBlock.PreviousBlockHash.String() == blockState.BlockHash {
78                 return true, c.AttachBlock(nextBlock, txStatus)
79         }
80
81         log.WithField("block height", blockState.Height).Debug("the prev hash of remote is not equals the hash of current best block, must rollback")
82         currentBlockStr, txStatus, err := c.node.GetBlockByHash(blockState.BlockHash)
83         if err != nil {
84                 return false, err
85         }
86
87         currentBlock := &types.Block{}
88         if err := nextBlock.UnmarshalText([]byte(currentBlockStr)); err != nil {
89                 return false, errors.New("Unmarshal currentBlock")
90         }
91
92         return true, c.DetachBlock(currentBlock, txStatus)
93 }
94
95 func (c *ChainKeeper) AttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
96         ormDB := c.db.Begin()
97         for pos, tx := range block.Transactions {
98                 statusFail, err := txStatus.GetStatus(pos)
99                 if err != nil {
100                         return err
101                 }
102
103                 if statusFail {
104                         log.WithFields(log.Fields{"block height": block.Height, "statusFail": statusFail}).Debug("AttachBlock")
105                         continue
106                 }
107
108                 for _, input := range tx.Inputs {
109                         vetoInput, ok := input.TypedInput.(*types.VetoInput)
110                         if !ok {
111                                 continue
112                         }
113
114                         outputID, err := input.SpentOutputID()
115                         if err != nil {
116                                 return err
117                         }
118                         utxo := &orm.Utxo{
119                                 VoterAddress: common.GetAddressFromControlProgram(vetoInput.ControlProgram),
120                                 OutputID:     outputID.String(),
121                         }
122                         // update data
123                         db := ormDB.Where(utxo).Update("veto_height", block.Height)
124                         if err := db.Error; err != nil {
125                                 ormDB.Rollback()
126                                 return err
127                         }
128
129                         if db.RowsAffected != 1 {
130                                 ormDB.Rollback()
131                                 return ErrInconsistentDB
132                         }
133
134                 }
135
136                 for index, output := range tx.Outputs {
137                         voteOutput, ok := output.TypedOutput.(*types.VoteOutput)
138                         if !ok {
139                                 continue
140                         }
141                         pubkey := hex.EncodeToString(voteOutput.Vote)
142                         outputID := tx.OutputID(index)
143                         utxo := &orm.Utxo{
144                                 Xpub:         pubkey,
145                                 VoterAddress: common.GetAddressFromControlProgram(voteOutput.ControlProgram),
146                                 VoteHeight:   block.Height,
147                                 VoteNum:      voteOutput.Amount,
148                                 VetoHeight:   0,
149                                 OutputID:     outputID.String(),
150                         }
151                         // insert data
152                         if err := ormDB.Save(utxo).Error; err != nil {
153                                 ormDB.Rollback()
154                                 return err
155                         }
156                 }
157         }
158
159         return ormDB.Commit().Error
160 }
161
162 func (c *ChainKeeper) DetachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
163         ormDB := c.db.Begin()
164
165         utxo := &orm.Utxo{
166                 VoteHeight: block.Height,
167         }
168         // insert data
169         if err := ormDB.Where(utxo).Delete(&orm.Utxo{}).Error; err != nil {
170                 ormDB.Rollback()
171                 return err
172         }
173
174         utxo = &orm.Utxo{
175                 VetoHeight: block.Height,
176         }
177
178         // update data
179         if err := ormDB.Where(utxo).Update("veto_height", 0).Error; err != nil {
180                 ormDB.Rollback()
181                 return err
182         }
183
184         return ormDB.Commit().Error
185 }