OSDN Git Service

add add add
[bytom/vapor.git] / federation / synchron / mainchain_keeper.go
1 package synchron
2
3 import (
4         "bytes"
5         "database/sql"
6         "encoding/hex"
7         "fmt"
8         "time"
9
10         btmConsensus "github.com/bytom/consensus"
11         btmBc "github.com/bytom/protocol/bc"
12         btmTypes "github.com/bytom/protocol/bc/types"
13         "github.com/jinzhu/gorm"
14         log "github.com/sirupsen/logrus"
15
16         "github.com/vapor/errors"
17         "github.com/vapor/federation/config"
18         "github.com/vapor/federation/database"
19         "github.com/vapor/federation/database/orm"
20         "github.com/vapor/federation/service"
21         "github.com/vapor/protocol/bc"
22 )
23
24 type mainchainKeeper struct {
25         cfg        *config.Chain
26         db         *gorm.DB
27         node       *service.Node
28         chainName  string
29         assetCache *database.AssetCache
30 }
31
32 func NewMainchainKeeper(db *gorm.DB, chainCfg *config.Chain) *mainchainKeeper {
33         return &mainchainKeeper{
34                 cfg:        chainCfg,
35                 db:         db,
36                 node:       service.NewNode(chainCfg.Upstream),
37                 chainName:  chainCfg.Name,
38                 assetCache: database.NewAssetCache(),
39         }
40 }
41
42 func (m *mainchainKeeper) Run() {
43         ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
44         for ; true; <-ticker.C {
45                 for {
46                         isUpdate, err := m.syncBlock()
47                         if err != nil {
48                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
49                                 break
50                         }
51
52                         if !isUpdate {
53                                 break
54                         }
55                 }
56         }
57 }
58
59 func (m *mainchainKeeper) syncBlock() (bool, error) {
60         chain := &orm.Chain{Name: m.chainName}
61         if err := m.db.Where(chain).First(chain).Error; err != nil {
62                 return false, errors.Wrap(err, "query chain")
63         }
64
65         height, err := m.node.GetBlockCount()
66         if err != nil {
67                 return false, err
68         }
69
70         if height <= chain.BlockHeight+m.cfg.Confirmations {
71                 return false, nil
72         }
73
74         nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
75         if err != nil {
76                 return false, err
77         }
78
79         nextBlock := &btmTypes.Block{}
80         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
81                 return false, errors.New("Unmarshal nextBlock")
82         }
83
84         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
85                 log.WithFields(log.Fields{
86                         "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
87                         "db block_hash":            chain.BlockHash,
88                 }).Fatal("BlockHash mismatch")
89                 return false, ErrInconsistentDB
90         }
91
92         if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
93                 return false, err
94         }
95
96         return true, nil
97 }
98
99 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error {
100         blockHash := block.Hash()
101         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
102         m.db.Begin()
103         if err := m.processBlock(chain, block, txStatus); err != nil {
104                 m.db.Rollback()
105                 return err
106         }
107
108         return m.db.Commit().Error
109 }
110
111 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus) error {
112         if err := m.processIssuing(block.Transactions); err != nil {
113                 return err
114         }
115
116         for i, tx := range block.Transactions {
117                 if m.isDepositTx(tx) {
118                         if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
119                                 return err
120                         }
121                 }
122
123                 if m.isWithdrawalTx(tx) {
124                         if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
125                                 return err
126                         }
127                 }
128         }
129
130         return m.processChainInfo(chain, block)
131 }
132
133 func (m *mainchainKeeper) isDepositTx(tx *btmTypes.Tx) bool {
134         for _, output := range tx.Outputs {
135                 if bytes.Equal(output.OutputCommitment.ControlProgram, fedProg) {
136                         return true
137                 }
138         }
139         return false
140 }
141
142 func (m *mainchainKeeper) isWithdrawalTx(tx *btmTypes.Tx) bool {
143         for _, input := range tx.Inputs {
144                 if bytes.Equal(input.ControlProgram(), fedProg) {
145                         return true
146                 }
147         }
148         return false
149 }
150
151 func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *btmTypes.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *btmTypes.Tx) error {
152         blockHash := block.Hash()
153
154         var muxID btmBc.Hash
155         isMuxIDFound := false
156         for _, resOutID := range tx.ResultIds {
157                 resOut, ok := tx.Entries[*resOutID].(*btmBc.Output)
158                 if ok {
159                         muxID = *resOut.Source.Ref
160                         isMuxIDFound = true
161                         break
162                 }
163         }
164         if !isMuxIDFound {
165                 return errors.New("fail to get mux id")
166         }
167
168         rawTx, err := tx.MarshalText()
169         if err != nil {
170                 return err
171         }
172
173         ormTx := &orm.CrossTransaction{
174                 ChainID:              chain.ID,
175                 SourceBlockHeight:    block.Height,
176                 SourceBlockHash:      blockHash.String(),
177                 SourceTxIndex:        txIndex,
178                 SourceMuxID:          muxID.String(),
179                 SourceTxHash:         tx.ID.String(),
180                 SourceRawTransaction: string(rawTx),
181                 DestBlockHeight:      sql.NullInt64{Valid: false},
182                 DestBlockHash:        sql.NullString{Valid: false},
183                 DestTxIndex:          sql.NullInt64{Valid: false},
184                 DestTxHash:           sql.NullString{Valid: false},
185                 // TODO:
186                 // Status         uint8
187         }
188         if err := m.db.Create(ormTx).Error; err != nil {
189                 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
190         }
191
192         statusFail := txStatus.VerifyStatus[txIndex].StatusFail
193         crossChainInputs, err := m.getCrossChainInputs(ormTx.ID, tx, statusFail)
194         if err != nil {
195                 return err
196         }
197
198         for _, input := range crossChainInputs {
199                 if err := m.db.Create(input).Error; err != nil {
200                         return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos))
201                 }
202         }
203
204         return nil
205 }
206
207 func (m *mainchainKeeper) getCrossChainInputs(crossTransactionID uint64, tx *btmTypes.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
208         // assume inputs are from an identical owner
209         script := hex.EncodeToString(tx.Inputs[0].ControlProgram())
210         inputs := []*orm.CrossTransactionReq{}
211         for i, rawOutput := range tx.Outputs {
212                 // check valid deposit
213                 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, fedProg) {
214                         continue
215                 }
216
217                 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID {
218                         continue
219                 }
220
221                 asset, err := m.getAsset(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
222                 if err != nil {
223                         return nil, err
224                 }
225
226                 input := &orm.CrossTransactionReq{
227                         CrossTransactionID: crossTransactionID,
228                         SourcePos:          uint64(i),
229                         AssetID:            asset.ID,
230                         AssetAmount:        rawOutput.OutputCommitment.AssetAmount.Amount,
231                         Script:             script,
232                 }
233                 inputs = append(inputs, input)
234         }
235         return inputs, nil
236 }
237
238 func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *btmTypes.Block, txIndex uint64, tx *btmTypes.Tx) error {
239         /*
240                 ormTx := &orm.CrossTransaction{
241                         ChainID:         chain.ID,
242                         DestBlockHeight: sql.NullInt64{Valid: false},
243                         DestBlockHash:   sql.NullString{Valid: false},
244                         DestTxIndex:     sql.NullInt64{Valid: false},
245                         DestTxHash:      sql.NullString{Valid: false},
246                         // Status         uint8
247                 }
248         */
249
250         return nil
251 }
252
253 // TODO: maybe common
254 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *btmTypes.Block) error {
255         blockHash := block.Hash()
256         chain.BlockHash = blockHash.String()
257         chain.BlockHeight = block.Height
258         res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
259         if err := res.Error; err != nil {
260                 return err
261         }
262
263         if res.RowsAffected != 1 {
264                 return ErrInconsistentDB
265         }
266
267         return nil
268 }
269
270 func (m *mainchainKeeper) processIssuing(txs []*btmTypes.Tx) error {
271         for _, tx := range txs {
272                 for _, input := range tx.Inputs {
273                         switch inp := input.TypedInput.(type) {
274                         case *btmTypes.IssuanceInput:
275                                 assetID := inp.AssetID()
276                                 if _, err := m.getAsset(assetID.String()); err == nil {
277                                         continue
278                                 }
279
280                                 asset := &orm.Asset{
281                                         AssetID:           assetID.String(),
282                                         IssuanceProgram:   hex.EncodeToString(inp.IssuanceProgram),
283                                         VMVersion:         inp.VMVersion,
284                                         RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
285                                 }
286                                 if err := m.db.Create(asset).Error; err != nil {
287                                         return err
288                                 }
289
290                                 m.assetCache.Add(asset.AssetID, asset)
291                         }
292                 }
293         }
294
295         return nil
296 }
297
298 // TODO: maybe common
299 func (m *mainchainKeeper) getAsset(assetID string) (*orm.Asset, error) {
300         if asset := m.assetCache.Get(assetID); asset != nil {
301                 return asset, nil
302         }
303
304         asset := &orm.Asset{AssetID: assetID}
305         if err := m.db.Where(asset).First(asset).Error; err != nil {
306                 return nil, errors.Wrap(err, "asset not found in memory and mysql")
307         }
308
309         m.assetCache.Add(assetID, asset)
310         return asset, nil
311 }