8 btmTypes "github.com/bytom/protocol/bc/types"
9 "github.com/jinzhu/gorm"
10 log "github.com/sirupsen/logrus"
12 "github.com/vapor/crypto/ed25519/chainkd"
13 "github.com/vapor/errors"
14 "github.com/vapor/federation/common"
15 "github.com/vapor/federation/config"
16 "github.com/vapor/federation/database"
17 "github.com/vapor/federation/database/orm"
18 "github.com/vapor/federation/service"
19 vaporBc "github.com/vapor/protocol/bc"
20 vaporTypes "github.com/vapor/protocol/bc/types"
23 var collectInterval = 5 * time.Second
27 assetStore *database.AssetStore
28 txCh chan *orm.CrossTransaction
33 mainchainNode *service.Node
34 sidechainNode *service.Node
35 remotes []*service.Warder
38 func NewWarder(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *warder {
39 local, remotes := parseWarders(cfg)
42 assetStore: assetStore,
43 txCh: make(chan *orm.CrossTransaction),
44 fedProg: ParseFedProg(cfg.Warders, cfg.Quorum),
45 position: local.Position,
47 xprv: string2xprv(xprvStr),
48 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
49 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
54 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
55 var local *service.Warder
56 var remotes []*service.Warder
57 for _, warderCfg := range cfg.Warders {
58 if warderCfg.IsLocal {
59 local = service.NewWarder(&warderCfg)
61 remoteWarder := service.NewWarder(&warderCfg)
62 remotes = append(remotes, remoteWarder)
67 log.Fatal("none local warder set")
73 func (w *warder) Run() {
74 go w.collectPendingTx()
75 go w.processCrossTxRoutine()
78 func (w *warder) collectPendingTx() {
79 ticker := time.NewTicker(collectInterval)
80 for ; true; <-ticker.C {
81 txs := []*orm.CrossTransaction{}
82 if err := w.db.Preload("Chain").Preload("Reqs").
83 // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
84 // otherwise the field "status" will be ignored
85 Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
86 Find(&txs).Error; err == gorm.ErrRecordNotFound {
88 } else if err != nil {
89 log.Warnln("collectPendingTx", err)
92 for _, tx := range txs {
98 func (w *warder) processCrossTxRoutine() {
99 for ormTx := range w.txCh {
100 if err := w.validateCrossTx(ormTx); err != nil {
101 log.Warnln("invalid cross-chain tx", ormTx)
105 destTx, destTxID, err := w.proposeDestTx(ormTx)
107 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
111 if err := w.initDestTxSigns(destTx, ormTx); err != nil {
112 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
116 if err := w.signDestTx(destTx, ormTx); err != nil {
117 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
121 for _, remote := range w.remotes {
122 signs, err := remote.RequestSign(destTx, ormTx)
124 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
128 w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
131 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
132 submittedTxID, err := w.submitTx(destTx)
134 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
138 if submittedTxID != destTxID {
139 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
143 if err := w.updateSubmission(ormTx); err != nil {
144 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
151 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
153 case common.CrossTxRejectedStatus:
154 return errors.New("cross-chain tx rejected")
155 case common.CrossTxSubmittedStatus:
156 return errors.New("cross-chain tx submitted")
157 case common.CrossTxCompletedStatus:
158 return errors.New("cross-chain tx completed")
164 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
165 switch tx.Chain.Name {
167 return w.buildSidechainTx(tx)
169 return w.buildMainchainTx(tx)
171 return nil, "", errors.New("unknown source chain")
177 // create txSign in db
178 // addInputWitness(tx, signInsts)?
179 func (w *warder) buildSidechainTx(ormTx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
180 destTxData := &vaporTypes.TxData{Version: 1, TimeRange: 0}
181 muxID := &vaporBc.Hash{}
182 if err := muxID.UnmarshalText([]byte(ormTx.SourceMuxID)); err != nil {
183 return nil, "", errors.Wrap(err, "Unmarshal muxID")
186 for _, req := range ormTx.Reqs {
187 // getAsset from assetStore instead of preload asset, in order to save db query overload
188 asset, err := w.assetStore.GetByOrmID(req.AssetID)
190 return nil, "", errors.Wrap(err, "get asset by ormAsset ID")
193 assetID := &vaporBc.AssetID{}
194 if err := assetID.UnmarshalText([]byte(asset.AssetID)); err != nil {
195 return nil, "", errors.Wrap(err, "Unmarshal muxID")
198 rawDefinitionByte, err := hex.DecodeString(asset.RawDefinitionByte)
200 return nil, "", errors.Wrap(err, "decode rawDefinitionByte")
203 input := vaporTypes.NewCrossChainInput(nil, *muxID, *assetID, req.AssetAmount, req.SourcePos, w.fedProg, rawDefinitionByte)
204 destTxData.Inputs = append(destTxData.Inputs, input)
206 controlProgram, err := hex.DecodeString(req.Script)
208 return nil, "", errors.Wrap(err, "decode req.Script")
211 output := vaporTypes.NewIntraChainOutput(*assetID, req.AssetAmount, controlProgram)
212 destTxData.Outputs = append(destTxData.Outputs, output)
215 destTx := vaporTypes.NewTx(*destTxData)
216 w.addInputWitness(destTx)
218 if err := w.db.Where(ormTx).UpdateColumn(&orm.CrossTransaction{
219 DestTxHash: sql.NullString{destTx.ID.String(), true},
220 }).Error; err != nil {
224 return destTx, destTx.ID.String(), nil
228 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
229 mainchainTx := &btmTypes.Tx{}
231 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
232 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
233 }).Error; err != nil {
237 return mainchainTx, mainchainTx.ID.String(), nil
241 func (w *warder) addInputWitness(tx interface{}) {
249 func (w *warder) initDestTxSigns(destTx interface{}, ormTx *orm.CrossTransaction) error {
250 crossTxSigns := []*orm.CrossTransactionSign{}
251 for i := 1; i <= len(w.remotes)+1; i++ {
252 crossTxSigns = append(crossTxSigns, &orm.CrossTransactionSign{
253 CrossTransactionID: ormTx.ID,
255 Status: common.CrossTxSignPendingStatus,
259 return w.db.Create(crossTxSigns).Error
263 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
264 if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
265 return errors.New("cross-chain tx status error")
272 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
276 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
280 func (w *warder) isLeader() bool {
281 return w.position == 1
284 func (w *warder) submitTx(destTx interface{}) (string, error) {
285 switch tx := destTx.(type) {
287 return w.mainchainNode.SubmitTx(tx)
289 return w.sidechainNode.SubmitTx(tx)
291 return "", errors.New("unknown destTx type")
295 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
296 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
297 Status: common.CrossTxSubmittedStatus,
298 }).Error; err != nil {
302 for _, remote := range w.remotes {
303 remote.NotifySubmission(tx)