--- /dev/null
+package common
+
+import (
+ "github.com/vapor/common"
+ "github.com/vapor/consensus"
+ "github.com/vapor/consensus/segwit"
+)
+
+func GetAddressFromControlProgram(prog []byte) string {
+ if segwit.IsP2WPKHScript(prog) {
+ if pubHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
+ return buildP2PKHAddress(pubHash)
+ }
+ } else if segwit.IsP2WSHScript(prog) {
+ if scriptHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
+ return buildP2SHAddress(scriptHash)
+ }
+ }
+
+ return ""
+}
+
+func buildP2PKHAddress(pubHash []byte) string {
+ address, err := common.NewAddressWitnessPubKeyHash(pubHash, &consensus.ActiveNetParams)
+ if err != nil {
+ return ""
+ }
+
+ return address.EncodeAddress()
+}
+
+func buildP2SHAddress(scriptHash []byte) string {
+ address, err := common.NewAddressWitnessScriptHash(scriptHash, &consensus.ActiveNetParams)
+ if err != nil {
+ return ""
+ }
+
+ return address.EncodeAddress()
+}
package config
+
+import (
+ "encoding/json"
+ "os"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/crypto/ed25519/chainkd"
+ "github.com/vapor/toolbar/common"
+)
+
+func NewConfig() *Config {
+ if len(os.Args) <= 1 {
+ log.Fatal("Please setup the config file path")
+ }
+
+ return NewConfigWithPath(os.Args[1])
+}
+
+func NewConfigWithPath(path string) *Config {
+ configFile, err := os.Open(path)
+ if err != nil {
+ log.WithFields(log.Fields{"err": err, "file_path": os.Args[1]}).Fatal("fail to open config file")
+ }
+ defer configFile.Close()
+
+ cfg := &Config{}
+ if err := json.NewDecoder(configFile).Decode(cfg); err != nil {
+ log.WithField("err", err).Fatal("fail to decode config file")
+ }
+
+ return cfg
+}
+
+type Config struct {
+ MySQLConfig common.MySQLConfig `json:"mysql"`
+ Chain Chain `json:"chain"`
+ XPubs []chainkd.XPub `json:"xpubs"`
+}
+
+type Chain struct {
+ Name string `json:"name"`
+ Upstream string `json:"upstream"`
+ SyncSeconds uint64 `json:"sync_seconds"`
+}
DROP TABLE IF EXISTS `block_state`;
CREATE TABLE `block_state` (
`height` int(11) NOT NULL,
- `block_hash` varchar(64) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL
+ `block_hash` varchar(64) NOT NULL
) ENGINE = InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
DROP TABLE IF EXISTS `vote`;
CREATE TABLE `vote` (
`id` int(11) NOT NULL AUTO_INCREMENT,
- `xpub` varchar(128) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
- `voter_address` varchar(62) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
+ `xpub` varchar(128) NOT NULL,
+ `voter_address` varchar(62) NOT NULL,
`vote_height` int(11) NOT NULL,
`vote_num` int(11) NOT NULL,
`veto_height` int(11) NOT NULL,
- `output_id` varchar(64) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
+ `output_id` varchar(64) NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `xpub`(`xpub`, `vote_height`, `output_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 DEFAULT CHARSET=utf8;
--- /dev/null
+package synchron
+
+import (
+ "encoding/hex"
+ "time"
+
+ "github.com/jinzhu/gorm"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/errors"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+ "github.com/vapor/toolbar/common"
+ "github.com/vapor/toolbar/common/service"
+ "github.com/vapor/toolbar/reward/config"
+ "github.com/vapor/toolbar/reward/database/orm"
+)
+
+type ChainKeeper struct {
+ cfg *config.Chain
+ db *gorm.DB
+ node *service.Node
+}
+
+func NewChainKeeper(db *gorm.DB, cfg *config.Config) *ChainKeeper {
+ return &ChainKeeper{
+ cfg: &cfg.Chain,
+ db: db,
+ node: service.NewNode(cfg.Chain.Upstream),
+ }
+}
+
+func (c *ChainKeeper) Run() {
+ ticker := time.NewTicker(time.Duration(c.cfg.SyncSeconds) * time.Second)
+ for ; true; <-ticker.C {
+ for {
+ isUpdate, err := c.syncBlock()
+ if err != nil {
+ log.WithField("error", err).Errorln("blockKeeper fail on process block")
+ break
+ }
+
+ if !isUpdate {
+ break
+ }
+ }
+ }
+}
+
+func (c *ChainKeeper) syncBlock() (bool, error) {
+ blockState := &orm.BlockState{}
+ if err := c.db.First(blockState).Error; err != nil {
+ return false, errors.Wrap(err, "query chain")
+ }
+
+ height, err := c.node.GetBlockCount()
+ if err != nil {
+ return false, err
+ }
+
+ if height == blockState.Height {
+ return false, nil
+ }
+
+ nextBlockStr, txStatus, err := c.node.GetBlockByHeight(blockState.Height + 1)
+ if err != nil {
+ return false, err
+ }
+
+ nextBlock := &types.Block{}
+ if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
+ return false, errors.New("Unmarshal nextBlock")
+ }
+
+ // Normal case, the previous hash of next block equals to the hash of current block,
+ // just sync to database directly.
+ if nextBlock.PreviousBlockHash.String() == blockState.BlockHash {
+ return true, c.AttachBlock(nextBlock, txStatus)
+ }
+
+ log.WithField("block height", blockState.Height).Debug("the prev hash of remote is not equals the hash of current best block, must rollback")
+ currentBlockStr, txStatus, err := c.node.GetBlockByHash(blockState.BlockHash)
+ if err != nil {
+ return false, err
+ }
+
+ currentBlock := &types.Block{}
+ if err := nextBlock.UnmarshalText([]byte(currentBlockStr)); err != nil {
+ return false, errors.New("Unmarshal currentBlock")
+ }
+
+ return true, c.DetachBlock(currentBlock, txStatus)
+}
+
+func (c *ChainKeeper) AttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
+ ormDB := c.db.Begin()
+ for pos, tx := range block.Transactions {
+ statusFail, err := txStatus.GetStatus(pos)
+ if err != nil {
+ return err
+ }
+
+ if statusFail {
+ log.WithFields(log.Fields{"block height": block.Height, "statusFail": statusFail}).Debug("AttachBlock")
+ continue
+ }
+
+ for _, input := range tx.Inputs {
+ vetoInput, ok := input.TypedInput.(*types.VetoInput)
+ if !ok {
+ continue
+ }
+
+ outputID, err := input.SpentOutputID()
+ if err != nil {
+ return err
+ }
+ utxo := &orm.Utxo{
+ VoterAddress: common.GetAddressFromControlProgram(vetoInput.ControlProgram),
+ OutputID: outputID.String(),
+ }
+ // update data
+ db := ormDB.Where(utxo).Update("veto_height", block.Height)
+ if err := db.Error; err != nil {
+ ormDB.Rollback()
+ return err
+ }
+
+ if db.RowsAffected != 1 {
+ ormDB.Rollback()
+ return ErrInconsistentDB
+ }
+
+ }
+
+ for index, output := range tx.Outputs {
+ voteOutput, ok := output.TypedOutput.(*types.VoteOutput)
+ if !ok {
+ continue
+ }
+ pubkey := hex.EncodeToString(voteOutput.Vote)
+ outputID := tx.OutputID(index)
+ utxo := &orm.Utxo{
+ Xpub: pubkey,
+ VoterAddress: common.GetAddressFromControlProgram(voteOutput.ControlProgram),
+ VoteHeight: block.Height,
+ VoteNum: voteOutput.Amount,
+ VetoHeight: 0,
+ OutputID: outputID.String(),
+ }
+ // insert data
+ if err := ormDB.Save(utxo).Error; err != nil {
+ ormDB.Rollback()
+ return err
+ }
+ }
+ }
+
+ return ormDB.Commit().Error
+}
+
+func (c *ChainKeeper) DetachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
+ ormDB := c.db.Begin()
+
+ utxo := &orm.Utxo{
+ VoteHeight: block.Height,
+ }
+ // insert data
+ if err := ormDB.Where(utxo).Delete(&orm.Utxo{}).Error; err != nil {
+ ormDB.Rollback()
+ return err
+ }
+
+ utxo = &orm.Utxo{
+ VetoHeight: block.Height,
+ }
+
+ // update data
+ if err := ormDB.Where(utxo).Update("veto_height", 0).Error; err != nil {
+ ormDB.Rollback()
+ return err
+ }
+
+ return ormDB.Commit().Error
+}