7 btmTypes "github.com/bytom/protocol/bc/types"
8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/errors"
12 "github.com/vapor/federation/common"
13 "github.com/vapor/federation/config"
14 "github.com/vapor/federation/database/orm"
15 "github.com/vapor/federation/service"
16 vaporTypes "github.com/vapor/protocol/bc/types"
20 colletInterval time.Duration
22 txCh chan *orm.CrossTransaction
23 mainchainNode *service.Node
24 sidechainNode *service.Node
25 others []*service.Warder
28 func NewWarder(cfg *config.Config, db *gorm.DB, txCh chan *orm.CrossTransaction) *warder {
30 colletInterval: time.Duration(cfg.CollectMinutes) * time.Minute,
33 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
34 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
35 others: parseOtherWarders(cfg),
39 func parseOtherWarders(cfg *config.Config) []*service.Warder {
40 var others []*service.Warder
41 for _, warderCfg := range cfg.Warders {
42 if !warderCfg.IsLocal {
43 anotherWarder := service.NewWarder(&warderCfg)
44 others = append(others, anotherWarder)
50 func (w *warder) Run() {
51 go w.collectPendingTx()
52 go w.processCrossTxRoutine()
55 func (w *warder) collectPendingTx() {
56 ticker := time.NewTicker(w.colletInterval)
57 for ; true; <-ticker.C {
58 txs := []*orm.CrossTransaction{}
59 if err := w.db.Preload("Chain").Preload("Reqs").
60 // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly
61 // otherwise the field "status" is ignored
62 Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
63 Find(&txs).Error; err == gorm.ErrRecordNotFound {
65 } else if err != nil {
66 log.Warnln("collectPendingTx", err)
69 for _, tx := range txs {
75 func (w *warder) processCrossTxRoutine() {
76 for ormTx := range w.txCh {
77 if err := w.validateCrossTx(ormTx); err != nil {
78 log.Warnln("invalid cross-chain tx", ormTx)
82 destTx, destTxID, err := w.proposeDestTx(ormTx)
84 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
88 if err := w.signDestTx(destTx, ormTx); err != nil {
89 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
93 // TODO: elect signer & request sign
95 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
96 submittedTxID, err := w.submitTx(destTx)
98 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
102 if submittedTxID != destTxID {
103 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "built tx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
108 // TODO: what to update? what about others?
109 if err := w.updateSubmission(ormTx); err != nil {
110 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
117 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
119 case common.CrossTxRejectedStatus:
120 return errors.New("cross-chain tx rejected")
121 case common.CrossTxSubmittedStatus:
122 return errors.New("cross-chain tx submitted")
123 case common.CrossTxCompletedStatus:
124 return errors.New("cross-chain tx completed")
127 crossTxReqs := []*orm.CrossTransactionReq{}
128 if err := w.db.Where(&orm.CrossTransactionReq{CrossTransactionID: tx.ID}).Find(&crossTxReqs).Error; err != nil {
132 if len(crossTxReqs) != len(tx.Reqs) {
133 return errors.New("cross-chain requests count mismatch")
139 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
140 switch tx.Chain.Name {
142 return w.buildSidechainTx(tx)
144 return w.buildMainchainTx(tx)
146 return nil, "", errors.New("unknown source chain")
151 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
152 sidechainTx := &vaporTypes.Tx{}
154 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
155 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
156 }).Error; err != nil {
160 return sidechainTx, sidechainTx.ID.String(), nil
164 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
165 mainchainTx := &btmTypes.Tx{}
167 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
168 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
169 }).Error; err != nil {
173 return mainchainTx, mainchainTx.ID.String(), nil
177 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
178 if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
179 return errors.New("cross-chain tx status error")
185 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
189 func (w *warder) isLeader() bool {
193 func (w *warder) submitTx(destTx interface{}) (string, error) {
194 switch tx := destTx.(type) {
196 return w.mainchainNode.SubmitTx(tx)
198 return w.sidechainNode.SubmitTx(tx)
200 return "", errors.New("unknown destTx type")
205 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {