7 "github.com/jinzhu/gorm"
8 log "github.com/sirupsen/logrus"
10 "github.com/vapor/errors"
11 "github.com/vapor/protocol/bc"
12 "github.com/vapor/protocol/bc/types"
13 "github.com/vapor/toolbar/common"
14 "github.com/vapor/toolbar/common/service"
15 "github.com/vapor/toolbar/reward/config"
16 "github.com/vapor/toolbar/reward/database/orm"
// ChainKeeper keeps the local database in step with an upstream chain node.
// Its methods read the cfg, db and node fields; the field declarations
// themselves are not part of this excerpt.
19 type ChainKeeper struct {
// NewChainKeeper returns a ChainKeeper for the given database handle and
// configuration.
// NOTE(review): presumably db and cfg are stored on the struct as well (the
// methods read c.db and c.cfg); those initialisers are not visible here.
25 func NewChainKeeper(db *gorm.DB, cfg *config.Config) *ChainKeeper {
// The node client is built from the upstream setting in cfg.Chain.Upstream.
29 node: service.NewNode(cfg.Chain.Upstream),
// Run drives block synchronisation: it creates a ticker with a period of
// c.cfg.SyncSeconds seconds, calls syncBlock, and logs the error syncBlock
// reports.
33 func (c *ChainKeeper) Run() {
34 ticker := time.NewTicker(time.Duration(c.cfg.SyncSeconds) * time.Second)
// The loop has no init clause and receives from ticker.C as its post
// statement, so the first pass runs immediately and every later pass waits
// for the next tick. The condition is constant, so the loop never exits on
// its own.
35 for ; true; <-ticker.C {
// NOTE(review): how isUpdate steers the loop is not visible in this excerpt.
37 isUpdate, err := c.syncBlock()
39 log.WithField("error", err).Errorln("blockKeeper fail on process block")
// syncBlock moves the stored chain state by at most one block.
//
// It loads the first orm.BlockState row, asks the node for its block count,
// and fetches the block at the stored height + 1. If that block's previous
// hash matches the stored block hash it is attached; otherwise the stored
// best block is fetched by hash and detached (rollback).
//
// The bool result is true on both the attach and the detach path; the error
// is whatever AttachBlock/DetachBlock returned, or a wrapped error from the
// database query or block decoding.
50 func (c *ChainKeeper) syncBlock() (bool, error) {
51 blockState := &orm.BlockState{}
52 if err := c.db.First(blockState).Error; err != nil {
53 return false, errors.Wrap(err, "query chain")
56 height, err := c.node.GetBlockCount()
// The node reports the same height as the stored state: nothing new to sync.
61 if height == blockState.Height {
65 nextBlockStr, txStatus, err := c.node.GetBlockByHeight(blockState.Height + 1)
70 nextBlock := &types.Block{}
71 if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
// Wrap instead of creating a fresh error so the decode failure cause is kept.
72 return false, errors.Wrap(err, "Unmarshal nextBlock")
75 // Normal case: the previous hash of the next block equals the hash of the
76 // current block, so just sync it to the database directly.
77 if nextBlock.PreviousBlockHash.String() == blockState.BlockHash {
78 return true, c.AttachBlock(nextBlock, txStatus)
81 log.WithField("block height", blockState.Height).Debug("the prev hash of remote is not equals the hash of current best block, must rollback")
82 currentBlockStr, txStatus, err := c.node.GetBlockByHash(blockState.BlockHash)
87 currentBlock := &types.Block{}
// Decode into currentBlock, not nextBlock: DetachBlock filters on the height
// of the block it is given, so it must receive the block being rolled back
// rather than an empty types.Block.
88 if err := currentBlock.UnmarshalText([]byte(currentBlockStr)); err != nil {
89 return false, errors.Wrap(err, "Unmarshal currentBlock")
92 return true, c.DetachBlock(currentBlock, txStatus)
// AttachBlock records the vote-related effects of block in the database.
// For inputs that are veto inputs, the matching stored UTXO gets its
// veto_height set to block.Height; for outputs that are vote outputs, a UTXO
// record is written with ormDB.Save. The function finishes by committing
// ormDB and returning the commit error.
// NOTE(review): ormDB is not declared in the visible lines; presumably it is
// a transaction opened from c.db, as in DetachBlock — confirm.
95 func (c *ChainKeeper) AttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
97 for pos, tx := range block.Transactions {
// txStatus supplies a per-transaction status flag, looked up by position.
98 statusFail, err := txStatus.GetStatus(pos)
104 log.WithFields(log.Fields{"block height": block.Height, "statusFail": statusFail}).Debug("AttachBlock")
// Inputs: the type assertion singles out *types.VetoInput values.
108 for _, input := range tx.Inputs {
109 vetoInput, ok := input.TypedInput.(*types.VetoInput)
114 outputID, err := input.SpentOutputID()
// The row to update is matched on voter address plus the spent output ID.
119 VoterAddress: common.GetAddressFromControlProgram(vetoInput.ControlProgram),
120 OutputID: outputID.String(),
123 db := ormDB.Where(utxo).Update("veto_height", block.Height)
124 if err := db.Error; err != nil {
// Exactly one stored row is expected to match; any other count is reported
// as ErrInconsistentDB.
129 if db.RowsAffected != 1 {
131 return ErrInconsistentDB
// Outputs: the type assertion singles out *types.VoteOutput values.
136 for index, output := range tx.Outputs {
137 voteOutput, ok := output.TypedOutput.(*types.VoteOutput)
// The Vote bytes are hex-encoded into pubkey.
141 pubkey := hex.EncodeToString(voteOutput.Vote)
142 outputID := tx.OutputID(index)
145 VoterAddress: common.GetAddressFromControlProgram(voteOutput.ControlProgram),
146 VoteHeight: block.Height,
147 VoteNum: voteOutput.Amount,
149 OutputID: outputID.String(),
152 if err := ormDB.Save(utxo).Error; err != nil {
159 return ormDB.Commit().Error
// DetachBlock reverses the database effects recorded for block, inside a
// transaction begun on c.db: rows matching VoteHeight == block.Height are
// deleted, and rows matching VetoHeight == block.Height get veto_height set
// back to 0. It returns the error from the final Commit.
// NOTE(review): txStatus is not referenced in the visible lines.
162 func (c *ChainKeeper) DetachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
163 ormDB := c.db.Begin()
// Filter for the delete: UTXOs whose vote height is this block's height.
166 VoteHeight: block.Height,
169 if err := ormDB.Where(utxo).Delete(&orm.Utxo{}).Error; err != nil {
// Filter for the reset: UTXOs whose veto height is this block's height.
175 VetoHeight: block.Height,
179 if err := ormDB.Where(utxo).Update("veto_height", 0).Error; err != nil {
184 return ormDB.Commit().Error