7 btmTypes "github.com/bytom/protocol/bc/types"
8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/errors"
12 "github.com/vapor/federation/common"
13 "github.com/vapor/federation/config"
14 "github.com/vapor/federation/database/orm"
15 "github.com/vapor/federation/service"
16 vaporTypes "github.com/vapor/protocol/bc/types"
// colletInterval is the period of the ticker created in collectPendingTx.
20 colletInterval time.Duration
// txCh carries the cross-chain transactions that Run iterates over.
22 txCh chan *orm.CrossTransaction
// mainchainNode and sidechainNode are the node clients submitTx forwards
// destination transactions to.
23 mainchainNode *service.Node
24 sidechainNode *service.Node
27 func NewWarder(cfg *config.Config, db *gorm.DB, txCh chan *orm.CrossTransaction) *warder {
29 colletInterval: time.Duration(cfg.CollectMinutes) * time.Minute,
32 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
33 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
// Run starts collectPendingTx in its own goroutine and then handles every
// cross-chain transaction received on w.txCh: it validates the transaction,
// proposes the destination tx, signs it and, when both isTxSignsReachQuorum
// and isLeader return true, submits it and records the submission.
// A failure in any step is reported through a warning-level log entry.
// NOTE(review): what follows each log call (presumably `continue`) is not
// visible here — confirm against the full source.
37 func (w *warder) Run() {
38 go w.collectPendingTx()
// The range loop ends only when w.txCh is closed.
40 for ormTx := range w.txCh {
41 if err := w.validateCrossTx(ormTx); err != nil {
42 log.Warnln("invalid cross-chain tx", ormTx)
// destTxID is the ID computed when the destination tx was built; it is
// compared with the ID reported after submission further down.
46 destTx, destTxID, err := w.proposeDestTx(ormTx)
48 log.WithFields(log.Fields{
50 "cross-chain tx": ormTx,
51 }).Warnln("proposeDestTx")
55 if err := w.signDestTx(destTx, ormTx); err != nil {
56 log.WithFields(log.Fields{
58 "cross-chain tx": ormTx,
59 }).Warnln("signDestTx")
63 // TODO: elect signer & request sign
65 // TODO: what if submit fail
// Submission is attempted only when both checks return true.
66 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
67 submittedTxID, err := w.submitTx(destTx)
69 log.WithFields(log.Fields{
71 "cross-chain tx": ormTx,
// The ID returned by submitTx is expected to equal the ID of the tx that was built.
77 if submittedTxID != destTxID {
78 log.WithFields(log.Fields{
80 "cross-chain tx": ormTx,
81 "built tx ID": destTxID,
82 "submittedTx ID": submittedTxID,
83 }).Warnln("submitTx ID mismatch")
88 // TODO: what to update? what about others?
89 if err := w.updateSubmission(ormTx); err != nil {
90 log.WithFields(log.Fields{
92 "cross-chain tx": ormTx,
93 }).Warnln("updateSubmission")
// collectPendingTx creates a ticker with period w.colletInterval and queries
// the database for cross-transactions whose status equals
// common.CrossTxPendingStatus, preloading their "Chain" and "Reqs"
// associations. The query runs once immediately and then again after every
// tick. Query errors other than gorm.ErrRecordNotFound are logged as warnings.
// NOTE(review): the body of the final loop over txs is not visible here;
// presumably each tx is sent on w.txCh — confirm.
100 func (w *warder) collectPendingTx() {
101 ticker := time.NewTicker(w.colletInterval)
// The loop condition is the constant true; the post statement blocks until
// the next tick, so the first pass runs without waiting.
102 for ; true; <-ticker.C {
103 txs := []*orm.CrossTransaction{}
104 if err := w.db.Preload("Chain").Preload("Reqs").
105 // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly
106 // otherwise the field "status" is ignored
107 Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
108 Find(&txs).Error; err == gorm.ErrRecordNotFound {
110 } else if err != nil {
111 log.Warnln("collectPendingTx", err)
114 for _, tx := range txs {
120 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
121 if tx.Status == common.CrossTxRejectedStatus {
122 return errors.New("cross-chain tx rejeted")
125 if tx.Status == common.CrossTxRejectedStatus {
126 return errors.New("cross-chain tx submitted")
129 crossTxReqs := []*orm.CrossTransactionReq{}
130 if err := w.db.Where(&orm.CrossTransactionReq{CrossTransactionID: tx.ID}).Find(&crossTxReqs).Error; err != nil {
134 if len(crossTxReqs) != len(tx.Reqs) {
135 return errors.New("cross-chain requests count mismatch")
// proposeDestTx builds the destination transaction for tx, choosing the
// builder from tx.Chain.Name: one branch calls buildSidechainTx, another calls
// buildMainchainTx, and any other name yields the error "unknown source chain".
// The results are the built transaction, its ID string and an error.
// NOTE(review): the case labels are not visible here — confirm which chain
// name maps to which builder.
141 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
142 switch tx.Chain.Name {
144 return w.buildSidechainTx(tx)
146 return w.buildMainchainTx(tx)
148 return nil, "", errors.New("unknown source chain")
153 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (interface{}, string, error) {
154 sidechainTx := &vaporTypes.Tx{}
156 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
157 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
158 }).Error; err != nil {
162 return sidechainTx, sidechainTx.ID.String(), nil
166 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (interface{}, string, error) {
167 mainchainTx := &btmTypes.Tx{}
169 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
170 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
171 }).Error; err != nil {
175 return mainchainTx, mainchainTx.ID.String(), nil
// signDestTx returns the error "cross-chain tx status error" unless tx is in
// common.CrossTxPendingStatus and its DestTxHash is valid.
// NOTE(review): the rest of the body is not visible here; whether destTx is
// actually signed yet must be confirmed against the full source.
179 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
180 if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
181 return errors.New("cross-chain tx status error")
// isTxSignsReachQuorum is one of the two conditions Run checks before
// submitting destTx.
// NOTE(review): the body is not visible here — presumably it reports whether
// destTx has collected enough signatures; confirm.
188 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
// isLeader is one of the two conditions Run checks before submitting a
// destination transaction.
// NOTE(review): the body is not visible here — presumably it reports whether
// this warder is the one responsible for submitting; confirm.
193 func (w *warder) isLeader() bool {
// submitTx type-switches on destTx and forwards it to w.mainchainNode.SubmitTx
// or w.sidechainNode.SubmitTx, returning that call's results unchanged. Any
// other type yields "" and the error "unknown destTx type". Run treats the
// returned string as the ID of the submitted transaction.
// NOTE(review): the case types are not visible here — presumably *btmTypes.Tx
// goes to the mainchain node and *vaporTypes.Tx to the sidechain node; confirm.
197 func (w *warder) submitTx(destTx interface{}) (string, error) {
198 switch tx := destTx.(type) {
200 return w.mainchainNode.SubmitTx(tx)
203 return w.sidechainNode.SubmitTx(tx)
206 return "", errors.New("unknown destTx type")
211 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {