OSDN Git Service

Mov (#518)
[bytom/vapor.git] / toolbar / federation / synchron / sidechain_keeper.go
1 package synchron
2
3 import (
4         "database/sql"
5         "encoding/hex"
6         "fmt"
7         "time"
8
9         "github.com/jinzhu/gorm"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/bytom/vapor/consensus"
13         "github.com/bytom/vapor/errors"
14         "github.com/bytom/vapor/protocol/bc"
15         "github.com/bytom/vapor/protocol/bc/types"
16         "github.com/bytom/vapor/toolbar/federation/common"
17         "github.com/bytom/vapor/toolbar/federation/config"
18         "github.com/bytom/vapor/toolbar/federation/database"
19         "github.com/bytom/vapor/toolbar/federation/database/orm"
20         "github.com/bytom/vapor/toolbar/federation/service"
21 )
22
23 type sidechainKeeper struct {
24         cfg        *config.Chain
25         db         *gorm.DB
26         node       *service.Node
27         assetStore *database.AssetStore
28         chainID    uint64
29         netParams  consensus.Params
30 }
31
32 func NewSidechainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *sidechainKeeper {
33         chain := &orm.Chain{Name: common.VaporChainName}
34         if err := db.Where(chain).First(chain).Error; err != nil {
35                 log.WithField("err", err).Fatal("fail on get chain info")
36         }
37
38         return &sidechainKeeper{
39                 cfg:        &cfg.Sidechain,
40                 db:         db,
41                 node:       service.NewNode(cfg.Sidechain.Upstream),
42                 assetStore: assetStore,
43                 chainID:    chain.ID,
44                 netParams:  consensus.NetParams[cfg.Network],
45         }
46 }
47
48 func (s *sidechainKeeper) Run() {
49         ticker := time.NewTicker(time.Duration(s.cfg.SyncSeconds) * time.Second)
50         for ; true; <-ticker.C {
51                 for {
52                         isUpdate, err := s.syncBlock()
53                         if err != nil {
54                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
55                                 break
56                         }
57
58                         if !isUpdate {
59                                 break
60                         }
61                 }
62         }
63 }
64
65 func (s *sidechainKeeper) createCrossChainReqs(db *gorm.DB, crossTransactionID uint64, tx *types.Tx, statusFail bool) error {
66         fromAddress := common.ProgToAddress(tx.Inputs[0].ControlProgram(), &s.netParams)
67         for i, rawOutput := range tx.Outputs {
68                 if rawOutput.OutputType() != types.CrossChainOutputType {
69                         continue
70                 }
71
72                 if statusFail && *rawOutput.OutputCommitment().AssetAmount.AssetId != *consensus.BTMAssetID {
73                         continue
74                 }
75
76                 asset, err := s.assetStore.GetByAssetID(rawOutput.OutputCommitment().AssetAmount.AssetId.String())
77                 if err != nil {
78                         return err
79                 }
80
81                 if asset.IsOpenFederationIssue {
82                         continue
83                 }
84
85                 prog := rawOutput.ControlProgram()
86                 req := &orm.CrossTransactionReq{
87                         CrossTransactionID: crossTransactionID,
88                         SourcePos:          uint64(i),
89                         AssetID:            asset.ID,
90                         AssetAmount:        rawOutput.OutputCommitment().AssetAmount.Amount,
91                         Script:             hex.EncodeToString(prog),
92                         FromAddress:        fromAddress,
93                         ToAddress:          common.ProgToAddress(prog, consensus.BytomMainNetParams(&s.netParams)),
94                 }
95
96                 if err := db.Create(req).Error; err != nil {
97                         return err
98                 }
99         }
100         return nil
101 }
102
103 func (s *sidechainKeeper) isDepositTx(tx *types.Tx) (bool, error) {
104         for _, input := range tx.Inputs {
105                 if input.InputType() != types.CrossChainInputType {
106                         continue
107                 }
108
109                 if isOFAsset, err := s.isOpenFederationAsset(input.AssetAmount().AssetId); err != nil {
110                         return false, err
111                 } else if !isOFAsset {
112                         return true, nil
113                 }
114         }
115         return false, nil
116 }
117
118 func (s *sidechainKeeper) isWithdrawalTx(tx *types.Tx) (bool, error) {
119         for _, output := range tx.Outputs {
120                 if output.OutputType() != types.CrossChainOutputType {
121                         continue
122                 }
123
124                 if isOFAsset, err := s.isOpenFederationAsset(output.AssetAmount().AssetId); err != nil {
125                         return false, err
126                 } else if !isOFAsset {
127                         return true, nil
128                 }
129         }
130         return false, nil
131 }
132
133 func (s *sidechainKeeper) processBlock(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus) error {
134         for i, tx := range block.Transactions {
135                 if isDeposit, err := s.isDepositTx(tx); err != nil {
136                         return err
137                 } else if isDeposit {
138                         if err := s.processDepositTx(db, block, i); err != nil {
139                                 return err
140                         }
141                 }
142
143                 if isWithdrawal, err := s.isWithdrawalTx(tx); err != nil {
144                         return err
145                 } else if isWithdrawal {
146                         if err := s.processWithdrawalTx(db, block, txStatus, i); err != nil {
147                                 return err
148                         }
149                 }
150         }
151         return nil
152 }
153
154 func (s *sidechainKeeper) processChainInfo(db *gorm.DB, block *types.Block) error {
155         blockHash := block.Hash()
156         res := db.Model(&orm.Chain{}).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(&orm.Chain{
157                 BlockHash:   blockHash.String(),
158                 BlockHeight: block.Height,
159         })
160         if err := res.Error; err != nil {
161                 return err
162         }
163
164         if res.RowsAffected != 1 {
165                 return ErrInconsistentDB
166         }
167
168         return nil
169 }
170
171 func (s *sidechainKeeper) processDepositTx(db *gorm.DB, block *types.Block, txIndex int) error {
172         tx := block.Transactions[txIndex]
173         sourceTxHash, err := s.locateMainChainTx(tx.Inputs[0])
174         if err != nil {
175                 return err
176         }
177
178         blockHash := block.Hash()
179         stmt := db.Model(&orm.CrossTransaction{}).Where(&orm.CrossTransaction{
180                 SourceTxHash: sourceTxHash,
181                 Status:       common.CrossTxPendingStatus,
182         }).UpdateColumn(&orm.CrossTransaction{
183                 DestBlockHeight:    sql.NullInt64{int64(block.Height), true},
184                 DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp / 1000), true},
185                 DestBlockHash:      sql.NullString{blockHash.String(), true},
186                 DestTxIndex:        sql.NullInt64{int64(txIndex), true},
187                 DestTxHash:         sql.NullString{tx.ID.String(), true},
188                 Status:             common.CrossTxCompletedStatus,
189         })
190         if stmt.Error != nil {
191                 return stmt.Error
192         }
193
194         if stmt.RowsAffected != 1 {
195                 return errors.Wrap(ErrInconsistentDB, "fail on find deposit data on database")
196         }
197         return nil
198 }
199
200 func (s *sidechainKeeper) isOpenFederationAsset(assetID *bc.AssetID) (bool, error) {
201         asset, err := s.assetStore.GetByAssetID(assetID.String())
202         if err != nil {
203                 return false, err
204         }
205
206         return asset.IsOpenFederationIssue, nil
207 }
208
209 func (s *sidechainKeeper) processWithdrawalTx(db *gorm.DB, block *types.Block, txStatus *bc.TransactionStatus, txIndex int) error {
210         tx := block.Transactions[txIndex]
211         var muxID bc.Hash
212         switch res := tx.Entries[*tx.ResultIds[0]].(type) {
213         case *bc.CrossChainOutput:
214                 muxID = *res.Source.Ref
215         case *bc.IntraChainOutput:
216                 muxID = *res.Source.Ref
217         case *bc.VoteOutput:
218                 muxID = *res.Source.Ref
219         case *bc.Retirement:
220                 muxID = *res.Source.Ref
221         default:
222                 return ErrOutputType
223         }
224
225         rawTx, err := tx.MarshalText()
226         if err != nil {
227                 return err
228         }
229
230         blockHash := block.Hash()
231         ormTx := &orm.CrossTransaction{
232                 ChainID:              s.chainID,
233                 SourceBlockHeight:    block.Height,
234                 SourceBlockTimestamp: block.Timestamp / 1000,
235                 SourceBlockHash:      blockHash.String(),
236                 SourceTxIndex:        uint64(txIndex),
237                 SourceMuxID:          muxID.String(),
238                 SourceTxHash:         tx.ID.String(),
239                 SourceRawTransaction: string(rawTx),
240                 DestBlockHeight:      sql.NullInt64{Valid: false},
241                 DestBlockTimestamp:   sql.NullInt64{Valid: false},
242                 DestBlockHash:        sql.NullString{Valid: false},
243                 DestTxIndex:          sql.NullInt64{Valid: false},
244                 DestTxHash:           sql.NullString{Valid: false},
245                 Status:               common.CrossTxPendingStatus,
246         }
247         if err := db.Create(ormTx).Error; err != nil {
248                 return errors.Wrap(err, fmt.Sprintf("create sidechain WithdrawalTx %s", tx.ID.String()))
249         }
250
251         return s.createCrossChainReqs(db, ormTx.ID, tx, txStatus.VerifyStatus[txIndex].StatusFail)
252 }
253
254 func (s *sidechainKeeper) locateMainChainTx(input *types.TxInput) (string, error) {
255         if input.InputType() != types.CrossChainInputType {
256                 return "", errors.New("found weird crossChain tx")
257         }
258
259         crossIn := input.TypedInput.(*types.CrossChainInput)
260         crossTx := &orm.CrossTransaction{SourceMuxID: crossIn.SpendCommitment.SourceID.String()}
261         if err := s.db.Where(crossTx).First(crossTx).Error; err != nil {
262                 return "", errors.Wrap(err, "fail on find CrossTransaction")
263         }
264         return crossTx.SourceTxHash, nil
265 }
266
267 func (s *sidechainKeeper) syncBlock() (bool, error) {
268         chain := &orm.Chain{ID: s.chainID}
269         if err := s.db.First(chain).Error; err != nil {
270                 return false, errors.Wrap(err, "query chain")
271         }
272
273         height, err := s.node.GetBlockCount()
274         if err != nil {
275                 return false, err
276         }
277
278         if height <= chain.BlockHeight+s.cfg.Confirmations {
279                 return false, nil
280         }
281
282         nextBlockStr, txStatus, err := s.node.GetBlockByHeight(chain.BlockHeight + 1)
283         if err != nil {
284                 return false, err
285         }
286
287         nextBlock := &types.Block{}
288         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
289                 return false, errors.New("Unmarshal nextBlock")
290         }
291
292         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
293                 log.WithFields(log.Fields{
294                         "remote previous_block_Hash": nextBlock.PreviousBlockHash.String(),
295                         "db block_hash":              chain.BlockHash,
296                 }).Fatal("fail on block hash mismatch")
297         }
298
299         if err := s.tryAttachBlock(nextBlock, txStatus); err != nil {
300                 return false, err
301         }
302
303         return true, nil
304 }
305
306 func (s *sidechainKeeper) tryAttachBlock(block *types.Block, txStatus *bc.TransactionStatus) error {
307         blockHash := block.Hash()
308         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
309
310         dbTx := s.db.Begin()
311         if err := s.processBlock(dbTx, block, txStatus); err != nil {
312                 dbTx.Rollback()
313                 return err
314         }
315
316         if err := s.processChainInfo(dbTx, block); err != nil {
317                 dbTx.Rollback()
318                 return err
319         }
320         return dbTx.Commit().Error
321 }