7 btmTypes "github.com/bytom/protocol/bc/types"
8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/errors"
12 "github.com/vapor/federation/config"
13 "github.com/vapor/federation/database"
14 "github.com/vapor/federation/database/orm"
15 "github.com/vapor/federation/service"
16 "github.com/vapor/protocol/bc"
// mainchainKeeper carries the state used to follow the main chain block by
// block. Only the assetCache field is visible in this excerpt; the constructor
// also sets node and chainName, and the methods read cfg and db.
19 type mainchainKeeper struct {
// assetCache is consulted by getAsset before it falls back to the database.
24 assetCache *database.AssetCache
// NewMainchainKeeper builds a mainchainKeeper for the chain described by
// chainCfg: the node client is created from chainCfg.Upstream, the chain name
// is copied from chainCfg.Name, and a fresh asset cache is allocated.
// NOTE(review): db and chainCfg are presumably stored on the keeper in the
// initialiser lines that are not visible here — confirm.
27 func NewMainchainKeeper(db *gorm.DB, chainCfg *config.Chain) *mainchainKeeper {
28 return &mainchainKeeper{
31 node: service.NewNode(chainCfg.Upstream),
32 chainName: chainCfg.Name,
33 assetCache: database.NewAssetCache(),
// Run drives the keeper: it calls syncBlock in an endless loop paced by a
// ticker whose period is m.cfg.SyncSeconds seconds, logging any error that
// syncBlock returns.
// NOTE(review): how isUpdate is used is not visible in this excerpt.
37 func (m *mainchainKeeper) Run() {
38 ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
// The receive from ticker.C is the loop's post statement, so the first pass
// runs immediately and every later pass waits for the next tick.
39 for ; true; <-ticker.C {
41 isUpdate, err := m.syncBlock()
// NOTE(review): the message says "blockKeeper" although this type is
// mainchainKeeper — possibly a leftover from copied code.
43 log.WithField("error", err).Errorln("blockKeeper fail on process block")
// syncBlock loads the stored sync position for this chain, fetches the block
// at the next height from the node, checks that it links to the stored block
// hash, and hands it to tryAttachBlock.
// NOTE(review): the bool result presumably reports whether a block was
// attached; the success-path return is not visible in this excerpt.
54 func (m *mainchainKeeper) syncBlock() (bool, error) {
// Look up the chain row by name; it supplies BlockHeight and BlockHash below.
55 chain := &orm.Chain{Name: m.chainName}
56 if err := m.db.Where(chain).First(chain).Error; err != nil {
57 return false, errors.Wrap(err, "query chain")
60 height, err := m.node.GetBlockCount()
// The remote height is compared against the stored height plus the configured
// confirmation count. NOTE(review): the branch body is not visible; presumably
// it returns without syncing when the remote chain is not far enough ahead.
65 if height <= chain.BlockHeight+m.cfg.Confirmations {
69 nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
74 nextBlock := &btmTypes.Block{}
75 if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
// NOTE(review): the UnmarshalText error is discarded here; errors.Wrap(err, ...)
// as used for the chain query above would preserve the cause.
76 return false, errors.New("Unmarshal nextBlock")
// The fetched block must build directly on the block hash stored in the database.
79 if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
// NOTE(review): log is logrus, whose Fatal logs and then exits the process, so
// the return that follows the log call looks unreachable — confirm the intent.
80 log.WithFields(log.Fields{
81 "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
82 "db block_hash": chain.BlockHash,
83 }).Fatal("BlockHash mismatch")
84 return false, errors.New("BlockHash mismatch")
87 if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
// tryAttachBlock logs the block's height and hash, runs processBlock on the
// block, and finishes by returning the error from m.db.Commit().
// NOTE(review): chain and txStatus are not used in the lines visible here.
94 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error {
95 blockHash := block.Hash()
96 log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
98 if err := m.processBlock(block); err != nil {
// NOTE(review): Commit is called on m.db itself. With gorm, Begin returns a
// separate transaction handle — verify that the lines not shown here keep that
// handle and that m.db really is a transaction at this point.
103 return m.db.Commit().Error
// processBlock handles one main chain block. The part visible here passes the
// block's transactions to processIssuing; the rest of the body is not shown.
106 func (m *mainchainKeeper) processBlock(block *btmTypes.Block) error {
107 if err := m.processIssuing(block.Transactions); err != nil {
// processIssuing walks every input of every transaction and, for each issuance
// input, creates an asset row in the database and adds it to the asset cache.
114 func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error {
115 for _, tx := range txs {
116 for _, input := range tx.Inputs {
// Only issuance inputs are of interest; the type switch selects them.
117 switch inp := input.TypedInput.(type) {
118 case *btmTypes.IssuanceInput:
119 assetID := inp.AssetID()
// err == nil means getAsset found the asset in the cache or the database.
// NOTE(review): the branch body is not visible; presumably an already-known
// asset is skipped so it is not inserted twice.
120 if _, err := m.getAsset(assetID.String()); err == nil {
// The issuance program and the raw asset definition are stored hex-encoded.
125 AssetID: assetID.String(),
126 IssuanceProgram: hex.EncodeToString(inp.IssuanceProgram),
127 VMVersion: inp.VMVersion,
128 RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
130 if err := m.db.Create(asset).Error; err != nil {
// The cache entry is added after the insert. NOTE(review): the error branch
// of the insert is not visible; presumably it returns before reaching here.
134 m.assetCache.Add(asset.AssetID, asset)
// getAsset looks an asset up by ID, trying the in-memory cache first and the
// database second. An asset loaded from the database is added to the cache.
// A failed database query yields a nil asset and a wrapped error.
142 func (m *mainchainKeeper) getAsset(assetID string) (*orm.Asset, error) {
143 if asset := m.assetCache.Get(assetID); asset != nil {
// Cache miss: query the database using the asset ID as the filter.
147 asset := &orm.Asset{AssetID: assetID}
148 if err := m.db.Where(asset).First(asset).Error; err != nil {
// NOTE(review): every query error is wrapped with this "not found" message,
// not only the record-not-found case.
149 return nil, errors.Wrap(err, "asset not found in memory and mysql")
152 m.assetCache.Add(assetID, asset)