OSDN Git Service

sync vote info for reward (#292)
authorwz <mars@bytom.io>
Tue, 16 Jul 2019 07:55:36 +0000 (15:55 +0800)
committerPaladz <yzhu101@uottawa.ca>
Tue, 16 Jul 2019 07:55:36 +0000 (15:55 +0800)
* modify CHARSET

* add

* add sync block

* add sync block

* fix

* fix

* fix review

* modify logic

toolbar/common/address.go [new file with mode: 0644]
toolbar/common/service/node.go [moved from toolbar/federation/service/node.go with 100% similarity]
toolbar/federation/synchron/mainchain_keeper.go
toolbar/federation/synchron/sidechain_keeper.go
toolbar/reward/config/config.go
toolbar/reward/database/dump_reward.sql
toolbar/reward/reward.go [new file with mode: 0644]
toolbar/reward/synchron/block_keeper.go [new file with mode: 0644]
toolbar/reward/synchron/errors.go [new file with mode: 0644]

diff --git a/toolbar/common/address.go b/toolbar/common/address.go
new file mode 100644 (file)
index 0000000..53b6c9b
--- /dev/null
@@ -0,0 +1,39 @@
+package common
+
+import (
+       "github.com/vapor/common"
+       "github.com/vapor/consensus"
+       "github.com/vapor/consensus/segwit"
+)
+
+func GetAddressFromControlProgram(prog []byte) string {
+       if segwit.IsP2WPKHScript(prog) {
+               if pubHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
+                       return buildP2PKHAddress(pubHash)
+               }
+       } else if segwit.IsP2WSHScript(prog) {
+               if scriptHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
+                       return buildP2SHAddress(scriptHash)
+               }
+       }
+
+       return ""
+}
+
+func buildP2PKHAddress(pubHash []byte) string {
+       address, err := common.NewAddressWitnessPubKeyHash(pubHash, &consensus.ActiveNetParams)
+       if err != nil {
+               return ""
+       }
+
+       return address.EncodeAddress()
+}
+
+func buildP2SHAddress(scriptHash []byte) string {
+       address, err := common.NewAddressWitnessScriptHash(scriptHash, &consensus.ActiveNetParams)
+       if err != nil {
+               return ""
+       }
+
+       return address.EncodeAddress()
+}
index 31352a7..af90a7d 100644 (file)
@@ -16,12 +16,12 @@ import (
 
        "github.com/vapor/consensus"
        "github.com/vapor/errors"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/toolbar/common/service"
        "github.com/vapor/toolbar/federation/common"
        "github.com/vapor/toolbar/federation/config"
        "github.com/vapor/toolbar/federation/database"
        "github.com/vapor/toolbar/federation/database/orm"
-       "github.com/vapor/toolbar/federation/service"
-       "github.com/vapor/protocol/bc"
 )
 
 type mainchainKeeper struct {
index c17caf0..f60b4af 100644 (file)
@@ -11,13 +11,13 @@ import (
 
        "github.com/vapor/consensus"
        "github.com/vapor/errors"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+       "github.com/vapor/toolbar/common/service"
        "github.com/vapor/toolbar/federation/common"
        "github.com/vapor/toolbar/federation/config"
        "github.com/vapor/toolbar/federation/database"
        "github.com/vapor/toolbar/federation/database/orm"
-       "github.com/vapor/toolbar/federation/service"
-       "github.com/vapor/protocol/bc"
-       "github.com/vapor/protocol/bc/types"
 )
 
 type sidechainKeeper struct {
index d912156..95ed77a 100644 (file)
@@ -1 +1,46 @@
 package config
+
+import (
+       "encoding/json"
+       "os"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/crypto/ed25519/chainkd"
+       "github.com/vapor/toolbar/common"
+)
+
+func NewConfig() *Config {
+       if len(os.Args) <= 1 {
+               log.Fatal("Please setup the config file path")
+       }
+
+       return NewConfigWithPath(os.Args[1])
+}
+
+func NewConfigWithPath(path string) *Config {
+       configFile, err := os.Open(path)
+       if err != nil {
+               log.WithFields(log.Fields{"err": err, "file_path": os.Args[1]}).Fatal("fail to open config file")
+       }
+       defer configFile.Close()
+
+       cfg := &Config{}
+       if err := json.NewDecoder(configFile).Decode(cfg); err != nil {
+               log.WithField("err", err).Fatal("fail to decode config file")
+       }
+
+       return cfg
+}
+
+type Config struct {
+       MySQLConfig common.MySQLConfig `json:"mysql"`
+       Chain       Chain              `json:"chain"`
+       XPubs       []chainkd.XPub     `json:"xpubs"`
+}
+
+type Chain struct {
+       Name        string `json:"name"`
+       Upstream    string `json:"upstream"`
+       SyncSeconds uint64 `json:"sync_seconds"`
+}
index 807c67d..5993867 100644 (file)
@@ -7,7 +7,7 @@ SET FOREIGN_KEY_CHECKS = 0;
 DROP TABLE IF EXISTS `block_state`;
 CREATE TABLE `block_state`  (
   `height` int(11) NOT NULL,
-  `block_hash` varchar(64) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL
+  `block_hash` varchar(64) NOT NULL
 ) ENGINE = InnoDB DEFAULT CHARSET=utf8;
 
 -- ----------------------------
@@ -16,12 +16,12 @@ CREATE TABLE `block_state`  (
 DROP TABLE IF EXISTS `vote`;
 CREATE TABLE `vote`  (
   `id` int(11) NOT NULL AUTO_INCREMENT,
-  `xpub` varchar(128) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
-  `voter_address` varchar(62) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
+  `xpub` varchar(128) NOT NULL,
+  `voter_address` varchar(62) NOT NULL,
   `vote_height` int(11) NOT NULL,
   `vote_num` int(11) NOT NULL,
   `veto_height` int(11) NOT NULL,
-  `output_id` varchar(64) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
+  `output_id` varchar(64) NOT NULL,
   PRIMARY KEY (`id`) USING BTREE,
   UNIQUE INDEX `xpub`(`xpub`, `vote_height`, `output_id`) USING BTREE
 ) ENGINE = InnoDB AUTO_INCREMENT = 6 DEFAULT CHARSET=utf8;
diff --git a/toolbar/reward/reward.go b/toolbar/reward/reward.go
new file mode 100644 (file)
index 0000000..4625b5b
--- /dev/null
@@ -0,0 +1,4 @@
+package reward
+
+type Reward interface {
+}
diff --git a/toolbar/reward/synchron/block_keeper.go b/toolbar/reward/synchron/block_keeper.go
new file mode 100644 (file)
index 0000000..6a3bda3
--- /dev/null
@@ -0,0 +1,185 @@
+package synchron
+
+import (
+       "encoding/hex"
+       "time"
+
+       "github.com/jinzhu/gorm"
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/errors"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+       "github.com/vapor/toolbar/common"
+       "github.com/vapor/toolbar/common/service"
+       "github.com/vapor/toolbar/reward/config"
+       "github.com/vapor/toolbar/reward/database/orm"
+)
+
+type ChainKeeper struct {
+       cfg  *config.Chain
+       db   *gorm.DB
+       node *service.Node
+}
+
+func NewChainKeeper(db *gorm.DB, cfg *config.Config) *ChainKeeper {
+       return &ChainKeeper{
+               cfg:  &cfg.Chain,
+               db:   db,
+               node: service.NewNode(cfg.Chain.Upstream),
+       }
+}
+
+func (c *ChainKeeper) Run() {
+       ticker := time.NewTicker(time.Duration(c.cfg.SyncSeconds) * time.Second)
+       for ; true; <-ticker.C {
+               for {
+                       isUpdate, err := c.syncBlock()
+                       if err != nil {
+                               log.WithField("error", err).Errorln("blockKeeper fail on process block")
+                               break
+                       }
+
+                       if !isUpdate {
+                               break
+                       }
+               }
+       }
+}
+
+func (c *ChainKeeper) syncBlock() (bool, error) {
+       blockState := &orm.BlockState{}
+       if err := c.db.First(blockState).Error; err != nil {
+               return false, errors.Wrap(err, "query chain")
+       }
+
+       height, err := c.node.GetBlockCount()
+       if err != nil {
+               return false, err
+       }
+
+       if height == blockState.Height {
+               return false, nil
+       }
+
+       nextBlockStr, txStatus, err := c.node.GetBlockByHeight(blockState.Height + 1)
+       if err != nil {
+               return false, err
+       }
+
+       nextBlock := &types.Block{}
+       if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
+               return false, errors.New("Unmarshal nextBlock")
+       }
+
+       // Normal case, the previous hash of next block equals to the hash of current block,
+       // just sync to database directly.
+       if nextBlock.PreviousBlockHash.String() == blockState.BlockHash {
+               return true, c.AttachBlock(nextBlock, txStatus)
+       }
+
+       log.WithField("block height", blockState.Height).Debug("the prev hash of remote is not equals the hash of current best block, must rollback")
+       currentBlockStr, txStatus, err := c.node.GetBlockByHash(blockState.BlockHash)
+       if err != nil {
+               return false, err
+       }
+
+       currentBlock := &types.Block{}
+       if err := nextBlock.UnmarshalText([]byte(currentBlockStr)); err != nil {
+               return false, errors.New("Unmarshal currentBlock")
+       }
+
+       return true, c.DetachBlock(currentBlock, txStatus)
+}
+
+func (c *ChainKeeper) AttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
+       ormDB := c.db.Begin()
+       for pos, tx := range block.Transactions {
+               statusFail, err := txStatus.GetStatus(pos)
+               if err != nil {
+                       return err
+               }
+
+               if statusFail {
+                       log.WithFields(log.Fields{"block height": block.Height, "statusFail": statusFail}).Debug("AttachBlock")
+                       continue
+               }
+
+               for _, input := range tx.Inputs {
+                       vetoInput, ok := input.TypedInput.(*types.VetoInput)
+                       if !ok {
+                               continue
+                       }
+
+                       outputID, err := input.SpentOutputID()
+                       if err != nil {
+                               return err
+                       }
+                       utxo := &orm.Utxo{
+                               VoterAddress: common.GetAddressFromControlProgram(vetoInput.ControlProgram),
+                               OutputID:     outputID.String(),
+                       }
+                       // update data
+                       db := ormDB.Where(utxo).Update("veto_height", block.Height)
+                       if err := db.Error; err != nil {
+                               ormDB.Rollback()
+                               return err
+                       }
+
+                       if db.RowsAffected != 1 {
+                               ormDB.Rollback()
+                               return ErrInconsistentDB
+                       }
+
+               }
+
+               for index, output := range tx.Outputs {
+                       voteOutput, ok := output.TypedOutput.(*types.VoteOutput)
+                       if !ok {
+                               continue
+                       }
+                       pubkey := hex.EncodeToString(voteOutput.Vote)
+                       outputID := tx.OutputID(index)
+                       utxo := &orm.Utxo{
+                               Xpub:         pubkey,
+                               VoterAddress: common.GetAddressFromControlProgram(voteOutput.ControlProgram),
+                               VoteHeight:   block.Height,
+                               VoteNum:      voteOutput.Amount,
+                               VetoHeight:   0,
+                               OutputID:     outputID.String(),
+                       }
+                       // insert data
+                       if err := ormDB.Save(utxo).Error; err != nil {
+                               ormDB.Rollback()
+                               return err
+                       }
+               }
+       }
+
+       return ormDB.Commit().Error
+}
+
+func (c *ChainKeeper) DetachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
+       ormDB := c.db.Begin()
+
+       utxo := &orm.Utxo{
+               VoteHeight: block.Height,
+       }
+       // insert data
+       if err := ormDB.Where(utxo).Delete(&orm.Utxo{}).Error; err != nil {
+               ormDB.Rollback()
+               return err
+       }
+
+       utxo = &orm.Utxo{
+               VetoHeight: block.Height,
+       }
+
+       // update data
+       if err := ormDB.Where(utxo).Update("veto_height", 0).Error; err != nil {
+               ormDB.Rollback()
+               return err
+       }
+
+       return ormDB.Commit().Error
+}
diff --git a/toolbar/reward/synchron/errors.go b/toolbar/reward/synchron/errors.go
new file mode 100644 (file)
index 0000000..5e58fde
--- /dev/null
@@ -0,0 +1,10 @@
+package synchron
+
+import (
+       "github.com/vapor/errors"
+)
+
+var (
+       ErrInconsistentDB = errors.New("inconsistent db status")
+       ErrOutputType     = errors.New("error output type")
+)