10 "github.com/bytom/consensus"
11 btmBc "github.com/bytom/protocol/bc"
12 "github.com/bytom/protocol/bc/types"
13 "github.com/jinzhu/gorm"
14 log "github.com/sirupsen/logrus"
16 "github.com/vapor/errors"
17 "github.com/vapor/federation/common"
18 "github.com/vapor/federation/config"
19 "github.com/vapor/federation/database"
20 "github.com/vapor/federation/database/orm"
21 "github.com/vapor/federation/service"
22 "github.com/vapor/protocol/bc"
25 type mainchainKeeper struct {
30 assetStore *database.AssetStore
34 func NewMainchainKeeper(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *mainchainKeeper {
35 return &mainchainKeeper{
38 node: service.NewNode(cfg.Mainchain.Upstream),
39 chainName: cfg.Mainchain.Name,
40 assetStore: assetStore,
41 fedProg: config.ParseFedProg(cfg.Warders, cfg.Quorum),
45 func (m *mainchainKeeper) Run() {
46 ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
47 for ; true; <-ticker.C {
49 isUpdate, err := m.syncBlock()
51 log.WithField("error", err).Errorln("blockKeeper fail on process block")
62 func (m *mainchainKeeper) syncBlock() (bool, error) {
63 chain := &orm.Chain{Name: m.chainName}
64 if err := m.db.Where(chain).First(chain).Error; err != nil {
65 return false, errors.Wrap(err, "query chain")
68 height, err := m.node.GetBlockCount()
73 if height <= chain.BlockHeight+m.cfg.Confirmations {
77 nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
82 nextBlock := &types.Block{}
83 if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
84 return false, errors.New("Unmarshal nextBlock")
87 if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
88 log.WithFields(log.Fields{
89 "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
90 "db block_hash": chain.BlockHash,
91 }).Fatal("BlockHash mismatch")
92 return false, ErrInconsistentDB
95 if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
102 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
103 blockHash := block.Hash()
104 log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
106 if err := m.processBlock(chain, block, txStatus); err != nil {
111 return m.db.Commit().Error
114 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
115 if err := m.processIssuing(block.Transactions); err != nil {
119 for i, tx := range block.Transactions {
120 if m.isDepositTx(tx) {
121 if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
126 if m.isWithdrawalTx(tx) {
127 if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
133 return m.processChainInfo(chain, block)
136 func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
137 for _, output := range tx.Outputs {
138 if bytes.Equal(output.OutputCommitment.ControlProgram, m.fedProg) {
145 func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
146 for _, input := range tx.Inputs {
147 if bytes.Equal(input.ControlProgram(), m.fedProg) {
154 func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
155 blockHash := block.Hash()
158 res0ID := tx.ResultIds[0]
159 switch res := tx.Entries[*res0ID].(type) {
161 muxID = *res.Source.Ref
162 case *btmBc.Retirement:
163 muxID = *res.Source.Ref
168 rawTx, err := tx.MarshalText()
173 ormTx := &orm.CrossTransaction{
175 SourceBlockHeight: block.Height,
176 SourceBlockHash: blockHash.String(),
177 SourceTxIndex: txIndex,
178 SourceMuxID: muxID.String(),
179 SourceTxHash: tx.ID.String(),
180 SourceRawTransaction: string(rawTx),
181 DestBlockHeight: sql.NullInt64{Valid: false},
182 DestBlockHash: sql.NullString{Valid: false},
183 DestTxIndex: sql.NullInt64{Valid: false},
184 DestTxHash: sql.NullString{Valid: false},
185 Status: common.CrossTxPendingStatus,
187 if err := m.db.Create(ormTx).Error; err != nil {
188 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
191 statusFail := txStatus.VerifyStatus[txIndex].StatusFail
192 crossChainInputs, err := m.getCrossChainReqs(ormTx.ID, tx, statusFail)
198 return m.db.Create(crossChainInputs).Error
201 func (m *mainchainKeeper) getCrossChainReqs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
202 // assume inputs are from an identical owner
203 script := hex.EncodeToString(tx.Inputs[0].ControlProgram())
204 reqs := []*orm.CrossTransactionReq{}
205 for i, rawOutput := range tx.Outputs {
206 // check valid deposit
207 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, m.fedProg) {
211 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *consensus.BTMAssetID {
215 asset, err := m.assetStore.GetByAssetID(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
220 req := &orm.CrossTransactionReq{
221 CrossTransactionID: crossTransactionID,
222 SourcePos: uint64(i),
224 AssetAmount: rawOutput.OutputCommitment.AssetAmount.Amount,
227 reqs = append(reqs, req)
232 func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
233 blockHash := block.Hash()
234 stmt := m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
235 Where(&orm.CrossTransaction{
236 DestTxHash: sql.NullString{tx.ID.String(), true},
237 Status: common.CrossTxSubmittedStatus,
238 }).UpdateColumn(&orm.CrossTransaction{
239 DestBlockHeight: sql.NullInt64{int64(block.Height), true},
240 DestBlockHash: sql.NullString{blockHash.String(), true},
241 DestTxIndex: sql.NullInt64{int64(txIndex), true},
242 Status: common.CrossTxCompletedStatus,
244 if stmt.Error != nil {
248 if stmt.RowsAffected != 1 {
249 log.Warnf("mainchainKeeper.processWithdrawalTx(%v): rows affected != 1", tx.ID.String())
254 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
255 blockHash := block.Hash()
256 chain.BlockHash = blockHash.String()
257 chain.BlockHeight = block.Height
258 res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
259 if err := res.Error; err != nil {
263 if res.RowsAffected != 1 {
264 return ErrInconsistentDB
270 func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error {
271 for _, tx := range txs {
272 for _, input := range tx.Inputs {
273 switch inp := input.TypedInput.(type) {
274 case *types.IssuanceInput:
275 assetID := inp.AssetID()
276 if _, err := m.assetStore.GetByAssetID(assetID.String()); err == nil {
280 m.assetStore.Add(&orm.Asset{
281 AssetID: assetID.String(),
282 IssuanceProgram: hex.EncodeToString(inp.IssuanceProgram),
283 VMVersion: inp.VMVersion,
284 RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),