10 "github.com/bytom/consensus"
11 btmBc "github.com/bytom/protocol/bc"
12 "github.com/bytom/protocol/bc/types"
13 "github.com/jinzhu/gorm"
14 log "github.com/sirupsen/logrus"
16 "github.com/vapor/errors"
17 "github.com/vapor/federation/common"
18 "github.com/vapor/federation/config"
19 "github.com/vapor/federation/database"
20 "github.com/vapor/federation/database/orm"
21 "github.com/vapor/federation/service"
22 "github.com/vapor/protocol/bc"
// mainchainKeeper is the receiver for the sync methods in this file; those
// methods read its cfg, db, node, chainName and assetCache fields.
25 type mainchainKeeper struct {
// assetCache is checked by getAsset before it queries the database.
30 assetCache *database.AssetCache
// NewMainchainKeeper builds a mainchainKeeper for the chain described by
// chainCfg: node comes from service.NewNode(chainCfg.Upstream), chainName from
// chainCfg.Name, and assetCache from database.NewAssetCache().
// NOTE(review): the initializers for the remaining fields are not visible in
// this excerpt — presumably db and chainCfg are stored as well; confirm.
33 func NewMainchainKeeper(db *gorm.DB, chainCfg *config.Chain) *mainchainKeeper {
34 return &mainchainKeeper{
37 node: service.NewNode(chainCfg.Upstream),
38 chainName: chainCfg.Name,
39 assetCache: database.NewAssetCache(),
// Run polls for new blocks by calling syncBlock. The `for ; true; <-ticker.C`
// form executes the loop body once right away and then waits for a tick
// (every m.cfg.SyncSeconds seconds) before each further pass.
// NOTE(review): the loop condition is the constant true and no ticker.Stop()
// is visible in this excerpt, so Run looks like it never returns — confirm
// that callers start it in its own goroutine.
43 func (m *mainchainKeeper) Run() {
44 ticker := time.NewTicker(time.Duration(m.cfg.SyncSeconds) * time.Second)
45 for ; true; <-ticker.C {
47 isUpdate, err := m.syncBlock()
// The error from syncBlock is logged here rather than returned.
// NOTE(review): the message says "blockKeeper" although the receiver type is
// mainchainKeeper.
49 log.WithField("error", err).Errorln("blockKeeper fail on process block")
// syncBlock tries to advance the stored chain by one block. It loads the
// orm.Chain row named m.chainName, asks the node for its block count, fetches
// the block at chain.BlockHeight+1, decodes it, checks that it links to the
// stored block hash, and hands it to tryAttachBlock.
// The bool result is what Run receives as isUpdate; the error describes the
// step that failed.
60 func (m *mainchainKeeper) syncBlock() (bool, error) {
61 chain := &orm.Chain{Name: m.chainName}
62 if err := m.db.Where(chain).First(chain).Error; err != nil {
63 return false, errors.Wrap(err, "query chain")
66 height, err := m.node.GetBlockCount()
// Compare the node's height with the stored height plus m.cfg.Confirmations.
// NOTE(review): the branch body is not visible in this excerpt; presumably it
// returns early while the node is not yet far enough ahead — confirm.
71 if height <= chain.BlockHeight+m.cfg.Confirmations {
75 nextBlockStr, txStatus, err := m.node.GetBlockByHeight(chain.BlockHeight + 1)
80 nextBlock := &types.Block{}
81 if err := nextBlock.UnmarshalText([]byte(nextBlockStr)); err != nil {
// Wrap the decode error instead of replacing it, so the cause is preserved.
82 return false, errors.Wrap(err, "Unmarshal nextBlock")
// The fetched block must build on the block hash stored for this chain.
85 if nextBlock.PreviousBlockHash.String() != chain.BlockHash {
86 log.WithFields(log.Fields{
87 "remote PreviousBlockHash": nextBlock.PreviousBlockHash.String(),
88 "db block_hash": chain.BlockHash,
// logrus's Fatal exits the process after logging (unless an exit-function
// override is installed), so the return below is normally not reached.
89 }).Fatal("BlockHash mismatch")
90 return false, ErrInconsistentDB
93 if err := m.tryAttachBlock(chain, nextBlock, txStatus); err != nil {
// tryAttachBlock logs the block's height and hash, runs processBlock on the
// block, and returns the Error field of m.db.Commit().
// NOTE(review): no Begin() call is visible in this excerpt, and processBlock
// writes through m.db itself — confirm that those writes and this Commit
// share one gorm transaction handle.
100 func (m *mainchainKeeper) tryAttachBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
101 blockHash := block.Hash()
102 log.WithFields(log.Fields{"block_height": block.Height, "block_hash": blockHash.String()}).Info("start to attachBlock")
104 if err := m.processBlock(chain, block, txStatus); err != nil {
109 return m.db.Commit().Error
// processBlock applies one block. It first passes all transactions to
// processIssuing, then walks block.Transactions calling processDepositTx for
// transactions accepted by isDepositTx and processWithdrawalTx for those
// accepted by isWithdrawalTx, and finally returns the result of
// processChainInfo.
112 func (m *mainchainKeeper) processBlock(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus) error {
113 if err := m.processIssuing(block.Transactions); err != nil {
// i is the transaction's position in the block and is forwarded as txIndex.
117 for i, tx := range block.Transactions {
118 if m.isDepositTx(tx) {
119 if err := m.processDepositTx(chain, block, txStatus, uint64(i), tx); err != nil {
124 if m.isWithdrawalTx(tx) {
125 if err := m.processWithdrawalTx(chain, block, uint64(i), tx); err != nil {
131 return m.processChainInfo(chain, block)
// isDepositTx compares the control program of each output of tx with fedProg
// using bytes.Equal.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably it reports true on the first matching output — confirm.
134 func (m *mainchainKeeper) isDepositTx(tx *types.Tx) bool {
135 for _, output := range tx.Outputs {
136 if bytes.Equal(output.OutputCommitment.ControlProgram, fedProg) {
// isWithdrawalTx compares the control program of each input of tx with fedProg
// using bytes.Equal.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably it reports true on the first matching input — confirm.
143 func (m *mainchainKeeper) isWithdrawalTx(tx *types.Tx) bool {
144 for _, input := range tx.Inputs {
145 if bytes.Equal(input.ControlProgram(), fedProg) {
// processDepositTx records tx, found at position txIndex in block, in the
// database: it inserts one orm.CrossTransaction row for tx (destination
// fields NULL, status common.CrossTxPendingStatus) and then one row for every
// request returned by getCrossChainInputs.
// Database failures are returned wrapped with the transaction id.
152 func (m *mainchainKeeper) processDepositTx(chain *orm.Chain, block *types.Block, txStatus *bc.TransactionStatus, txIndex uint64, tx *types.Tx) error {
153 blockHash := block.Hash()
// Search the result entries for one that type-asserts to *btmBc.Output and
// take the id its Source.Ref points at as the mux id.
156 isMuxIDFound := false
157 for _, resOutID := range tx.ResultIds {
158 resOut, ok := tx.Entries[*resOutID].(*btmBc.Output)
160 muxID = *resOut.Source.Ref
166 return errors.New("fail to get mux id")
// The text-marshalled transaction is stored in SourceRawTransaction.
169 rawTx, err := tx.MarshalText()
174 ormTx := &orm.CrossTransaction{
176 SourceBlockHeight: block.Height,
177 SourceBlockHash: blockHash.String(),
178 SourceTxIndex: txIndex,
179 SourceMuxID: muxID.String(),
180 SourceTxHash: tx.ID.String(),
181 SourceRawTransaction: string(rawTx),
// Destination fields are left NULL (Valid: false) when the row is created.
182 DestBlockHeight: sql.NullInt64{Valid: false},
183 DestBlockHash: sql.NullString{Valid: false},
184 DestTxIndex: sql.NullInt64{Valid: false},
185 DestTxHash: sql.NullString{Valid: false},
186 Status: common.CrossTxPendingStatus,
188 if err := m.db.Create(ormTx).Error; err != nil {
189 return errors.Wrap(err, fmt.Sprintf("create mainchain DepositTx %s", tx.ID.String()))
// NOTE(review): VerifyStatus is indexed by txIndex with no bounds check
// visible in this excerpt — confirm it always has one entry per transaction.
192 statusFail := txStatus.VerifyStatus[txIndex].StatusFail
// ormTx.ID is read after Create and links each request row to the new row.
193 crossChainInputs, err := m.getCrossChainInputs(ormTx.ID, tx, statusFail)
198 for _, input := range crossChainInputs {
199 if err := m.db.Create(input).Error; err != nil {
200 return errors.Wrap(err, fmt.Sprintf("create DepositFromMainchain input: txid(%s), pos(%d)", tx.ID.String(), input.SourcePos))
// getCrossChainInputs walks the outputs of tx and builds
// orm.CrossTransactionReq values tagged with crossTransactionID, the output's
// position (SourcePos) and its amount (AssetAmount), collecting them in a
// slice. Each output's asset is looked up through getAsset.
207 func (m *mainchainKeeper) getCrossChainInputs(crossTransactionID uint64, tx *types.Tx, statusFail bool) ([]*orm.CrossTransactionReq, error) {
208 // assume inputs are from an identical owner
// NOTE(review): tx.Inputs[0] is read with no length check visible in this
// excerpt; a transaction without inputs would panic here — confirm callers
// never pass one.
209 script := hex.EncodeToString(tx.Inputs[0].ControlProgram())
210 inputs := []*orm.CrossTransactionReq{}
211 for i, rawOutput := range tx.Outputs {
212 // check valid deposit
// NOTE(review): the branch body is not visible in this excerpt; presumably
// outputs that do not pay to fedProg are skipped — confirm.
213 if !bytes.Equal(rawOutput.OutputCommitment.ControlProgram, fedProg) {
// Guard for a failed transaction (statusFail) whose output asset is not
// consensus.BTMAssetID.
// NOTE(review): branch body not visible; presumably such outputs are skipped.
217 if statusFail && *rawOutput.OutputCommitment.AssetAmount.AssetId != *consensus.BTMAssetID {
221 asset, err := m.getAsset(rawOutput.OutputCommitment.AssetAmount.AssetId.String())
226 input := &orm.CrossTransactionReq{
227 CrossTransactionID: crossTransactionID,
228 SourcePos: uint64(i),
230 AssetAmount: rawOutput.OutputCommitment.AssetAmount.Amount,
233 inputs = append(inputs, input)
// processWithdrawalTx updates the orm.CrossTransaction rows that belong to a
// different chain (chain_id != chain.ID), whose DestTxHash equals tx.ID and
// whose status is common.CrossTxSubmittedStatus. On those rows it writes the
// block height, block hash and txIndex of tx and sets the status to
// common.CrossTxCompletedStatus, using UpdateColumn.
238 func (m *mainchainKeeper) processWithdrawalTx(chain *orm.Chain, block *types.Block, txIndex uint64, tx *types.Tx) error {
239 blockHash := block.Hash()
240 return m.db.Model(&orm.CrossTransaction{}).Where("chain_id != ?", chain.ID).
241 Where(&orm.CrossTransaction{
// sql.Null* literals use keyed fields so they stay correct if the struct
// layout changes and pass go vet's composites check.
242 DestTxHash: sql.NullString{String: tx.ID.String(), Valid: true},
243 Status: common.CrossTxSubmittedStatus,
244 }).UpdateColumn(&orm.CrossTransaction{
245 DestBlockHeight: sql.NullInt64{Int64: int64(block.Height), Valid: true},
246 DestBlockHash: sql.NullString{String: blockHash.String(), Valid: true},
247 DestTxIndex: sql.NullInt64{Int64: int64(txIndex), Valid: true},
248 Status: common.CrossTxCompletedStatus,
252 // TODO: maybe common
// processChainInfo advances the stored chain tip to block. It sets
// chain.BlockHash and chain.BlockHeight from block and issues an update that
// only matches a row whose block_hash still equals block.PreviousBlockHash.
// If the update does not affect exactly one row it returns ErrInconsistentDB.
253 func (m *mainchainKeeper) processChainInfo(chain *orm.Chain, block *types.Block) error {
254 blockHash := block.Hash()
255 chain.BlockHash = blockHash.String()
256 chain.BlockHeight = block.Height
// The block_hash condition makes the update conditional on the previous tip.
257 res := m.db.Model(chain).Where("block_hash = ?", block.PreviousBlockHash.String()).Updates(chain)
258 if err := res.Error; err != nil {
262 if res.RowsAffected != 1 {
263 return ErrInconsistentDB
// processIssuing scans every input of every transaction in txs for
// *types.IssuanceInput values. For such an input it builds an asset record
// from the asset id, the hex-encoded issuance program, the VM version and the
// hex-encoded asset definition, inserts it with m.db.Create, and adds it to
// m.assetCache.
269 func (m *mainchainKeeper) processIssuing(txs []*types.Tx) error {
270 for _, tx := range txs {
271 for _, input := range tx.Inputs {
272 switch inp := input.TypedInput.(type) {
273 case *types.IssuanceInput:
274 assetID := inp.AssetID()
// getAsset succeeding means the asset is already in the cache or database.
// NOTE(review): the branch body is not visible in this excerpt; presumably an
// already-known asset is skipped — confirm.
275 if _, err := m.getAsset(assetID.String()); err == nil {
280 AssetID: assetID.String(),
281 IssuanceProgram: hex.EncodeToString(inp.IssuanceProgram),
282 VMVersion: inp.VMVersion,
283 RawDefinitionByte: hex.EncodeToString(inp.AssetDefinition),
285 if err := m.db.Create(asset).Error; err != nil {
289 m.assetCache.Add(asset.AssetID, asset)
297 // TODO: maybe common
// getAsset looks up the asset with the given id. It consults m.assetCache
// first; on a miss it queries the database, returns a wrapped error if the
// query fails, and otherwise adds the loaded asset to the cache.
298 func (m *mainchainKeeper) getAsset(assetID string) (*orm.Asset, error) {
299 if asset := m.assetCache.Get(assetID); asset != nil {
// Cache miss: query by AssetID, using the struct both as filter and as result.
303 asset := &orm.Asset{AssetID: assetID}
304 if err := m.db.Where(asset).First(asset).Error; err != nil {
305 return nil, errors.Wrap(err, "asset not found in memory and mysql")
308 m.assetCache.Add(assetID, asset)