OSDN Git Service

0c1612c0644d0dc092e978fbacc8160e23f23bcc
[bytom/vapor.git] / federation / synchron / mainchain_keeper.go
1 package synchron
2
3 import (
4         "bytes"
5         "database/sql"
6         "encoding/hex"
7         "fmt"
8         "time"
9
10         btmConsensus "github.com/bytom/consensus"
11         btmBc "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13         "github.com/jinzhu/gorm"
14         log "github.com/sirupsen/logrus"
15
16         vaporConsensus "github.com/vapor/consensus"
17         "github.com/vapor/consensus/segwit"
18         "github.com/vapor/errors"
19         "github.com/vapor/federation/common"
20         "github.com/vapor/federation/config"
21         "github.com/vapor/federation/database"
22         "github.com/vapor/federation/database/orm"
23         "github.com/vapor/federation/service"
24         "github.com/vapor/federation/util"
25         "github.com/vapor/protocol/bc"
26         "github.com/vapor/wallet"
27 )
28
29 type mainchainKeeper struct {
30         cfg        *config.Chain
31         db         *gorm.DB
32         node       *service.Node
33         chainName  string
34         assetStore *database.AssetStore
35         fedProg    []byte
36 }
37
38 func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper {
39         return &mainchainKeeper{
40                 cfg:        &cfg.Mainchain,
41                 db:         db,
42                 node:       service.NewNode(cfg.Mainchain.Upstream),
43                 chainName:  cfg.Mainchain.Name,
44                 assetStore: assetStore,
45                 fedProg:    util.ParseFedProg(cfg.Warders, cfg.Quorum),
46         }
47 }
48
49 func (m *mainchainKeeper) Run() {
50         ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
51         for ; true; <-ticker.C {
52                 for {
53                         isUpdate, err := m.syncBlock()
54                         if err != nil {
55                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
56                                 break
57                         }
58
59                         if !isUpdate {
60                                 break
61                         }
62                 }
63         }
64 }
65
66 func (m *mainchainKeeper) syncBlock() (bool, error) {
67         chain := &orm.Chain{Name: m.chainName}
68         if err := m.db.Where(chain).First(chain).Error; err != nil {
69                 return false, errors.Wrap(err, "query chain")
70         }
71
72         height, err := m.node.GetBlockCount()
73         if err != nil {
74                 return false, err
75         }
76
77         if height <= chain.BlockHeight+m.cfg.Confirmations {
78                 return false, nil
79         }
80
81         nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
82         if err != nil {
83                 return false, err
84         }
85
86         nextBlock := &types.Block{}
87         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
88                 return false, errors.New("Unmarshal nextBlock")
89         }
90
91         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
92                 log.WithFields(log.Fields{
93                         "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
94                         "db block_hash":            chain.BlockHash,
95                 }).Fatal("BlockHash mismatch")
96                 return false, ErrInconsistentDB
97         }
98
99         if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
100                 return false, err
101         }
102
103         return true, nil
104 }
105
106 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
107         blockHash := block.Hash()
108         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
109         dbTx := m.db.Begin()
110         if err := m.processBlock(chain, block, txStatus); err != nil {
111                 dbTx.Rollback()
112                 return err
113         }
114
115         return dbTx.Commit().Error
116 }
117
118 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
119         if err := m.processIssuing(block.Transactions); err != nil {
120                 return err
121         }
122
123         for i, tx := range block.Transactions {
124                 if m.isDepositTx(tx) {
125                         if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
126                                 return err
127                         }
128                 }
129
130                 if m.isWithdrawalTx(tx) {
131                         if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
132                                 return err
133                         }
134                 }
135         }
136
137         return m.processChainInfo(chain, block)
138 }
139
140 func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
141         for _, output := range tx.Outputs {
142                 if bytes.Equal(output.OutputCommitment.ControlProgram, m.fedProg) {
143                         return true
144                 }
145         }
146         return false
147 }
148
149 func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
150         for _, input := range tx.Inputs {
151                 if bytes.Equal(input.ControlProgram(), m.fedProg) {
152                         return true
153                 }
154         }
155         return false
156 }
157
158 func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
159         blockHash := block.Hash()
160
161         var muxID btmBc.Hash
162         res0ID := tx.ResultIds[0]
163         switch res := tx.Entries[*res0ID].(type) {
164         case *btmBc.Output:
165                 muxID = *res.Source.Ref
166         case *btmBc.Retirement:
167                 muxID = *res.Source.Ref
168         default:
169                 return ErrOutputType
170         }
171
172         rawTx, err := tx.MarshalText()
173         if err != nil {
174                 return err
175         }
176
177         ormTx := &orm.CrossTransaction{
178                 ChainID:              chain.ID,
179                 SourceBlockHeight:    block.Height,
180                 SourceBlockTimestamp: block.Timestamp,
181                 SourceBlockHash:      blockHash.String(),
182                 SourceTxIndex:        txIndex,
183                 SourceMuxID:          muxID.String(),
184                 SourceTxHash:         tx.ID.String(),
185                 SourceRawTransaction: string(rawTx),
186                 DestBlockHeight:      sql.NullInt64{Valid: false},
187                 DestBlockTimestamp:   sql.NullInt64{Valid: false},
188                 DestBlockHash:        sql.NullString{Valid: false},
189                 DestTxIndex:          sql.NullInt64{Valid: false},
190                 DestTxHash:           sql.NullString{Valid: false},
191                 Status:               common.CrossTxPendingStatus,
192         }
193         if err := m.db.Create(ormTx).Error; err != nil {
194                 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
195         }
196
197         statusFail := txStatus.VerifyStatus[txIndex].StatusFail
198         crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail)
199         if err != nil {
200                 return err
201         }
202
203         for _, input := range crossChainInputs {
204                 if err := m.db.Create(input).Error; err != nil {
205                         return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos))
206                 }
207         }
208
209         return nil
210 }
211
212 func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
213         var fromAddress, toAddress string
214         // assume inputs are from an identical owner
215         prog := tx.Inputs[0].ControlProgram()
216         script := hex.EncodeToString(prog)
217         switch {
218         case segwit.IsP2WPKHScript(prog):
219                 if pubHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
220                         fromAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.MainNetParams)
221                         toAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.VaporNetParams)
222                 }
223         case segwit.IsP2WSHScript(prog):
224                 if scriptHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
225                         fromAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.MainNetParams)
226                         toAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.VaporNetParams)
227                 }
228         }
229
230         reqs := []*orm.CrossTransactionReq{}
231         for i, rawOutput := range tx.Outputs {
232                 // check valid deposit
233                 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.fedProg) {
234                         continue
235                 }
236
237                 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID {
238                         continue
239                 }
240
241                 asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
242                 if err != nil {
243                         return nil, err
244                 }
245
246                 req := &orm.CrossTransactionReq{
247                         CrossTransactionID: crossTransactionID,
248                         SourcePos:          uint64(i),
249                         AssetID:            asset.ID,
250                         AssetAmount:        rawOutput.OutputCommitment.AssetAmount.Amount,
251                         Script:             script,
252                         FromAddress:        fromAddress,
253                         ToAddress:          toAddress,
254                 }
255                 reqs = append(reqs, req)
256         }
257         return reqs, nil
258 }
259
260 func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
261         blockHash := block.Hash()
262         stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
263                 Where(&orm.CrossTransaction{
264                         DestTxHash: sql.NullString{tx.ID.String(), true},
265                         Status:     common.CrossTxPendingStatus,
266                 }).UpdateColumn(&orm.CrossTransaction{
267                 DestBlockHeight:    sql.NullInt64{int64(block.Height), true},
268                 DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true},
269                 DestBlockHash:      sql.NullString{blockHash.String(), true},
270                 DestTxIndex:        sql.NullInt64{int64(txIndex), true},
271                 Status:             common.CrossTxCompletedStatus,
272         })
273         if stmt.Error != nil {
274                 return stmt.Error
275         }
276
277         if stmt.RowsAffected != 1 {
278                 log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String())
279         }
280         return nil
281 }
282
283 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
284         blockHash := block.Hash()
285         chain.BlockHash = blockHash.String()
286         chain.BlockHeight = block.Height
287         res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
288         if err := res.Error; err != nil {
289                 return err
290         }
291
292         if res.RowsAffected != 1 {
293                 return ErrInconsistentDB
294         }
295
296         return nil
297 }
298
299 func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error {
300         for _, tx := range txs {
301                 for _, input := range tx.Inputs {
302                         switch inp := input.TypedInput.(type) {
303                         case *types.IssuanceInput:
304                                 assetID := inp.AssetID()
305                                 if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil {
306                                         continue
307                                 }
308
309                                 m.assetStore.Add(&orm.Asset{
310                                         AssetID:           assetID.String(),
311                                         IssuanceProgram:   hex.EncodeToString(inp.IssuanceProgram),
312                                         VMVersion:         inp.VMVersion,
313                                         RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
314                                 })
315                         }
316                 }
317         }
318
319         return nil
320 }