8 btmTypes "github.com/bytom/protocol/bc/types"
9 "github.com/jinzhu/gorm"
10 log "github.com/sirupsen/logrus"
12 "github.com/vapor/errors"
13 "github.com/vapor/federation/config"
14 "github.com/vapor/federation/database"
15 "github.com/vapor/federation/database/orm"
16 "github.com/vapor/federation/service"
17 "github.com/vapor/protocol/bc"
// mainchainKeeper is the receiver type for the block-sync methods in this
// file. Only assetCache is visible in this excerpt; the methods also read
// cfg, db, node and chainName from the receiver.
20 type mainchainKeeper struct {
// assetCache is consulted by getAsset before it queries the database.
25 assetCache *database.AssetCache
// NewMainchainKeeper returns a *mainchainKeeper configured from chainCfg.
// Of the initializers visible here: node is built from chainCfg.Upstream,
// chainName is copied from chainCfg.Name, and assetCache starts as a new
// database.NewAssetCache(). How db and the remaining fields are set is not
// visible in this excerpt.
28 func NewMainchainKeeper(db *gorm.DB, chainCfg *config.Chain) *mainchainKeeper {
29 return &mainchainKeeper{
32 node: service.NewNode(chainCfg.Upstream),
33 chainName: chainCfg.Name,
34 assetCache: database.NewAssetCache(),
// Run calls syncBlock repeatedly, paced by a ticker whose period is
// m.cfg.SyncSeconds seconds. Run has no return value; the visible error path
// only logs. The loop condition is the constant true and no exit is visible
// in this excerpt.
38 func (m *mainchainKeeper) Run() {
39 ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
// The receive from ticker.C is the loop's post statement, so the first
// iteration runs immediately and every later one waits for a tick.
40 for ; true; <-ticker.C {
42 isUpdate, err := m.syncBlock()
// NOTE(review): the message says "blockKeeper" although the receiver is
// mainchainKeeper; confirm the wording is intended.
44 log.WithField("error", err).Errorln("blockKeeper fail on process block")
// syncBlock processes the block at height chain.BlockHeight+1 for the chain
// named m.chainName. It returns a bool and an error; every failure path
// visible here returns false with a non-nil error. The success returns are
// not visible in this excerpt.
//
// Visible steps, in order:
//  1. load the orm.Chain row whose Name is m.chainName;
//  2. ask the node for its block count;
//  3. compare that count with chain.BlockHeight+m.cfg.Confirmations;
//  4. fetch the block at chain.BlockHeight+1 and decode it with UnmarshalText;
//  5. check the block's PreviousBlockHash against chain.BlockHash;
//  6. pass the block to tryAttachBlock.
55 func (m *mainchainKeeper) syncBlock() (bool, error) {
56 chain := &orm.Chain{Name: m.chainName}
57 if err := m.db.Where(chain).First(chain).Error; err != nil {
58 return false, errors.Wrap(err, "query chain")
61 height, err := m.node.GetBlockCount()
// The branch body is not visible here; presumably it returns without syncing
// until the node is more than Confirmations blocks ahead. TODO confirm.
66 if height <= chain.BlockHeight+m.cfg.Confirmations {
70 nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
75 nextBlock := &btmTypes.Block{}
76 if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
// NOTE(review): the UnmarshalText error is discarded and replaced by a fixed
// message; the "query chain" path above wraps err instead. Consider wrapping
// here too so the cause is not lost.
77 return false, errors.New("Unmarshal nextBlock")
80 if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
81 log.WithFields(log.Fields{
82 "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
83 "db block_hash": chain.BlockHash,
// NOTE(review): logrus documents Fatal as logging and then exiting the
// process, which would make the return below unreachable. Confirm that
// terminating the process on a hash mismatch is intended.
84 }).Fatal("BlockHash mismatch")
85 return false, ErrInconsistentDB
88 if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
// tryAttachBlock logs the block's height and hash, runs processBlock on the
// block, and returns the Error field of m.db.Commit().
//
// NOTE(review): txStatus is not used on any line visible here, and no
// transaction begin is visible for the Commit call. Verify both against the
// full function.
95 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error {
96 blockHash := block.Hash()
97 log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
99 if err := m.processBlock(chain, block); err != nil {
104 return m.db.Commit().Error
// processBlock handles a single block in three stages: it runs
// processIssuing over all of the block's transactions, then walks the
// transactions and dispatches each one to processDepositTx and/or
// processWithdrawalTx, and finally returns the result of processChainInfo.
107 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *btmTypes.Block) error {
108 if err := m.processIssuing(block.Transactions); err != nil {
// i is the transaction's index within block.Transactions and is passed on as
// txIndex.
112 for i, tx := range block.Transactions {
113 if m.isDepositTx(tx) {
114 if err := m.processDepositTx(uint64(i), tx); err != nil {
// This is a separate if rather than an else-if, so as far as is visible a
// transaction can be handled as both a deposit and a withdrawal.
119 if m.isWithdrawalTx(tx) {
120 if err := m.processWithdrawalTx(uint64(i), tx); err != nil {
126 return m.processChainInfo(chain, block)
// isDepositTx scans tx.Outputs for an output whose control program is
// byte-equal to fedProg (declared outside this excerpt). The return
// statements are not visible here; presumably a match yields true and no
// match yields false. TODO confirm.
129 func (m *mainchainKeeper) isDepositTx(tx *btmTypes.Tx) bool {
130 for _, output := range tx.Outputs {
131 if bytes.Equal(output.OutputCommitment.ControlProgram, fedProg) {
// isWithdrawalTx scans tx.Inputs for an input whose ControlProgram() is
// byte-equal to fedProg (declared outside this excerpt). The return
// statements are not visible here; presumably a match yields true and no
// match yields false. TODO confirm.
138 func (m *mainchainKeeper) isWithdrawalTx(tx *btmTypes.Tx) bool {
139 for _, input := range tx.Inputs {
140 if bytes.Equal(input.ControlProgram(), fedProg) {
// processDepositTx is called by processBlock for each transaction that
// isDepositTx accepts. txIndex is the transaction's position within
// block.Transactions. The body is not visible in this excerpt.
147 func (m *mainchainKeeper) processDepositTx(txIndex uint64, tx *btmTypes.Tx) error {
// processWithdrawalTx is called by processBlock for each transaction that
// isWithdrawalTx accepts. txIndex is the transaction's position within
// block.Transactions. The body is not visible in this excerpt.
151 func (m *mainchainKeeper) processWithdrawalTx(txIndex uint64, tx *btmTypes.Tx) error {
155 // TODO: maybe common
// processChainInfo records block as the new tip of chain. It writes the
// block's hash and height into the in-memory chain value and then updates
// the stored row, matching only a row whose block_hash equals the block's
// PreviousBlockHash. It returns ErrInconsistentDB when the update does not
// affect exactly one row.
156 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *btmTypes.Block) error {
157 blockHash := block.Hash()
// chain is mutated before the database update runs, so the caller's value
// already carries the new hash and height even if the update fails.
158 chain.BlockHash = blockHash.String()
159 chain.BlockHeight = block.Height
// The WHERE clause makes the update conditional on the stored hash still
// being this block's parent.
160 res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
161 if err := res.Error; err != nil {
165 if res.RowsAffected != 1 {
166 return ErrInconsistentDB
// processIssuing inspects every input of every transaction in txs. For
// inputs whose TypedInput is a *btmTypes.IssuanceInput it looks the asset up
// with getAsset, and on the path visible here it builds an asset record from
// the input, inserts it with m.db.Create, and adds it to m.assetCache.
172 func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error {
173 for _, tx := range txs {
174 for _, input := range tx.Inputs {
175 switch inp := input.TypedInput.(type) {
176 case *btmTypes.IssuanceInput:
177 assetID := inp.AssetID()
// err == nil means getAsset found the asset in the cache or the database.
// The branch body is not visible; presumably it skips creating a duplicate.
// TODO confirm.
178 if _, err := m.getAsset(assetID.String()); err == nil {
183 AssetID: assetID.String(),
// The issuance program and the asset definition are stored hex-encoded.
184 IssuanceProgram: hex.EncodeToString(inp.IssuanceProgram),
185 VMVersion: inp.VMVersion,
186 RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
188 if err := m.db.Create(asset).Error; err != nil {
// The cache is updated after the Create call.
192 m.assetCache.Add(asset.AssetID, asset)
200 // TODO: maybe common
// getAsset looks up the asset with the given ID. It checks m.assetCache
// first and falls back to a database query on orm.Asset{AssetID: assetID}.
// A failed query returns a nil asset and the wrapped error. An asset loaded
// from the database is added to the cache.
201 func (m *mainchainKeeper) getAsset(assetID string) (*orm.Asset, error) {
// The body of the cache-hit branch is not visible here; presumably it
// returns the cached asset. TODO confirm.
202 if asset := m.assetCache.Get(assetID); asset != nil {
206 asset := &orm.Asset{AssetID: assetID}
207 if err := m.db.Where(asset).First(asset).Error; err != nil {
// NOTE(review): every query error, not only a missing row, is wrapped with
// this "not found" message.
208 return nil, errors.Wrap(err, "asset not found in memory and mysql")
211 m.assetCache.Add(assetID, asset)