OSDN Git Service

filter change output (#248)
[bytom/vapor.git] / federation / synchron / mainchain_keeper.go
1 package synchron
2
3 import (
4         "bytes"
5         "database/sql"
6         "encoding/hex"
7         "fmt"
8         "time"
9
10         btmConsensus "github.com/bytom/consensus"
11         btmBc "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13         "github.com/jinzhu/gorm"
14         log "github.com/sirupsen/logrus"
15
16         vaporConsensus "github.com/vapor/consensus"
17         "github.com/vapor/consensus/segwit"
18         "github.com/vapor/errors"
19         "github.com/vapor/federation/common"
20         "github.com/vapor/federation/config"
21         "github.com/vapor/federation/database"
22         "github.com/vapor/federation/database/orm"
23         "github.com/vapor/federation/service"
24         "github.com/vapor/federation/util"
25         "github.com/vapor/protocol/bc"
26         "github.com/vapor/wallet"
27 )
28
29 type mainchainKeeper struct {
30         cfg        *config.Chain
31         db         *gorm.DB
32         node       *service.Node
33         chainName  string
34         assetStore *database.AssetStore
35         fedProg    []byte
36 }
37
38 func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper {
39         return &mainchainKeeper{
40                 cfg:        &cfg.Mainchain,
41                 db:         db,
42                 node:       service.NewNode(cfg.Mainchain.Upstream),
43                 chainName:  cfg.Mainchain.Name,
44                 assetStore: assetStore,
45                 fedProg:    util.ParseFedProg(cfg.Warders, cfg.Quorum),
46         }
47 }
48
49 func (m *mainchainKeeper) Run() {
50         ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
51         for ; true; <-ticker.C {
52                 for {
53                         isUpdate, err := m.syncBlock()
54                         if err != nil {
55                                 log.WithField("error", err).Errorln("blockKeeper fail on process block")
56                                 break
57                         }
58
59                         if !isUpdate {
60                                 break
61                         }
62                 }
63         }
64 }
65
66 func (m *mainchainKeeper) syncBlock() (bool, error) {
67         chain := &orm.Chain{Name: m.chainName}
68         if err := m.db.Where(chain).First(chain).Error; err != nil {
69                 return false, errors.Wrap(err, "query chain")
70         }
71
72         height, err := m.node.GetBlockCount()
73         if err != nil {
74                 return false, err
75         }
76
77         if height <= chain.BlockHeight+m.cfg.Confirmations {
78                 return false, nil
79         }
80
81         nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
82         if err != nil {
83                 return false, err
84         }
85
86         nextBlock := &types.Block{}
87         if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
88                 return false, errors.New("Unmarshal nextBlock")
89         }
90
91         if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
92                 log.WithFields(log.Fields{
93                         "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
94                         "db block_hash":            chain.BlockHash,
95                 }).Fatal("BlockHash mismatch")
96                 return false, ErrInconsistentDB
97         }
98
99         if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
100                 return false, err
101         }
102
103         return true, nil
104 }
105
106 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
107         blockHash := block.Hash()
108         log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
109         dbTx := m.db.Begin()
110         if err := m.processBlock(chain, block, txStatus); err != nil {
111                 dbTx.Rollback()
112                 return err
113         }
114
115         return dbTx.Commit().Error
116 }
117
118 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
119         if err := m.processIssuing(block.Transactions); err != nil {
120                 return err
121         }
122
123         for i, tx := range block.Transactions {
124                 if m.isDepositTx(tx) {
125                         if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
126                                 return err
127                         }
128                 }
129
130                 if m.isWithdrawalTx(tx) {
131                         if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
132                                 return err
133                         }
134                 }
135         }
136
137         return m.processChainInfo(chain, block)
138 }
139
140 func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
141         for _, input := range tx.Inputs {
142                 if bytes.Equal(input.ControlProgram(), m.fedProg) {
143                         return false
144                 }
145         }
146
147         for _, output := range tx.Outputs {
148                 if bytes.Equal(output.OutputCommitment.ControlProgram, m.fedProg) {
149                         return true
150                 }
151         }
152
153         return false
154 }
155
156 func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
157         for _, input := range tx.Inputs {
158                 if bytes.Equal(input.ControlProgram(), m.fedProg) {
159                         return true
160                 }
161         }
162         return false
163 }
164
165 func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
166         blockHash := block.Hash()
167
168         var muxID btmBc.Hash
169         res0ID := tx.ResultIds[0]
170         switch res := tx.Entries[*res0ID].(type) {
171         case *btmBc.Output:
172                 muxID = *res.Source.Ref
173         case *btmBc.Retirement:
174                 muxID = *res.Source.Ref
175         default:
176                 return ErrOutputType
177         }
178
179         rawTx, err := tx.MarshalText()
180         if err != nil {
181                 return err
182         }
183
184         ormTx := &orm.CrossTransaction{
185                 ChainID:              chain.ID,
186                 SourceBlockHeight:    block.Height,
187                 SourceBlockTimestamp: block.Timestamp,
188                 SourceBlockHash:      blockHash.String(),
189                 SourceTxIndex:        txIndex,
190                 SourceMuxID:          muxID.String(),
191                 SourceTxHash:         tx.ID.String(),
192                 SourceRawTransaction: string(rawTx),
193                 DestBlockHeight:      sql.NullInt64{Valid: false},
194                 DestBlockTimestamp:   sql.NullInt64{Valid: false},
195                 DestBlockHash:        sql.NullString{Valid: false},
196                 DestTxIndex:          sql.NullInt64{Valid: false},
197                 DestTxHash:           sql.NullString{Valid: false},
198                 Status:               common.CrossTxPendingStatus,
199         }
200         if err := m.db.Create(ormTx).Error; err != nil {
201                 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
202         }
203
204         statusFail := txStatus.VerifyStatus[txIndex].StatusFail
205         crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail)
206         if err != nil {
207                 return err
208         }
209
210         for _, input := range crossChainInputs {
211                 if err := m.db.Create(input).Error; err != nil {
212                         return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos))
213                 }
214         }
215
216         return nil
217 }
218
219 func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
220         var fromAddress, toAddress string
221         // assume inputs are from an identical owner
222         prog := tx.Inputs[0].ControlProgram()
223         script := hex.EncodeToString(prog)
224         switch {
225         case segwit.IsP2WPKHScript(prog):
226                 if pubHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
227                         fromAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.MainNetParams)
228                         toAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.VaporNetParams)
229                 }
230         case segwit.IsP2WSHScript(prog):
231                 if scriptHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
232                         fromAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.MainNetParams)
233                         toAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.VaporNetParams)
234                 }
235         }
236
237         reqs := []*orm.CrossTransactionReq{}
238         for i, rawOutput := range tx.Outputs {
239                 // check valid deposit
240                 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.fedProg) {
241                         continue
242                 }
243
244                 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID {
245                         continue
246                 }
247
248                 asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
249                 if err != nil {
250                         return nil, err
251                 }
252
253                 req := &orm.CrossTransactionReq{
254                         CrossTransactionID: crossTransactionID,
255                         SourcePos:          uint64(i),
256                         AssetID:            asset.ID,
257                         AssetAmount:        rawOutput.OutputCommitment.AssetAmount.Amount,
258                         Script:             script,
259                         FromAddress:        fromAddress,
260                         ToAddress:          toAddress,
261                 }
262                 reqs = append(reqs, req)
263         }
264         return reqs, nil
265 }
266
267 func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
268         blockHash := block.Hash()
269         stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
270                 Where(&orm.CrossTransaction{
271                         DestTxHash: sql.NullString{tx.ID.String(), true},
272                         Status:     common.CrossTxPendingStatus,
273                 }).UpdateColumn(&orm.CrossTransaction{
274                 DestBlockHeight:    sql.NullInt64{int64(block.Height), true},
275                 DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true},
276                 DestBlockHash:      sql.NullString{blockHash.String(), true},
277                 DestTxIndex:        sql.NullInt64{int64(txIndex), true},
278                 Status:             common.CrossTxCompletedStatus,
279         })
280         if stmt.Error != nil {
281                 return stmt.Error
282         }
283
284         if stmt.RowsAffected != 1 {
285                 log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String())
286         }
287         return nil
288 }
289
290 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
291         blockHash := block.Hash()
292         chain.BlockHash = blockHash.String()
293         chain.BlockHeight = block.Height
294         res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
295         if err := res.Error; err != nil {
296                 return err
297         }
298
299         if res.RowsAffected != 1 {
300                 return ErrInconsistentDB
301         }
302
303         return nil
304 }
305
306 func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error {
307         for _, tx := range txs {
308                 for _, input := range tx.Inputs {
309                         switch inp := input.TypedInput.(type) {
310                         case *types.IssuanceInput:
311                                 assetID := inp.AssetID()
312                                 if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil {
313                                         continue
314                                 }
315
316                                 m.assetStore.Add(&orm.Asset{
317                                         AssetID:           assetID.String(),
318                                         IssuanceProgram:   hex.EncodeToString(inp.IssuanceProgram),
319                                         VMVersion:         inp.VMVersion,
320                                         RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
321                                 })
322                         }
323                 }
324         }
325
326         return nil
327 }