OSDN Git Service

change the ts (#284)
[bytom/vapor.git] / toolbar / federation / synchron / mainchain_keeper.go
1 package synchron
2
3 import (
4         "bytes"
5         "database/sql"
6         "encoding/hex"
7         "fmt"
8         "time"
9
10         btmConsensus "github.com/bytom/consensus"
11         btmBc "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13         "github.com/bytom/protocol/vm"
14         "github.com/jinzhu/gorm"
15         log "github.com/sirupsen/logrus"
16
17         "github.com/vapor/consensus"
18         "github.com/vapor/errors"
19         "github.com/vapor/toolbar/federation/common"
20         "github.com/vapor/toolbar/federation/config"
21         "github.com/vapor/toolbar/federation/database"
22         "github.com/vapor/toolbar/federation/database/orm"
23         "github.com/vapor/toolbar/federation/service"
24         "github.com/vapor/protocol/bc"
25 )
26
27 type mainchainKeeper struct {
28         cfg            *config.Chain
29         db             *gorm.DB
30         node           *service.Node
31         assetStore     *database.AssetStore
32         chainID        uint64
33         federationProg []byte
34 }
35
36 func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper {
37         chain := &orm.Chain{Name: common.BytomChainName}
38         if err := db.Where(chain).First(chain).Error; err != nil {
39                 log.WithField("err", err).Fatal("fail on get chain info")
40         }
41
42         return &mainchainKeeper{
43                 cfg:            &cfg.Mainchain,
44                 db:             db,
45                 node:           service.NewNode(cfg.Mainchain.Upstream),
46                 assetStore:     assetStore,
47                 federationProg: cfg.FederationProg,
48                 chainID:        chain.ID,
49         }
50 }
51
52 func (m *mainchainKeeper) Run() {
53         ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
54         for ; true; <-ticker.C {
55                 for {
56                         isUpdate, err := m.syncBlock()
57                         if err != nil {
58                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
59                                 break
60                         }
61
62                         if !isUpdate {
63                                 break
64                         }
65                 }
66         }
67 }
68
69 func (m *mainchainKeeper) createCrossChainReqs(db *gorm.DB, crossTransactionID uint64, tx *types.Tx, statusFail bool) error {
70         prog := tx.Inputs[0].ControlProgram()
71         fromAddress := common.ProgToAddress(prog, &consensus.BytomMainNetParams)
72         toAddress := common.ProgToAddress(prog, &consensus.MainNetParams)
73         for i, rawOutput := range tx.Outputs {
74                 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.federationProg) {
75                         continue
76                 }
77
78                 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID {
79                         continue
80                 }
81
82                 asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
83                 if err != nil {
84                         return err
85                 }
86
87                 req := &orm.CrossTransactionReq{
88                         CrossTransactionID: crossTransactionID,
89                         SourcePos:          uint64(i),
90                         AssetID:            asset.ID,
91                         AssetAmount:        rawOutput.OutputCommitment.AssetAmount.Amount,
92                         Script:             hex.EncodeToString(prog),
93                         FromAddress:        fromAddress,
94                         ToAddress:          toAddress,
95                 }
96
97                 if err := db.Create(req).Error; err != nil {
98                         return err
99                 }
100         }
101         return nil
102 }
103
104 func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
105         for _, input := range tx.Inputs {
106                 if bytes.Equal(input.ControlProgram(), m.federationProg) {
107                         return false
108                 }
109         }
110
111         for _, output := range tx.Outputs {
112                 if bytes.Equal(output.OutputCommitment.ControlProgram, m.federationProg) {
113                         return true
114                 }
115         }
116         return false
117 }
118
119 func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
120         for _, input := range tx.Inputs {
121                 if !bytes.Equal(input.ControlProgram(), m.federationProg) {
122                         return false
123                 }
124         }
125
126         if sourceTxHash := locateSideChainTx(tx.Outputs[len(tx.Outputs)-1]); sourceTxHash == "" {
127                 return false
128         }
129         return false
130 }
131
132 func locateSideChainTx(output *types.TxOutput) string {
133         insts, err := vm.ParseProgram(output.OutputCommitment.ControlProgram)
134         if err != nil {
135                 return ""
136         }
137
138         if len(insts) != 2 {
139                 return ""
140         }
141
142         return hex.EncodeToString(insts[1].Data)
143 }
144
145 func (m *mainchainKeeper) processBlock(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus) error {
146         for i, tx := range block.Transactions {
147                 if err := m.processIssuance(tx); err != nil {
148                         return err
149                 }
150
151                 if m.isDepositTx(tx) {
152                         if err := m.processDepositTx(db, block, txStatus, i); err != nil {
153                                 return err
154                         }
155                 }
156
157                 if m.isWithdrawalTx(tx) {
158                         if err := m.processWithdrawalTx(db, block, i); err != nil {
159                                 return err
160                         }
161                 }
162         }
163
164         return nil
165 }
166
167 func (m *mainchainKeeper) processChainInfo(db *gorm.DB, block *types.Block) error {
168         blockHash := block.Hash()
169         res := db.Model(&orm.Chain{}).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(&orm.Chain{
170                 BlockHash:   blockHash.String(),
171                 BlockHeight: block.Height,
172         })
173         if err := res.Error; err != nil {
174                 return err
175         }
176
177         if res.RowsAffected != 1 {
178                 return ErrInconsistentDB
179         }
180         return nil
181 }
182
183 func (m *mainchainKeeper) processDepositTx(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus, txIndex int) error {
184         tx := block.Transactions[txIndex]
185         var muxID btmBc.Hash
186         switch res := tx.Entries[*tx.ResultIds[0]].(type) {
187         case *btmBc.Output:
188                 muxID = *res.Source.Ref
189         case *btmBc.Retirement:
190                 muxID = *res.Source.Ref
191         default:
192                 return ErrOutputType
193         }
194
195         rawTx, err := tx.MarshalText()
196         if err != nil {
197                 return err
198         }
199
200         blockHash := block.Hash()
201         ormTx := &orm.CrossTransaction{
202                 ChainID:              m.chainID,
203                 SourceBlockHeight:    block.Height,
204                 SourceBlockTimestamp: block.Timestamp,
205                 SourceBlockHash:      blockHash.String(),
206                 SourceTxIndex:        uint64(txIndex),
207                 SourceMuxID:          muxID.String(),
208                 SourceTxHash:         tx.ID.String(),
209                 SourceRawTransaction: string(rawTx),
210                 DestBlockHeight:      sql.NullInt64{Valid: false},
211                 DestBlockTimestamp:   sql.NullInt64{Valid: false},
212                 DestBlockHash:        sql.NullString{Valid: false},
213                 DestTxIndex:          sql.NullInt64{Valid: false},
214                 DestTxHash:           sql.NullString{Valid: false},
215                 Status:               common.CrossTxPendingStatus,
216         }
217         if err := db.Create(ormTx).Error; err != nil {
218                 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
219         }
220
221         return m.createCrossChainReqs(db, ormTx.ID, tx, txStatus.VerifyStatus[txIndex].StatusFail)
222 }
223
224 func (m *mainchainKeeper) processIssuance(tx *types.Tx) error {
225         for _, input := range tx.Inputs {
226                 if input.InputType() != types.IssuanceInputType {
227                         continue
228                 }
229
230                 issuance := input.TypedInput.(*types.IssuanceInput)
231                 assetID := issuance.AssetID()
232                 if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil {
233                         continue
234                 }
235
236                 asset := &orm.Asset{
237                         AssetID:         assetID.String(),
238                         IssuanceProgram: hex.EncodeToString(issuance.IssuanceProgram),
239                         VMVersion:       issuance.VMVersion,
240                         Definition:      string(issuance.AssetDefinition),
241                 }
242
243                 if err := m.db.Create(asset).Error; err != nil {
244                         return err
245                 }
246         }
247         return nil
248 }
249
250 func (m *mainchainKeeper) processWithdrawalTx(db *gorm.DB, block *types.Block, txIndex int) error {
251         blockHash := block.Hash()
252         tx := block.Transactions[txIndex]
253
254         stmt := db.Model(&orm.CrossTransaction{}).Where(&orm.CrossTransaction{
255                 SourceTxHash: locateSideChainTx(tx.Outputs[len(tx.Outputs)-1]),
256                 Status:       common.CrossTxPendingStatus,
257         }).UpdateColumn(&orm.CrossTransaction{
258                 DestBlockHeight:    sql.NullInt64{int64(block.Height), true},
259                 DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true},
260                 DestBlockHash:      sql.NullString{blockHash.String(), true},
261                 DestTxIndex:        sql.NullInt64{int64(txIndex), true},
262                 DestTxHash:         sql.NullString{tx.ID.String(), true},
263                 Status:             common.CrossTxCompletedStatus,
264         })
265         if stmt.Error != nil {
266                 return stmt.Error
267         }
268
269         if stmt.RowsAffected != 1 {
270                 return ErrInconsistentDB
271         }
272         return nil
273 }
274
275 func (m *mainchainKeeper) syncBlock() (bool, error) {
276         chain := &orm.Chain{ID: m.chainID}
277         if err := m.db.First(chain).Error; err != nil {
278                 return false, errors.Wrap(err, "query chain")
279         }
280
281         height, err := m.node.GetBlockCount()
282         if err != nil {
283                 return false, err
284         }
285
286         if height <= chain.BlockHeight+m.cfg.Confirmations {
287                 return false, nil
288         }
289
290         nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
291         if err != nil {
292                 return false, err
293         }
294
295         nextBlock := &types.Block{}
296         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
297                 return false, errors.New("Unmarshal nextBlock")
298         }
299
300         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
301                 log.WithFields(log.Fields{
302                         "remote previous_block_Hash": nextBlock.PreviousBlockHash.String(),
303                         "db block_hash":              chain.BlockHash,
304                 }).Fatal("fail on block hash mismatch")
305         }
306
307         if err := m.tryAttachBlock(nextBlock, txStatus); err != nil {
308                 return false, err
309         }
310
311         return true, nil
312 }
313
314 func (m *mainchainKeeper) tryAttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
315         blockHash := block.Hash()
316         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
317
318         dbTx := m.db.Begin()
319         if err := m.processBlock(dbTx, block, txStatus); err != nil {
320                 dbTx.Rollback()
321                 return err
322         }
323
324         if err := m.processChainInfo(dbTx, block); err != nil {
325                 dbTx.Rollback()
326                 return err
327         }
328
329         return dbTx.Commit().Error
330 }