OSDN Git Service

feat: add processIssuing (#152)
[bytom/vapor.git] / federation / synchron / mainchain_keeper.go
1 package synchron
2
3 import (
4         "encoding/hex"
5         "time"
6
7         btmTypes "github.com/bytom/protocol/bc/types"
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/errors"
12         "github.com/vapor/federation/config"
13         "github.com/vapor/federation/database"
14         "github.com/vapor/federation/database/orm"
15         "github.com/vapor/federation/service"
16         "github.com/vapor/protocol/bc"
17 )
18
19 type mainchainKeeper struct {
20         cfg        *config.Chain
21         db         *gorm.DB
22         node       *service.Node
23         chainName  string
24         assetCache *database.AssetCache
25 }
26
27 func NewMainchainKeeper(db *gorm.DB, chainCfg *config.Chain) *mainchainKeeper {
28         return &mainchainKeeper{
29                 cfg:        chainCfg,
30                 db:         db,
31                 node:       service.NewNode(chainCfg.Upstream),
32                 chainName:  chainCfg.Name,
33                 assetCache: database.NewAssetCache(),
34         }
35 }
36
37 func (m *mainchainKeeper) Run() {
38         ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
39         for ; true; <-ticker.C {
40                 for {
41                         isUpdate, err := m.syncBlock()
42                         if err != nil {
43                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
44                                 break
45                         }
46
47                         if !isUpdate {
48                                 break
49                         }
50                 }
51         }
52 }
53
54 func (m *mainchainKeeper) syncBlock() (bool, error) {
55         chain := &orm.Chain{Name: m.chainName}
56         if err := m.db.Where(chain).First(chain).Error; err != nil {
57                 return false, errors.Wrap(err, "query chain")
58         }
59
60         height, err := m.node.GetBlockCount()
61         if err != nil {
62                 return false, err
63         }
64
65         if height <= chain.BlockHeight+m.cfg.Confirmations {
66                 return false, nil
67         }
68
69         nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
70         if err != nil {
71                 return false, err
72         }
73
74         nextBlock := &btmTypes.Block{}
75         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
76                 return false, errors.New("Unmarshal nextBlock")
77         }
78
79         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
80                 log.WithFields(log.Fields{
81                         "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
82                         "db block_hash":            chain.BlockHash,
83                 }).Fatal("BlockHash mismatch")
84                 return false, errors.New("BlockHash mismatch")
85         }
86
87         if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
88                 return false, err
89         }
90
91         return true, nil
92 }
93
94 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error {
95         blockHash := block.Hash()
96         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
97         m.db.Begin()
98         if err := m.processBlock(block); err != nil {
99                 m.db.Rollback()
100                 return err
101         }
102
103         return m.db.Commit().Error
104 }
105
106 func (m *mainchainKeeper) processBlock(block *btmTypes.Block) error {
107         if err := m.processIssuing(block.Transactions); err != nil {
108                 return err
109         }
110
111         return nil
112 }
113
114 func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error {
115         for _, tx := range txs {
116                 for _, input := range tx.Inputs {
117                         switch inp := input.TypedInput.(type) {
118                         case *btmTypes.IssuanceInput:
119                                 assetID := inp.AssetID()
120                                 if _, err := m.getAsset(assetID.String()); err == nil {
121                                         continue
122                                 }
123
124                                 asset := &orm.Asset{
125                                         AssetID:           assetID.String(),
126                                         IssuanceProgram:   hex.EncodeToString(inp.IssuanceProgram),
127                                         VMVersion:         inp.VMVersion,
128                                         RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
129                                 }
130                                 if err := m.db.Create(asset).Error; err != nil {
131                                         return err
132                                 }
133
134                                 m.assetCache.Add(asset.AssetID, asset)
135                         }
136                 }
137         }
138
139         return nil
140 }
141
142 func (m *mainchainKeeper) getAsset(assetID string) (*orm.Asset, error) {
143         if asset := m.assetCache.Get(assetID); asset != nil {
144                 return asset, nil
145         }
146
147         asset := &orm.Asset{AssetID: assetID}
148         if err := m.db.Where(asset).First(asset).Error; err != nil {
149                 return nil, errors.Wrap(err, "asset not found in memory and mysql")
150         }
151
152         m.assetCache.Add(assetID, asset)
153         return asset, nil
154 }