OSDN Git Service

rename GetByAssetID
[bytom/vapor.git] / federation / synchron / mainchain_keeper.go
1 package synchron
2
3 import (
4         "bytes"
5         "database/sql"
6         "encoding/hex"
7         "fmt"
8         "time"
9
10         "github.com/bytom/consensus"
11         btmBc "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13         "github.com/jinzhu/gorm"
14         log "github.com/sirupsen/logrus"
15
16         "github.com/vapor/errors"
17         "github.com/vapor/federation/common"
18         "github.com/vapor/federation/config"
19         "github.com/vapor/federation/database"
20         "github.com/vapor/federation/database/orm"
21         "github.com/vapor/federation/service"
22         "github.com/vapor/protocol/bc"
23 )
24
25 type mainchainKeeper struct {
26         cfg        *config.Chain
27         db         *gorm.DB
28         node       *service.Node
29         chainName  string
30         assetStore *database.AssetStore
31         fedProg    []byte
32 }
33
34 func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper {
35         return &mainchainKeeper{
36                 cfg:        &cfg.Mainchain,
37                 db:         db,
38                 node:       service.NewNode(cfg.Mainchain.Upstream),
39                 chainName:  cfg.Mainchain.Name,
40                 assetStore: assetStore,
41                 fedProg:    config.ParseFedProg(cfg.Warders, cfg.Quorum),
42         }
43 }
44
45 func (m *mainchainKeeper) Run() {
46         ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
47         for ; true; <-ticker.C {
48                 for {
49                         isUpdate, err := m.syncBlock()
50                         if err != nil {
51                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
52                                 break
53                         }
54
55                         if !isUpdate {
56                                 break
57                         }
58                 }
59         }
60 }
61
62 func (m *mainchainKeeper) syncBlock() (bool, error) {
63         chain := &orm.Chain{Name: m.chainName}
64         if err := m.db.Where(chain).First(chain).Error; err != nil {
65                 return false, errors.Wrap(err, "query chain")
66         }
67
68         height, err := m.node.GetBlockCount()
69         if err != nil {
70                 return false, err
71         }
72
73         if height <= chain.BlockHeight+m.cfg.Confirmations {
74                 return false, nil
75         }
76
77         nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
78         if err != nil {
79                 return false, err
80         }
81
82         nextBlock := &types.Block{}
83         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
84                 return false, errors.New("Unmarshal nextBlock")
85         }
86
87         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
88                 log.WithFields(log.Fields{
89                         "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
90                         "db block_hash":            chain.BlockHash,
91                 }).Fatal("BlockHash mismatch")
92                 return false, ErrInconsistentDB
93         }
94
95         if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
96                 return false, err
97         }
98
99         return true, nil
100 }
101
102 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
103         blockHash := block.Hash()
104         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
105         m.db.Begin()
106         if err := m.processBlock(chain, block, txStatus); err != nil {
107                 m.db.Rollback()
108                 return err
109         }
110
111         return m.db.Commit().Error
112 }
113
114 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
115         if err := m.processIssuing(block.Transactions); err != nil {
116                 return err
117         }
118
119         for i, tx := range block.Transactions {
120                 if m.isDepositTx(tx) {
121                         if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
122                                 return err
123                         }
124                 }
125
126                 if m.isWithdrawalTx(tx) {
127                         if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
128                                 return err
129                         }
130                 }
131         }
132
133         return m.processChainInfo(chain, block)
134 }
135
136 func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
137         for _, output := range tx.Outputs {
138                 if bytes.Equal(output.OutputCommitment.ControlProgram, m.fedProg) {
139                         return true
140                 }
141         }
142         return false
143 }
144
145 func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
146         for _, input := range tx.Inputs {
147                 if bytes.Equal(input.ControlProgram(), m.fedProg) {
148                         return true
149                 }
150         }
151         return false
152 }
153
154 func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
155         blockHash := block.Hash()
156
157         var muxID btmBc.Hash
158         res0ID := tx.ResultIds[0]
159         switch res := tx.Entries[*res0ID].(type) {
160         case *btmBc.Output:
161                 muxID = *res.Source.Ref
162         case *btmBc.Retirement:
163                 muxID = *res.Source.Ref
164         default:
165                 return ErrOutputType
166         }
167
168         rawTx, err := tx.MarshalText()
169         if err != nil {
170                 return err
171         }
172
173         ormTx := &orm.CrossTransaction{
174                 ChainID:              chain.ID,
175                 SourceBlockHeight:    block.Height,
176                 SourceBlockHash:      blockHash.String(),
177                 SourceTxIndex:        txIndex,
178                 SourceMuxID:          muxID.String(),
179                 SourceTxHash:         tx.ID.String(),
180                 SourceRawTransaction: string(rawTx),
181                 DestBlockHeight:      sql.NullInt64{Valid: false},
182                 DestBlockHash:        sql.NullString{Valid: false},
183                 DestTxIndex:          sql.NullInt64{Valid: false},
184                 DestTxHash:           sql.NullString{Valid: false},
185                 Status:               common.CrossTxPendingStatus,
186         }
187         if err := m.db.Create(ormTx).Error; err != nil {
188                 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
189         }
190
191         statusFail := txStatus.VerifyStatus[txIndex].StatusFail
192         crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail)
193         if err != nil {
194                 return err
195         }
196
197         // batch insert
198         return m.db.Create(crossChainInputs).Error
199 }
200
201 func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
202         // assume inputs are from an identical owner
203         script := hex.EncodeToString(tx.Inputs[0].ControlProgram())
204         reqs := []*orm.CrossTransactionReq{}
205         for i, rawOutput := range tx.Outputs {
206                 // check valid deposit
207                 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.fedProg) {
208                         continue
209                 }
210
211                 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *consensus.BTMAssetID {
212                         continue
213                 }
214
215                 asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
216                 if err != nil {
217                         return nil, err
218                 }
219
220                 req := &orm.CrossTransactionReq{
221                         CrossTransactionID: crossTransactionID,
222                         SourcePos:          uint64(i),
223                         AssetID:            asset.ID,
224                         AssetAmount:        rawOutput.OutputCommitment.AssetAmount.Amount,
225                         Script:             script,
226                 }
227                 reqs = append(reqs, req)
228         }
229         return reqs, nil
230 }
231
232 func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
233         blockHash := block.Hash()
234         stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
235                 Where(&orm.CrossTransaction{
236                         DestTxHash: sql.NullString{tx.ID.String(), true},
237                         Status:     common.CrossTxSubmittedStatus,
238                 }).UpdateColumn(&orm.CrossTransaction{
239                 DestBlockHeight: sql.NullInt64{int64(block.Height), true},
240                 DestBlockHash:   sql.NullString{blockHash.String(), true},
241                 DestTxIndex:     sql.NullInt64{int64(txIndex), true},
242                 Status:          common.CrossTxCompletedStatus,
243         })
244         if stmt.Error != nil {
245                 return stmt.Error
246         }
247
248         if stmt.RowsAffected != 1 {
249                 log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String())
250         }
251         return nil
252 }
253
254 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
255         blockHash := block.Hash()
256         chain.BlockHash = blockHash.String()
257         chain.BlockHeight = block.Height
258         res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
259         if err := res.Error; err != nil {
260                 return err
261         }
262
263         if res.RowsAffected != 1 {
264                 return ErrInconsistentDB
265         }
266
267         return nil
268 }
269
270 func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error {
271         for _, tx := range txs {
272                 for _, input := range tx.Inputs {
273                         switch inp := input.TypedInput.(type) {
274                         case *types.IssuanceInput:
275                                 assetID := inp.AssetID()
276                                 if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil {
277                                         continue
278                                 }
279
280                                 m.assetStore.Add(&orm.Asset{
281                                         AssetID:           assetID.String(),
282                                         IssuanceProgram:   hex.EncodeToString(inp.IssuanceProgram),
283                                         VMVersion:         inp.VMVersion,
284                                         RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
285                                 })
286                         }
287                 }
288         }
289
290         return nil
291 }