8 btmTypes "github.com/bytom/protocol/bc/types"
9 "github.com/jinzhu/gorm"
10 log "github.com/sirupsen/logrus"
12 "github.com/vapor/crypto/ed25519/chainkd"
13 "github.com/vapor/errors"
14 "github.com/vapor/federation/common"
15 "github.com/vapor/federation/config"
16 "github.com/vapor/federation/database/orm"
17 "github.com/vapor/federation/service"
18 vaporBc "github.com/vapor/protocol/bc"
19 vaporTypes "github.com/vapor/protocol/bc/types"
// collectInterval is the period of the ticker that paces collectPendingTx.
22 var collectInterval = 5 * time.Second
// xprvStr is the text form that NewWarder decodes (through string2xprv) into the
// warder's chainkd.XPrv.
// NOTE(review): this looks like private key material hard-coded in source — confirm it
// is only a placeholder and move it to configuration or a key store before release.
24 var xprvStr = "d20e3d81ba2c5509619fbc276d7cd8b94f52a1dce1291ae9e6b28d4a48ee67d8ac5826ba65c9da0b035845b7cb379e816c529194c7e369492d8828dee5ede3e2"
// string2xprv decodes str into a chainkd.XPrv with XPrv.UnmarshalText.
// If decoding fails it logs and panics through log.Panicf; the decode error is
// included in the message so the cause of the failure is visible.
26 func string2xprv(str string) (xprv chainkd.XPrv) {
27 if err := xprv.UnmarshalText([]byte(str)); err != nil {
28 log.Panicf("fail to convert xprv string: %v", err)
// assetKeeper is the asset service handed to NewWarder by the caller.
35 assetKeeper *service.AssetKeeper
// txCh carries cross-chain transactions to processCrossTxRoutine; NewWarder creates it unbuffered.
36 txCh chan *orm.CrossTransaction
// mainchainNode and sidechainNode are node clients built from the configured upstreams;
// submitTx sends destination transactions through them.
41 mainchainNode *service.Node
42 sidechainNode *service.Node
// remotes are the other warders produced by parseWarders; they are asked for signatures
// (RequestSign) and told about submissions (NotifySubmission).
43 remotes []*service.Warder
// NewWarder builds a warder from the database handle, the asset keeper and the
// federation config. It splits cfg.Warders into the local and remote warders via
// parseWarders, sets fedProg from ParseFedProg(cfg.Warders, cfg.Quorum), and creates
// node clients for cfg.Mainchain.Upstream and cfg.Sidechain.Upstream.
// NOTE(review): this exported constructor returns the unexported type *warder, which
// callers outside the package cannot name — confirm that is intended.
46 func NewWarder(db *gorm.DB, assetKeeper *service.AssetKeeper, cfg *config.Config) *warder {
47 local, remotes := parseWarders(cfg)
50 assetKeeper: assetKeeper,
// Unbuffered channel: a send blocks until processCrossTxRoutine receives.
51 txCh: make(chan *orm.CrossTransaction),
52 fedProg: ParseFedProg(cfg.Warders, cfg.Quorum),
53 position: local.Position,
// NOTE(review): the key comes from the package-level xprvStr, not from cfg.
56 xprv: string2xprv(xprvStr),
57 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
58 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
// parseWarders walks cfg.Warders and wraps each entry with service.NewWarder: the entry
// flagged IsLocal becomes the local warder, the others are appended to remotes.
// It calls log.Fatal("none local warder set"); the guarding condition is not visible
// here — presumably it fires when no entry was flagged local.
// NOTE(review): &warderCfg takes the address of the range variable. Before Go 1.22 that
// variable is shared by all iterations, so this is only safe if service.NewWarder copies
// what it needs instead of retaining the pointer — confirm.
63 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
64 var local *service.Warder
65 var remotes []*service.Warder
66 for _, warderCfg := range cfg.Warders {
67 // TODO: use private key to check
68 if warderCfg.IsLocal {
69 local = service.NewWarder(&warderCfg)
71 remoteWarder := service.NewWarder(&warderCfg)
72 remotes = append(remotes, remoteWarder)
77 log.Fatal("none local warder set")
// Run starts the warder's two background loops, collectPendingTx and
// processCrossTxRoutine, each in its own goroutine.
// NOTE(review): no stop or wait mechanism for these goroutines is visible here.
83 func (w *warder) Run() {
84 go w.collectPendingTx()
85 go w.processCrossTxRoutine()
// collectPendingTx periodically queries the database for cross-chain transactions whose
// status is common.CrossTxPendingStatus, preloading their "Chain" and "Reqs" associations,
// and then iterates over the result (the loop body is not visible here — presumably it
// feeds w.txCh; confirm).
88 func (w *warder) collectPendingTx() {
89 ticker := time.NewTicker(collectInterval)
// The post statement receives from ticker.C, so the first pass runs immediately and
// every later pass waits for the next tick.
90 for ; true; <-ticker.C {
91 txs := []*orm.CrossTransaction{}
92 if err := w.db.Preload("Chain").Preload("Reqs").
93 // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
94 // otherwise the field "status" will be ignored
95 Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
96 Find(&txs).Error; err == gorm.ErrRecordNotFound {
98 } else if err != nil {
// Any other query error is only logged as a warning.
99 log.Warnln("collectPendingTx", err)
102 for _, tx := range txs {
// processCrossTxRoutine consumes cross-chain transactions from w.txCh. For each one it
// validates the transaction, proposes the destination-chain transaction, signs it
// locally, requests signatures from every remote warder and attaches them, and — when
// the signatures reach quorum and this warder is the leader — submits the destination
// transaction and records the submission.
// Every failure is logged as a warning; what happens to the rest of the iteration after
// a failure is decided by lines not visible here.
108 func (w *warder) processCrossTxRoutine() {
109 for ormTx := range w.txCh {
110 if err := w.validateCrossTx(ormTx); err != nil {
// Log the validation error itself, in the same structured form as the other steps.
111 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("invalid cross-chain tx")
115 destTx, destTxID, err := w.proposeDestTx(ormTx)
117 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
121 if err := w.signDestTx(destTx, ormTx); err != nil {
122 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
// Collect signatures from each remote warder and attach them under that warder's position.
126 for _, remote := range w.remotes {
127 signs, err := remote.RequestSign(destTx, ormTx)
129 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
133 w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
// Only the leader submits, and only once enough signatures are attached.
136 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
137 submittedTxID, err := w.submitTx(destTx)
139 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
// The ID reported by the node must match the ID of the transaction built locally.
143 if submittedTxID != destTxID {
// NOTE(review): err here is the submitTx error; if the check above already bails out
// on a non-nil err, this field is always nil — confirm and consider dropping it.
144 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
148 if err := w.updateSubmission(ormTx); err != nil {
149 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
// validateCrossTx returns an error for a cross-chain transaction that is in the rejected,
// submitted or completed state.
// The switch subject is not visible here — presumably tx.Status; confirm. What is
// returned for the remaining states is also outside the visible lines.
156 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
158 case common.CrossTxRejectedStatus:
159 return errors.New("cross-chain tx rejected")
160 case common.CrossTxSubmittedStatus:
161 return errors.New("cross-chain tx submitted")
162 case common.CrossTxCompletedStatus:
163 return errors.New("cross-chain tx completed")
// proposeDestTx builds the destination-chain transaction for tx, dispatching on
// tx.Chain.Name to buildSidechainTx or buildMainchainTx (the case labels are not visible
// here). It returns the built transaction as an interface{}, its ID string, and an
// error; an unrecognized chain name yields "unknown source chain".
169 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
170 switch tx.Chain.Name {
172 return w.buildSidechainTx(tx)
174 return w.buildMainchainTx(tx)
176 return nil, "", errors.New("unknown source chain")
// buildSidechainTx assembles the sidechain (vapor) transaction for ormTx: it decodes
// ormTx.SourceMuxID, adds one cross-chain input per entry of ormTx.Reqs controlled by
// w.fedProg, builds the transaction, stores its ID in the DestTxHash column and returns
// the transaction together with its ID string.
// Output/fee handling and witness attachment exist only as commented-out sketches.
182 // addInputWitness(tx, signInsts)?
183 func (w *warder) buildSidechainTx(ormTx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
184 destTxData := &vaporTypes.TxData{Version: 1, TimeRange: 0}
185 // signInsts := []*SigningInstruction{}
186 muxID := &vaporBc.Hash{}
187 if err := muxID.UnmarshalText([]byte(ormTx.SourceMuxID)); err != nil {
188 return nil, "", errors.Wrap(err, "Unmarshal muxID")
191 for _, req := range ormTx.Reqs {
193 // getAsset from assetKeeper instead of preload asset, in order to save db query overload
// NOTE(review): on the visible lines asset is a freshly allocated, empty orm.Asset, so
// asset.AssetID and asset.RawDefinitionByte are zero values — the assetKeeper lookup
// described above does not appear here; confirm it is still to be wired in.
194 asset := &orm.Asset{}
195 assetID := &vaporBc.AssetID{}
196 if err := assetID.UnmarshalText([]byte(asset.AssetID)); err != nil {
// This failure concerns the asset ID, not the mux ID; the wrap message says so.
197 return nil, "", errors.Wrap(err, "Unmarshal assetID")
200 rawDefinitionByte, err := hex.DecodeString(asset.RawDefinitionByte)
202 return nil, "", errors.Wrap(err, "decode rawDefinitionByte")
205 input := vaporTypes.NewCrossChainInput(nil, *muxID, *assetID, req.AssetAmount, req.SourcePos, w.fedProg, rawDefinitionByte)
206 destTxData.Inputs = append(destTxData.Inputs, input)
211 // txInput := btmTypes.NewSpendInput(nil, *utxoInfo.SourceID, *assetID, utxo.Amount, utxoInfo.SourcePos, cp)
212 // tx.Inputs = append(tx.Inputs, txInput)
214 // signInst := &SigningInstruction{}
215 // if utxo.Address == nil || utxo.Address.Wallet == nil {
216 // return signInst, nil
219 // path := pathForAddress(utxo.Address.Wallet.Idx, utxo.Address.Idx, utxo.Address.Change)
220 // for _, p := range path {
221 // signInst.DerivationPath = append(signInst.DerivationPath, hex.EncodeToString(p))
224 // xPubs, err := signersToXPubs(utxo.Address.Wallet.WalletSigners)
226 // return nil, errors.Wrap(err, "signersToXPubs")
229 // derivedXPubs := chainkd.DeriveXPubs(xPubs, path)
230 // derivedPKs := chainkd.XPubKeys(derivedXPubs)
231 // if len(derivedPKs) == 1 {
232 // signInst.DataWitness = derivedPKs[0]
233 // signInst.Pubkey = hex.EncodeToString(derivedPKs[0])
234 // } else if len(derivedPKs) > 1 {
235 // if signInst.DataWitness, err = vmutil.P2SPMultiSigProgram(derivedPKs, int(utxo.Address.Wallet.M)); err != nil {
239 // return signInst, nil
241 // signInsts = append(signInsts, signInst)
245 // add the payment output && handle the fee
246 // if err := addOutput(txData, address, asset, amount, netParams); err != nil {
247 // return nil, nil, errors.Wrap(err, "add payment output")
250 destTx := vaporTypes.NewTx(*destTxData)
251 // addInputWitness(tx, signInsts)
// Persist the destination tx ID on the matching cross-transaction record.
// NOTE(review): sql.NullString is built with unkeyed fields (String, Valid); go vet's
// composites check flags this form.
253 if err := w.db.Where(ormTx).UpdateColumn(&orm.CrossTransaction{
254 DestTxHash: sql.NullString{destTx.ID.String(), true},
255 }).Error; err != nil {
259 return destTx, destTx.ID.String(), nil
// buildMainchainTx returns the mainchain (bytom) transaction for tx together with its
// ID string, after storing that ID in the DestTxHash column of the matching record.
// NOTE(review): on the visible lines mainchainTx is a zero-value btmTypes.Tx, so the ID
// stored and returned is that of an empty transaction — looks like a placeholder; confirm.
263 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
264 mainchainTx := &btmTypes.Tx{}
266 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
267 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
268 }).Error; err != nil {
272 return mainchainTx, mainchainTx.ID.String(), nil
// signDestTx signs destTx on behalf of this warder (the signing steps are not visible
// here). It first refuses, with "cross-chain tx status error", any tx that is not in the
// pending state or whose DestTxHash has not been set.
276 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
277 if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
278 return errors.New("cross-chain tx status error")
// attachSignsForTx is called by processCrossTxRoutine with the signs string returned by a
// remote warder's RequestSign and that warder's position. Its body is not visible here.
285 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
// isTxSignsReachQuorum reports whether destTx may be submitted; processCrossTxRoutine
// requires it (together with isLeader) before calling submitTx. Its body is not visible here.
289 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
// isLeader reports whether this warder is the leader, defined here as the warder whose
// position equals 1.
293 func (w *warder) isLeader() bool {
294 return w.position == 1
// submitTx sends destTx to the node that matches its concrete type (type switch on
// destTx; the case labels are not visible here) and returns what that node's SubmitTx
// returns — the string that processCrossTxRoutine compares with the locally built tx ID.
// A value of any other type yields "unknown destTx type".
297 func (w *warder) submitTx(destTx interface{}) (string, error) {
298 switch tx := destTx.(type) {
300 return w.mainchainNode.SubmitTx(tx)
302 return w.sidechainNode.SubmitTx(tx)
304 return "", errors.New("unknown destTx type")
// updateSubmission sets the status column of the record matching tx to
// common.CrossTxSubmittedStatus and then calls NotifySubmission on every remote warder.
// NOTE(review): Where(tx) filters by the struct's fields; per the comment in
// collectPendingTx, zero-value fields are ignored in struct conditions — confirm tx
// always carries enough non-zero fields to identify a single row.
308 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
309 if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
310 Status: common.CrossTxSubmittedStatus,
311 }).Error; err != nil {
315 for _, remote := range w.remotes {
316 remote.NotifySubmission(tx)