7 btmTypes "github.com/bytom/protocol/bc/types"
8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/crypto/ed25519/chainkd"
12 "github.com/vapor/errors"
13 "github.com/vapor/federation/common"
14 "github.com/vapor/federation/config"
15 "github.com/vapor/federation/database/orm"
16 "github.com/vapor/federation/service"
17 vaporTypes "github.com/vapor/protocol/bc/types"
20 var collectInterval = 5 * time.Second
22 var xprvStr = "d20e3d81ba2c5509619fbc276d7cd8b94f52a1dce1291ae9e6b28d4a48ee67d8ac5826ba65c9da0b035845b7cb379e816c529194c7e369492d8828dee5ede3e2"
24 func string2xprv(str string) (xprv chainkd.XPrv) {
25 if err := xprv.UnmarshalText([]byte(str)); err != nil {
26 log.Panicf("fail to convert xprv string")
35 colletInterval time.Duration
37 txCh chan *orm.CrossTransaction
38 mainchainNode *service.Node
39 sidechainNode *service.Node
40 remotes []*service.Warder
43 func NewWarder(cfg *config.Config, db *gorm.DB) *warder {
44 local, remotes := parseWarders(cfg)
46 position: local.Position,
49 xprv: string2xprv(xprvStr),
51 txCh: make(chan *orm.CrossTransaction),
52 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
53 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
58 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
59 var local *service.Warder
60 var remotes []*service.Warder
61 for _, warderCfg := range cfg.Warders {
62 if warderCfg.IsLocal {
63 local = service.NewWarder(&warderCfg)
65 remoteWarder := service.NewWarder(&warderCfg)
66 remotes = append(remotes, remoteWarder)
71 log.Fatal("none local warder set")
77 func (w *warder) Run() {
78 go w.collectPendingTx()
79 go w.processCrossTxRoutine()
82 func (w *warder) collectPendingTx() {
83 ticker := time.NewTicker(collectInterval)
84 for ; true; <-ticker.C {
85 txs := []*orm.CrossTransaction{}
86 if err := w.db.Preload("Chain").Preload("Reqs").
87 // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
88 // otherwise the field "status" will be ignored
89 Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
90 Find(&txs).Error; err == gorm.ErrRecordNotFound {
92 } else if err != nil {
93 log.Warnln("collectPendingTx", err)
96 for _, tx := range txs {
102 func (w *warder) processCrossTxRoutine() {
103 for ormTx := range w.txCh {
104 if err := w.validateCrossTx(ormTx); err != nil {
105 log.Warnln("invalid cross-chain tx", ormTx)
109 destTx, destTxID, err := w.proposeDestTx(ormTx)
111 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
115 if err := w.signDestTx(destTx, ormTx); err != nil {
116 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
120 for _, remote := range w.remotes {
121 signs, err := remote.RequestSign(destTx, ormTx)
123 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
127 w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
130 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
131 submittedTxID, err := w.submitTx(destTx)
133 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
137 if submittedTxID != destTxID {
138 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
142 if err := w.updateSubmission(ormTx); err != nil {
143 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
150 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
152 case common.CrossTxRejectedStatus:
153 return errors.New("cross-chain tx rejected")
154 case common.CrossTxSubmittedStatus:
155 return errors.New("cross-chain tx submitted")
156 case common.CrossTxCompletedStatus:
157 return errors.New("cross-chain tx completed")
163 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
164 switch tx.Chain.Name {
166 return w.buildSidechainTx(tx)
168 return w.buildMainchainTx(tx)
170 return nil, "", errors.New("unknown source chain")
175 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
176 sidechainTx := &vaporTypes.Tx{}
178 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
179 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
180 }).Error; err != nil {
184 return sidechainTx, sidechainTx.ID.String(), nil
188 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
189 mainchainTx := &btmTypes.Tx{}
191 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
192 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
193 }).Error; err != nil {
197 return mainchainTx, mainchainTx.ID.String(), nil
201 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
202 if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
203 return errors.New("cross-chain tx status error")
210 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
214 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
219 func (w *warder) isLeader() bool {
223 func (w *warder) submitTx(destTx interface{}) (string, error) {
224 switch tx := destTx.(type) {
226 return w.mainchainNode.SubmitTx(tx)
228 return w.sidechainNode.SubmitTx(tx)
230 return "", errors.New("unknown destTx type")
234 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
235 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
236 Status: common.CrossTxSubmittedStatus,
237 }).Error; err != nil {
241 for _, remote := range w.remotes {
242 remote.NotifySubmission(tx)