8 btmBc "github.com/bytom/protocol/bc"
9 btmTypes "github.com/bytom/protocol/bc/types"
10 "github.com/jinzhu/gorm"
11 log "github.com/sirupsen/logrus"
13 "github.com/vapor/crypto/ed25519/chainkd"
14 "github.com/vapor/errors"
15 "github.com/vapor/federation/common"
16 "github.com/vapor/federation/config"
17 "github.com/vapor/federation/database"
18 "github.com/vapor/federation/database/orm"
19 "github.com/vapor/federation/service"
20 vaporBc "github.com/vapor/protocol/bc"
21 vaporTypes "github.com/vapor/protocol/bc/types"
// collectInterval is the period of the ticker that paces collectPendingTx.
24 var collectInterval = 5 * time.Second
// assetStore is used to look assets up by their ORM ID while destination txs
// are built.
28 assetStore *database.AssetStore
// txCh carries cross-chain txs to processCrossTxRoutine, which ranges over it.
29 txCh chan *orm.CrossTransaction
// mainchainNode and sidechainNode are the nodes submitTx sends destination
// txs to.
34 mainchainNode *service.Node
35 sidechainNode *service.Node
// remotes are the other warders: they are asked for signatures and notified
// once a tx has been submitted.
36 remotes []*service.Warder
// NewWarder builds a warder from the database handle, the asset store and the
// federation config. parseWarders splits the configured warders into the local
// one (whose Position is recorded) and the remote ones.
// NOTE(review): some lines of the struct literal are not visible in this
// excerpt, so the full set of initialised fields must be confirmed against the
// complete file.
39 func NewWarder(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *warder {
40 local, remotes := parseWarders(cfg)
43 assetStore: assetStore,
// The channel is created without a buffer.
44 txCh: make(chan *orm.CrossTransaction),
// The federation program is derived from the whole warder list and the quorum.
45 fedProg: ParseFedProg(cfg.Warders, cfg.Quorum),
46 position: local.Position,
48 xprv: string2xprv(xprvStr),
// One node client per chain, each pointed at its configured upstream.
49 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
50 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
55 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
56 var local *service.Warder
57 var remotes []*service.Warder
58 for _, warderCfg := range cfg.Warders {
59 if warderCfg.IsLocal {
60 local = service.NewWarder(&warderCfg)
62 remoteWarder := service.NewWarder(&warderCfg)
63 remotes = append(remotes, remoteWarder)
68 log.Fatal("none local warder set")
74 func (w *warder) Run() {
75 go w.collectPendingTx()
76 go w.processCrossTxRoutine()
// collectPendingTx periodically queries the database for cross-chain txs whose
// status is pending and iterates over the result.
// NOTE(review): what is done with each tx, and the statements following the
// two error branches, are not visible in this excerpt.
79 func (w *warder) collectPendingTx() {
80 ticker := time.NewTicker(collectInterval)
// The receive from ticker.C is the loop's post statement, so the first pass
// runs immediately and every later pass waits for a tick.
81 for ; true; <-ticker.C {
82 txs := []*orm.CrossTransaction{}
// Load each tx together with its Chain and Reqs associations.
// NOTE(review): gorm documents ErrRecordNotFound for single-record lookups;
// with a slice destination this branch may never be taken — confirm.
83 if err := w.db.Preload("Chain").Preload("Reqs").
84 // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
85 // otherwise the field "status" will be ignored
86 Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
87 Find(&txs).Error; err == gorm.ErrRecordNotFound {
89 } else if err != nil {
90 log.Warnln("collectPendingTx", err)
93 for _, tx := range txs {
// processCrossTxRoutine consumes cross-chain txs from w.txCh. For each tx it
// validates it, builds the destination tx, creates its signature records,
// signs it locally and requests signatures from every remote warder. When the
// quorum is reached and this warder is the leader, it submits the destination
// tx and records the submission. Every failure is logged as a warning.
// NOTE(review): the statements following each warning are not visible in this
// excerpt, so how a failed step affects the rest of the pipeline must be
// confirmed against the full file.
99 func (w *warder) processCrossTxRoutine() {
100 for ormTx := range w.txCh {
101 if err := w.validateCrossTx(ormTx); err != nil {
// NOTE(review): unlike the warnings below, the validation error itself is
// not included in the log entry.
102 log.Warnln("invalid cross-chain tx", ormTx)
// Build the destination tx; destTxID is compared with the submitted ID later.
106 destTx, destTxID, err := w.proposeDestTx(ormTx)
108 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
112 if err := w.initDestTxSigns(destTx, ormTx); err != nil {
113 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
117 if err := w.signDestTx(destTx, ormTx); err != nil {
118 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
// Ask every remote warder for its signatures and attach them under that
// warder's position.
122 for _, remote := range w.remotes {
123 signs, err := remote.RequestSign(destTx, ormTx)
125 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
129 w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
// Only the leader submits, and only once the quorum is reached.
132 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
133 submittedTxID, err := w.submitTx(destTx)
135 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
// The ID reported back must match the ID of the tx that was built.
139 if submittedTxID != destTxID {
// NOTE(review): "err" logged here is the value returned by submitTx above;
// if that error was already handled it is nil at this point — confirm.
140 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
144 if err := w.updateSubmission(ormTx); err != nil {
145 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
// validateCrossTx returns an error naming the status when the cross-chain tx
// is already rejected, submitted or completed.
// NOTE(review): the switch header and the result for any other status are not
// visible in this excerpt; presumably the switch is on tx.Status — confirm.
152 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
154 case common.CrossTxRejectedStatus:
155 return errors.New("cross-chain tx rejected")
156 case common.CrossTxSubmittedStatus:
157 return errors.New("cross-chain tx submitted")
158 case common.CrossTxCompletedStatus:
159 return errors.New("cross-chain tx completed")
// proposeDestTx builds the destination tx for a cross-chain tx, choosing the
// builder by the name of the tx's source chain. It returns the built tx, its
// ID as a string, and an error; an unrecognised chain name yields the error
// "unknown source chain".
// NOTE(review): the case labels are not visible in this excerpt, so which
// chain name maps to which builder must be confirmed against the full file.
165 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
166 switch tx.Chain.Name {
168 return w.buildSidechainTx(tx)
170 return w.buildMainchainTx(tx)
172 return nil, "", errors.New("unknown source chain")
176 func (w *warder) buildSidechainTx(ormTx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
177 destTxData := &vaporTypes.TxData{Version: 1, TimeRange: 0}
178 muxID := &vaporBc.Hash{}
179 if err := muxID.UnmarshalText([]byte(ormTx.SourceMuxID)); err != nil {
180 return nil, "", errors.Wrap(err, "Unmarshal muxID")
183 for _, req := range ormTx.Reqs {
184 // getAsset from assetStore instead of preload asset, in order to save db query overload
185 asset, err := w.assetStore.GetByOrmID(req.AssetID)
187 return nil, "", errors.Wrap(err, "get asset by ormAsset ID")
190 assetID := &vaporBc.AssetID{}
191 if err := assetID.UnmarshalText([]byte(asset.AssetID)); err != nil {
192 return nil, "", errors.Wrap(err, "Unmarshal muxID")
195 rawDefinitionByte, err := hex.DecodeString(asset.RawDefinitionByte)
197 return nil, "", errors.Wrap(err, "decode rawDefinitionByte")
200 input := vaporTypes.NewCrossChainInput(nil, *muxID, *assetID, req.AssetAmount, req.SourcePos, w.fedProg, rawDefinitionByte)
201 destTxData.Inputs = append(destTxData.Inputs, input)
203 controlProgram, err := hex.DecodeString(req.Script)
205 return nil, "", errors.Wrap(err, "decode req.Script")
208 output := vaporTypes.NewIntraChainOutput(*assetID, req.AssetAmount, controlProgram)
209 destTxData.Outputs = append(destTxData.Outputs, output)
212 destTx := vaporTypes.NewTx(*destTxData)
213 w.addInputWitness(destTx)
215 if err := w.db.Where(ormTx).UpdateColumn(&orm.CrossTransaction{
216 DestTxHash: sql.NullString{destTx.ID.String(), true},
217 }).Error; err != nil {
221 return destTx, destTx.ID.String(), nil
225 func (w *warder) buildMainchainTx(ormTx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
226 destTxData := &btmTypes.TxData{Version: 1, TimeRange: 0}
227 muxID := &btmBc.Hash{}
228 if err := muxID.UnmarshalText([]byte(ormTx.SourceMuxID)); err != nil {
229 return nil, "", errors.Wrap(err, "Unmarshal muxID")
232 for _, req := range ormTx.Reqs {
233 // getAsset from assetStore instead of preload asset, in order to save db query overload
234 asset, err := w.assetStore.GetByOrmID(req.AssetID)
236 return nil, "", errors.Wrap(err, "get asset by ormAsset ID")
239 assetID := &btmBc.AssetID{}
240 if err := assetID.UnmarshalText([]byte(asset.AssetID)); err != nil {
241 return nil, "", errors.Wrap(err, "Unmarshal muxID")
244 // rawDefinitionByte, err := hex.DecodeString(asset.RawDefinitionByte)
246 // return nil, "", errors.Wrap(err, "decode rawDefinitionByte")
249 // input := vaporTypes.NewCrossChainInput(nil, *muxID, *assetID, req.AssetAmount, req.SourcePos, w.fedProg, rawDefinitionByte)
250 // destTxData.Inputs = append(destTxData.Inputs, input)
252 // controlProgram, err := hex.DecodeString(req.Script)
254 // return nil, "", errors.Wrap(err, "decode req.Script")
257 // output := vaporTypes.NewIntraChainOutput(*assetID, req.AssetAmount, controlProgram)
258 // destTxData.Outputs = append(destTxData.Outputs, output)
261 destTx := btmTypes.NewTx(*destTxData)
262 w.addInputWitness(destTx)
264 if err := w.db.Where(ormTx).UpdateColumn(&orm.CrossTransaction{
265 DestTxHash: sql.NullString{destTx.ID.String(), true},
266 }).Error; err != nil {
270 return destTx, destTx.ID.String(), nil
// addInputWitness sets the federation program as the only argument of every
// input of tx. tx is inspected with a type switch that has two branches, each
// running the same loop over the tx's inputs.
// NOTE(review): the case labels are not visible in this excerpt; presumably
// one branch per chain's tx type — confirm.
274 // tx is a pointer to types.Tx, so the InputArguments can be set and be valid afterward
275 func (w *warder) addInputWitness(tx interface{}) {
// A single-element witness holding the federation program.
276 witness := [][]byte{w.fedProg}
277 switch tx := tx.(type) {
279 for i := range tx.Inputs {
280 tx.SetInputArguments(uint32(i), witness)
284 for i := range tx.Inputs {
285 tx.SetInputArguments(uint32(i), witness)
// initDestTxSigns creates one pending signature record per warder for the
// cross-chain tx ormTx and stores them in the database. destTx is not used in
// the visible lines.
290 func (w *warder) initDestTxSigns(destTx interface{}, ormTx *orm.CrossTransaction) error {
291 crossTxSigns := []*orm.CrossTransactionSign{}
// One record per warder: the remotes plus the local one, counted from 1.
292 for i := 1; i <= len(w.remotes)+1; i++ {
293 crossTxSigns = append(crossTxSigns, &orm.CrossTransactionSign{
294 CrossTransactionID: ormTx.ID,
296 Status: common.CrossTxSignPendingStatus,
// NOTE(review): a slice is passed to Create here; jinzhu/gorm (v1) is not
// known to support batch inserts — confirm this call behaves as intended.
299 return w.db.Create(crossTxSigns).Error
// signDestTx first checks that tx is still pending and has a valid DestTxHash,
// returning "cross-chain tx status error" otherwise.
// NOTE(review): the rest of the body is not visible in this excerpt.
303 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
304 if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
305 return errors.New("cross-chain tx status error")
// attachSignsForTx is called by processCrossTxRoutine with a remote warder's
// position and the signs that warder returned for destTx.
// NOTE(review): the body is not visible in this excerpt.
312 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
// isTxSignsReachQuorum gates submission: processCrossTxRoutine submits destTx
// only when this returns true and the warder is the leader.
// NOTE(review): the body is not visible in this excerpt.
316 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
320 func (w *warder) isLeader() bool {
321 return w.position == 1
// submitTx submits destTx through the mainchain or the sidechain node,
// selected by destTx's dynamic type, and returns that node's SubmitTx result.
// Any other type yields the error "unknown destTx type".
// NOTE(review): the case labels are not visible in this excerpt, so which tx
// type goes to which node must be confirmed against the full file.
324 func (w *warder) submitTx(destTx interface{}) (string, error) {
325 switch tx := destTx.(type) {
327 return w.mainchainNode.SubmitTx(tx)
329 return w.sidechainNode.SubmitTx(tx)
331 return "", errors.New("unknown destTx type")
// updateSubmission sets the status column of the cross-chain tx to submitted
// and then notifies every remote warder of the submission.
// NOTE(review): the error branch and the end of the function are not visible
// in this excerpt.
335 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
// tx itself is the query condition; only the Status column is written.
336 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
337 Status: common.CrossTxSubmittedStatus,
338 }).Error; err != nil {
342 for _, remote := range w.remotes {
343 remote.NotifySubmission(tx)