10 btmConsensus "github.com/bytom/consensus"
11 btmBc "github.com/bytom/protocol/bc"
12 "github.com/bytom/protocol/bc/types"
13 "github.com/jinzhu/gorm"
14 log "github.com/sirupsen/logrus"
16 vaporConsensus "github.com/vapor/consensus"
17 "github.com/vapor/consensus/segwit"
18 "github.com/vapor/errors"
19 "github.com/vapor/federation/common"
20 "github.com/vapor/federation/config"
21 "github.com/vapor/federation/database"
22 "github.com/vapor/federation/database/orm"
23 "github.com/vapor/federation/service"
24 "github.com/vapor/federation/util"
25 "github.com/vapor/protocol/bc"
26 "github.com/vapor/wallet"
// mainchainKeeper carries the dependencies that its methods use to pull
// blocks from the mainchain node and to record cross-chain transactions
// in the database.
29 type mainchainKeeper struct {
// assetStore is used for asset lookups (GetByAssetID) and for registering
// newly seen assets (Add).
34 assetStore *database.AssetStore
// NewMainchainKeeper builds a mainchainKeeper from a gorm database handle,
// an asset store and the federation configuration.
38 func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper {
39 return &mainchainKeeper{
// The node client is created from the configured mainchain upstream.
42 node: service.NewNode(cfg.Mainchain.Upstream),
43 chainName: cfg.Mainchain.Name,
44 assetStore: assetStore,
// fedProg is derived from the configured warders and quorum; the keeper
// later compares it against transaction input/output control programs.
45 fedProg: util.ParseFedProg(cfg.Warders, cfg.Quorum),
// Run drives the sync loop: it calls syncBlock on every pass of a ticker
// whose period is cfg.SyncSeconds seconds.
49 func (m *mainchainKeeper) Run() {
50 ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
// The empty init clause plus the constant `true` condition makes the first
// pass run immediately; every later pass waits for a tick in the post
// statement.
51 for ; true; <-ticker.C {
53 isUpdate, err := m.syncBlock()
// NOTE(review): the message says "blockKeeper" although this type is
// mainchainKeeper — confirm whether the wording is intentional.
55 log.WithField("error", err).Errorln("blockKeeper fail on process block")
// syncBlock loads the stored chain record for m.chainName, asks the node for
// the current block count, fetches the block at stored height + 1 and passes
// it to tryAttachBlock.
// The bool result presumably reports whether a block was attached — confirm
// against the caller in Run.
66 func (m *mainchainKeeper) syncBlock() (bool, error) {
67 chain := &orm.Chain{Name: m.chainName}
68 if err := m.db.Where(chain).First(chain).Error; err != nil {
69 return false, errors.Wrap(err, "query chain")
72 height, err := m.node.GetBlockCount()
// Confirmation gate: the next block is only considered once the node's
// height exceeds the stored height by more than cfg.Confirmations.
77 if height <= chain.BlockHeight+m.cfg.Confirmations {
81 nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
86 nextBlock := &types.Block{}
87 if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
// NOTE(review): the underlying err is discarded here, unlike the
// errors.Wrap used for the chain query above — consider wrapping it.
88 return false, errors.New("Unmarshal nextBlock")
// The fetched block must build on the block hash recorded in the database.
91 if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
92 log.WithFields(log.Fields{
93 "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
94 "db block_hash": chain.BlockHash,
// NOTE(review): logrus Fatal exits the process after logging, so the return
// on the next line looks unreachable with the default exit handler — confirm
// whether terminating here is intended.
95 }).Fatal("BlockHash mismatch")
96 return false, ErrInconsistentDB
99 if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
// tryAttachBlock logs the block's height and hash, runs processBlock on it
// and finally commits dbTx, returning the commit error.
106 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
107 blockHash := block.Hash()
108 log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
// NOTE(review): processBlock and the helpers it calls issue their writes
// through m.db, while the commit below is on dbTx — confirm that those
// writes actually take part in the transaction being committed.
110 if err := m.processBlock(chain, block, txStatus); err != nil {
115 return dbTx.Commit().Error
// processBlock handles one block in three steps: it registers issued assets
// (processIssuing), records each transaction that qualifies as a deposit or
// a withdrawal, and then updates the stored chain tip (processChainInfo).
118 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
// Assets are handled first; getCrossChainReqs later looks them up in the
// asset store.
119 if err := m.processIssuing(block.Transactions); err != nil {
// The loop index is passed on as the transaction's position in the block.
123 for i, tx := range block.Transactions {
124 if m.isDepositTx(tx) {
125 if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
130 if m.isWithdrawalTx(tx) {
131 if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
137 return m.processChainInfo(chain, block)
// isDepositTx classifies tx by comparing the control programs of its inputs
// and then of its outputs with the federation program m.fedProg.
// NOTE(review): the branch bodies are not visible here; presumably a tx that
// spends from the federation program is rejected and one that pays to it is
// accepted — confirm.
140 func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
141 for _, input := range tx.Inputs {
142 if bytes.Equal(input.ControlProgram(), m.fedProg) {
147 for _, output := range tx.Outputs {
148 if bytes.Equal(output.OutputCommitment.ControlProgram, m.fedProg) {
// isWithdrawalTx checks the inputs of tx for one whose control program equals
// the federation program m.fedProg.
// NOTE(review): return statements are not visible here; presumably a match
// yields true — confirm.
156 func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
157 for _, input := range tx.Inputs {
158 if bytes.Equal(input.ControlProgram(), m.fedProg) {
// processDepositTx stores a mainchain deposit: it creates a pending
// orm.CrossTransaction row describing the source block and transaction, then
// creates one row per request returned by getCrossChainReqs.
165 func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
166 blockHash := block.Hash()
// The mux ID is taken from the source of the transaction's first result
// entry.
// NOTE(review): ResultIds[0] is read without a length check — confirm a tx
// reaching this point always has at least one result.
169 res0ID := tx.ResultIds[0]
170 switch res := tx.Entries[*res0ID].(type) {
172 muxID = *res.Source.Ref
173 case *btmBc.Retirement:
174 muxID = *res.Source.Ref
179 rawTx, err := tx.MarshalText()
184 ormTx := &orm.CrossTransaction{
186 SourceBlockHeight: block.Height,
187 SourceBlockTimestamp: block.Timestamp,
188 SourceBlockHash: blockHash.String(),
189 SourceTxIndex: txIndex,
190 SourceMuxID: muxID.String(),
191 SourceTxHash: tx.ID.String(),
192 SourceRawTransaction: string(rawTx),
// All destination fields start out as SQL NULL; processWithdrawalTx is the
// visible place in this file that fills such fields in.
193 DestBlockHeight: sql.NullInt64{Valid: false},
194 DestBlockTimestamp: sql.NullInt64{Valid: false},
195 DestBlockHash: sql.NullString{Valid: false},
196 DestTxIndex: sql.NullInt64{Valid: false},
197 DestTxHash: sql.NullString{Valid: false},
198 Status: common.CrossTxPendingStatus,
// The row is created before the requests are built because ormTx.ID is
// passed to getCrossChainReqs below.
200 if err := m.db.Create(ormTx).Error; err != nil {
201 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
// NOTE(review): VerifyStatus is indexed by txIndex without a bounds check —
// presumably it has one entry per transaction in the block; confirm.
204 statusFail := txStatus.VerifyStatus[txIndex].StatusFail
205 crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail)
210 for _, input := range crossChainInputs {
211 if err := m.db.Create(input).Error; err != nil {
212 return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos))
// getCrossChainReqs builds orm.CrossTransactionReq entries for the outputs of
// tx, each linked to crossTransactionID. Outputs are checked against the
// federation program m.fedProg, and the from/to addresses are derived from
// the control program of the first input.
219 func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
220 var fromAddress, toAddress string
221 // assume inputs are from an identical owner
// NOTE(review): tx.Inputs[0] is read without a length check — confirm a tx
// reaching this point always has at least one input.
222 prog := tx.Inputs[0].ControlProgram()
223 script := hex.EncodeToString(prog)
// The same hash is encoded twice: with BytomMainNetParams for the source
// address and with MainNetParams for the destination address.
// NOTE(review): when GetHashFromStandardProg fails, the error is ignored in
// the visible lines and both addresses stay empty — confirm this is intended.
225 case segwit.IsP2WPKHScript(prog):
226 if pubHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
227 fromAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.BytomMainNetParams)
228 toAddress = wallet.BuildP2PKHAddress(pubHash, &vaporConsensus.MainNetParams)
230 case segwit.IsP2WSHScript(prog):
231 if scriptHash, err := segwit.GetHashFromStandardProg(prog); err == nil {
232 fromAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.BytomMainNetParams)
233 toAddress = wallet.BuildP2SHAddress(scriptHash, &vaporConsensus.MainNetParams)
237 reqs := []*orm.CrossTransactionReq{}
238 for i, rawOutput := range tx.Outputs {
239 // check valid deposit
240 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.fedProg) {
// For a transaction flagged as failed, non-BTM outputs get special handling.
// NOTE(review): the branch body is not visible; presumably such outputs are
// skipped — confirm.
244 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *btmConsensus.BTMAssetID {
248 asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
253 req := &orm.CrossTransactionReq{
254 CrossTransactionID: crossTransactionID,
// SourcePos is the index of the output within tx.Outputs.
255 SourcePos: uint64(i),
257 AssetAmount: rawOutput.OutputCommitment.AssetAmount.Amount,
259 FromAddress: fromAddress,
260 ToAddress: toAddress,
262 reqs = append(reqs, req)
// processWithdrawalTx marks a cross transaction as completed. It selects
// pending orm.CrossTransaction rows whose chain_id differs from chain.ID and
// whose DestTxHash equals tx's ID, then fills in the destination block
// fields and sets the status to completed.
267 func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
268 blockHash := block.Hash()
269 stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
270 Where(&orm.CrossTransaction{
271 DestTxHash: sql.NullString{tx.ID.String(), true},
272 Status: common.CrossTxPendingStatus,
273 }).UpdateColumn(&orm.CrossTransaction{
274 DestBlockHeight: sql.NullInt64{int64(block.Height), true},
275 DestBlockTimestamp: sql.NullInt64{int64(block.Timestamp), true},
276 DestBlockHash: sql.NullString{blockHash.String(), true},
277 DestTxIndex: sql.NullInt64{int64(txIndex), true},
278 Status: common.CrossTxCompletedStatus,
280 if stmt.Error != nil {
// An update that does not touch exactly one row is only logged as a warning
// in the visible lines.
284 if stmt.RowsAffected != 1 {
285 log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String())
// processChainInfo advances the stored chain tip to block: it sets the hash
// and height on chain and writes them to the database.
290 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
291 blockHash := block.Hash()
292 chain.BlockHash = blockHash.String()
293 chain.BlockHeight = block.Height
// The update is conditional on the stored block_hash still being the
// block's parent hash, so the row only changes if the database tip is the
// expected predecessor.
294 res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
295 if err := res.Error; err != nil {
// Anything other than exactly one updated row is reported as an
// inconsistent database.
299 if res.RowsAffected != 1 {
300 return ErrInconsistentDB
// processIssuing scans every input of every transaction in txs and, for each
// issuance input, consults the asset store and adds an orm.Asset built from
// the input's asset ID, issuance program, VM version and asset definition.
306 func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error {
307 for _, tx := range txs {
308 for _, input := range tx.Inputs {
309 switch inp := input.TypedInput.(type) {
310 case *types.IssuanceInput:
311 assetID := inp.AssetID()
// A lookup without error means the asset is already stored.
// NOTE(review): the branch body is not visible; presumably the input is
// skipped. Any lookup error, not only "not found", falls through to Add —
// confirm that is intended.
312 if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil {
// NOTE(review): no result of Add is checked in the visible lines — confirm
// whether Add can report an error that should be returned.
316 m.assetStore.Add(&orm.Asset{
317 AssetID: assetID.String(),
318 IssuanceProgram: hex.EncodeToString(inp.IssuanceProgram),
319 VMVersion: inp.VMVersion,
320 Definition: string(inp.AssetDefinition),