OSDN Git Service

fix
[bytom/vapor.git] / federation / synchron / mainchain_keeper.go
1 package synchron
2
3 import (
4         "bytes"
5         "encoding/hex"
6         "time"
7
8         btmTypes "github.com/bytom/protocol/bc/types"
9         "github.com/jinzhu/gorm"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/errors"
13         "github.com/vapor/federation/config"
14         "github.com/vapor/federation/database"
15         "github.com/vapor/federation/database/orm"
16         "github.com/vapor/federation/service"
17         "github.com/vapor/protocol/bc"
18 )
19
20 type mainchainKeeper struct {
21         cfg        *config.Chain
22         db         *gorm.DB
23         node       *service.Node
24         chainName  string
25         assetCache *database.AssetCache
26 }
27
28 func NewMainchainKeeper(db *gorm.DB, chainCfg *config.Chain) *mainchainKeeper {
29         return &mainchainKeeper{
30                 cfg:        chainCfg,
31                 db:         db,
32                 node:       service.NewNode(chainCfg.Upstream),
33                 chainName:  chainCfg.Name,
34                 assetCache: database.NewAssetCache(),
35         }
36 }
37
38 func (m *mainchainKeeper) Run() {
39         ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
40         for ; true; <-ticker.C {
41                 for {
42                         isUpdate, err := m.syncBlock()
43                         if err != nil {
44                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
45                                 break
46                         }
47
48                         if !isUpdate {
49                                 break
50                         }
51                 }
52         }
53 }
54
55 func (m *mainchainKeeper) syncBlock() (bool, error) {
56         chain := &orm.Chain{Name: m.chainName}
57         if err := m.db.Where(chain).First(chain).Error; err != nil {
58                 return false, errors.Wrap(err, "query chain")
59         }
60
61         height, err := m.node.GetBlockCount()
62         if err != nil {
63                 return false, err
64         }
65
66         if height <= chain.BlockHeight+m.cfg.Confirmations {
67                 return false, nil
68         }
69
70         nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
71         if err != nil {
72                 return false, err
73         }
74
75         nextBlock := &btmTypes.Block{}
76         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
77                 return false, errors.New("Unmarshal nextBlock")
78         }
79
80         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
81                 log.WithFields(log.Fields{
82                         "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
83                         "db block_hash":            chain.BlockHash,
84                 }).Fatal("BlockHash mismatch")
85                 return false, ErrInconsistentDB
86         }
87
88         if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
89                 return false, err
90         }
91
92         return true, nil
93 }
94
95 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error {
96         blockHash := block.Hash()
97         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
98         m.db.Begin()
99         if err := m.processBlock(chain, block); err != nil {
100                 m.db.Rollback()
101                 return err
102         }
103
104         return m.db.Commit().Error
105 }
106
107 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *btmTypes.Block) error {
108         if err := m.processIssuing(block.Transactions); err != nil {
109                 return err
110         }
111
112         for i, tx := range block.Transactions {
113                 if m.isDepositTx(tx) {
114                         if err := m.processDepositTx(uint64(i), tx); err != nil {
115                                 return err
116                         }
117                 }
118
119                 if m.isWithdrawalTx(tx) {
120                         if err := m.processWithdrawalTx(uint64(i), tx); err != nil {
121                                 return err
122                         }
123                 }
124         }
125
126         return m.processChainInfo(chain, block)
127 }
128
129 func (m *mainchainKeeper) isDepositTx(tx *btmTypes.Tx) bool {
130         for _, output := range tx.Outputs {
131                 if bytes.Equal(output.OutputCommitment.ControlProgram, fedProg) {
132                         return true
133                 }
134         }
135         return false
136 }
137
138 func (m *mainchainKeeper) isWithdrawalTx(tx *btmTypes.Tx) bool {
139         for _, input := range tx.Inputs {
140                 if bytes.Equal(input.ControlProgram(), fedProg) {
141                         return true
142                 }
143         }
144         return false
145 }
146
147 func (m *mainchainKeeper) processDepositTx(txIndex uint64, tx *btmTypes.Tx) error {
148         return nil
149 }
150
151 func (m *mainchainKeeper) processWithdrawalTx(txIndex uint64, tx *btmTypes.Tx) error {
152         return nil
153 }
154
155 // TODO: maybe common
156 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *btmTypes.Block) error {
157         blockHash := block.Hash()
158         chain.BlockHash = blockHash.String()
159         chain.BlockHeight = block.Height
160         res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
161         if err := res.Error; err != nil {
162                 return err
163         }
164
165         if res.RowsAffected != 1 {
166                 return ErrInconsistentDB
167         }
168
169         return nil
170 }
171
172 func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error {
173         for _, tx := range txs {
174                 for _, input := range tx.Inputs {
175                         switch inp := input.TypedInput.(type) {
176                         case *btmTypes.IssuanceInput:
177                                 assetID := inp.AssetID()
178                                 if _, err := m.getAsset(assetID.String()); err == nil {
179                                         continue
180                                 }
181
182                                 asset := &orm.Asset{
183                                         AssetID:           assetID.String(),
184                                         IssuanceProgram:   hex.EncodeToString(inp.IssuanceProgram),
185                                         VMVersion:         inp.VMVersion,
186                                         RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
187                                 }
188                                 if err := m.db.Create(asset).Error; err != nil {
189                                         return err
190                                 }
191
192                                 m.assetCache.Add(asset.AssetID, asset)
193                         }
194                 }
195         }
196
197         return nil
198 }
199
200 // TODO: maybe common
201 func (m *mainchainKeeper) getAsset(assetID string) (*orm.Asset, error) {
202         if asset := m.assetCache.Get(assetID); asset != nil {
203                 return asset, nil
204         }
205
206         asset := &orm.Asset{AssetID: assetID}
207         if err := m.db.Where(asset).First(asset).Error; err != nil {
208                 return nil, errors.Wrap(err, "asset not found in memory and mysql")
209         }
210
211         m.assetCache.Add(assetID, asset)
212         return asset, nil
213 }