7 btmTypes "github.com/bytom/protocol/bc/types"
8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/crypto/ed25519/chainkd"
12 "github.com/vapor/errors"
13 "github.com/vapor/federation/common"
14 "github.com/vapor/federation/config"
15 "github.com/vapor/federation/database/orm"
16 "github.com/vapor/federation/service"
17 vaporTypes "github.com/vapor/protocol/bc/types"
23 colletInterval time.Duration
25 txCh chan *orm.CrossTransaction
26 mainchainNode *service.Node
27 sidechainNode *service.Node
28 remotes []*service.Warder
31 func NewWarder(cfg *config.Config, db *gorm.DB, txCh chan *orm.CrossTransaction) *warder {
32 local, remotes := parseWarders(cfg)
34 position: local.Position,
36 colletInterval: time.Duration(cfg.CollectMinutes) * time.Minute,
39 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
40 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
45 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
46 var local *service.Warder
47 var remotes []*service.Warder
48 for _, warderCfg := range cfg.Warders {
49 if warderCfg.IsLocal {
50 local = service.NewWarder(&warderCfg)
52 remoteWarder := service.NewWarder(&warderCfg)
53 remotes = append(remotes, remoteWarder)
58 log.Fatal("none local warder set")
64 func (w *warder) Run() {
65 go w.collectPendingTx()
66 go w.processCrossTxRoutine()
69 func (w *warder) collectPendingTx() {
70 ticker := time.NewTicker(w.colletInterval)
71 for ; true; <-ticker.C {
72 txs := []*orm.CrossTransaction{}
73 if err := w.db.Preload("Chain").Preload("Reqs").
74 // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
75 // otherwise the field "status" will be ignored
76 Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
77 Find(&txs).Error; err == gorm.ErrRecordNotFound {
79 } else if err != nil {
80 log.Warnln("collectPendingTx", err)
83 for _, tx := range txs {
89 func (w *warder) processCrossTxRoutine() {
90 for ormTx := range w.txCh {
91 if err := w.validateCrossTx(ormTx); err != nil {
92 log.Warnln("invalid cross-chain tx", ormTx)
96 destTx, destTxID, err := w.proposeDestTx(ormTx)
98 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
102 if err := w.signDestTx(destTx, ormTx); err != nil {
103 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
107 for _, remote := range w.remotes {
108 signs, err := remote.RequestSign(destTx, ormTx)
110 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
114 w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
117 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
118 submittedTxID, err := w.submitTx(destTx)
120 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
124 if submittedTxID != destTxID {
125 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
130 if err := w.updateSubmission(ormTx); err != nil {
131 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
138 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
140 case common.CrossTxRejectedStatus:
141 return errors.New("cross-chain tx rejected")
142 case common.CrossTxSubmittedStatus:
143 return errors.New("cross-chain tx submitted")
144 case common.CrossTxCompletedStatus:
145 return errors.New("cross-chain tx completed")
148 crossTxReqs := []*orm.CrossTransactionReq{}
149 if err := w.db.Where(&orm.CrossTransactionReq{CrossTransactionID: tx.ID}).Find(&crossTxReqs).Error; err != nil {
153 if len(crossTxReqs) != len(tx.Reqs) {
154 return errors.New("cross-chain requests count mismatch")
160 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
161 switch tx.Chain.Name {
163 return w.buildSidechainTx(tx)
165 return w.buildMainchainTx(tx)
167 return nil, "", errors.New("unknown source chain")
172 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
173 sidechainTx := &vaporTypes.Tx{}
175 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
176 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
177 }).Error; err != nil {
181 return sidechainTx, sidechainTx.ID.String(), nil
185 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
186 mainchainTx := &btmTypes.Tx{}
188 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
189 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
190 }).Error; err != nil {
194 return mainchainTx, mainchainTx.ID.String(), nil
198 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
199 if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
200 return errors.New("cross-chain tx status error")
207 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
211 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
216 func (w *warder) isLeader() bool {
220 func (w *warder) submitTx(destTx interface{}) (string, error) {
221 switch tx := destTx.(type) {
223 return w.mainchainNode.SubmitTx(tx)
225 return w.sidechainNode.SubmitTx(tx)
227 return "", errors.New("unknown destTx type")
231 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
232 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
233 Status: common.CrossTxSubmittedStatus,
234 }).Error; err != nil {
238 for _, remote := range w.remotes {
239 remote.NotifySubmission(tx)