OSDN Git Service

clean
[bytom/vapor.git] / federation / warder.go
1 package federation
2
3 import (
4         "database/sql"
5         "encoding/hex"
6         "time"
7
8         btmTypes "github.com/bytom/protocol/bc/types"
9         "github.com/jinzhu/gorm"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/crypto/ed25519/chainkd"
13         "github.com/vapor/errors"
14         "github.com/vapor/federation/common"
15         "github.com/vapor/federation/config"
16         "github.com/vapor/federation/database/orm"
17         "github.com/vapor/federation/service"
18         vaporBc "github.com/vapor/protocol/bc"
19         vaporTypes "github.com/vapor/protocol/bc/types"
20 )
21
22 var collectInterval = 5 * time.Second
23
24 var xprvStr = "d20e3d81ba2c5509619fbc276d7cd8b94f52a1dce1291ae9e6b28d4a48ee67d8ac5826ba65c9da0b035845b7cb379e816c529194c7e369492d8828dee5ede3e2"
25
26 func string2xprv(str string) (xprv chainkd.XPrv) {
27         if err := xprv.UnmarshalText([]byte(str)); err != nil {
28                 log.Panicf("fail to convert xprv string")
29         }
30         return xprv
31 }
32
33 type warder struct {
34         db            *gorm.DB
35         txCh          chan *orm.CrossTransaction
36         fedProg       []byte
37         position      uint8
38         xpub          chainkd.XPub
39         xprv          chainkd.XPrv
40         mainchainNode *service.Node
41         sidechainNode *service.Node
42         remotes       []*service.Warder
43 }
44
45 func NewWarder(db *gorm.DB, cfg *config.Config) *warder {
46         local, remotes := parseWarders(cfg)
47         return &warder{
48                 db:       db,
49                 txCh:     make(chan *orm.CrossTransaction),
50                 fedProg:  ParseFedProg(cfg.Warders, cfg.Quorum),
51                 position: local.Position,
52                 xpub:     local.XPub,
53                 // TODO:
54                 xprv:          string2xprv(xprvStr),
55                 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
56                 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
57                 remotes:       remotes,
58         }
59 }
60
61 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
62         var local *service.Warder
63         var remotes []*service.Warder
64         for _, warderCfg := range cfg.Warders {
65                 // TODO: use private key to check
66                 if warderCfg.IsLocal {
67                         local = service.NewWarder(&warderCfg)
68                 } else {
69                         remoteWarder := service.NewWarder(&warderCfg)
70                         remotes = append(remotes, remoteWarder)
71                 }
72         }
73
74         if local == nil {
75                 log.Fatal("none local warder set")
76         }
77
78         return local, remotes
79 }
80
81 func (w *warder) Run() {
82         go w.collectPendingTx()
83         go w.processCrossTxRoutine()
84 }
85
86 func (w *warder) collectPendingTx() {
87         ticker := time.NewTicker(collectInterval)
88         for ; true; <-ticker.C {
89                 txs := []*orm.CrossTransaction{}
90                 if err := w.db.Preload("Chain").Preload("Reqs").
91                         // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
92                         // otherwise the field "status" will be ignored
93                         Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
94                         Find(&txs).Error; err == gorm.ErrRecordNotFound {
95                         continue
96                 } else if err != nil {
97                         log.Warnln("collectPendingTx", err)
98                 }
99
100                 for _, tx := range txs {
101                         w.txCh <- tx
102                 }
103         }
104 }
105
106 func (w *warder) processCrossTxRoutine() {
107         for ormTx := range w.txCh {
108                 if err := w.validateCrossTx(ormTx); err != nil {
109                         log.Warnln("invalid cross-chain tx", ormTx)
110                         continue
111                 }
112
113                 destTx, destTxID, err := w.proposeDestTx(ormTx)
114                 if err != nil {
115                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
116                         continue
117                 }
118
119                 if err := w.signDestTx(destTx, ormTx); err != nil {
120                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
121                         continue
122                 }
123
124                 for _, remote := range w.remotes {
125                         signs, err := remote.RequestSign(destTx, ormTx)
126                         if err != nil {
127                                 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
128                                 continue
129                         }
130
131                         w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
132                 }
133
134                 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
135                         submittedTxID, err := w.submitTx(destTx)
136                         if err != nil {
137                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
138                                 continue
139                         }
140
141                         if submittedTxID != destTxID {
142                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
143                                 continue
144                         }
145
146                         if err := w.updateSubmission(ormTx); err != nil {
147                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
148                                 continue
149                         }
150                 }
151         }
152 }
153
154 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
155         switch tx.Status {
156         case common.CrossTxRejectedStatus:
157                 return errors.New("cross-chain tx rejected")
158         case common.CrossTxSubmittedStatus:
159                 return errors.New("cross-chain tx submitted")
160         case common.CrossTxCompletedStatus:
161                 return errors.New("cross-chain tx completed")
162         default:
163                 return nil
164         }
165 }
166
167 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
168         switch tx.Chain.Name {
169         case "bytom":
170                 return w.buildSidechainTx(tx)
171         case "vapor":
172                 return w.buildMainchainTx(tx)
173         default:
174                 return nil, "", errors.New("unknown source chain")
175         }
176 }
177
178 // TODO:
179 // signInsts?
180 // addInputWitness(tx, signInsts)?
181 func (w *warder) buildSidechainTx(ormTx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
182         destTxData := &vaporTypes.TxData{Version: 1, TimeRange: 0}
183         // signInsts := []*SigningInstruction{}
184         muxID := &vaporBc.Hash{}
185         if err := muxID.UnmarshalText([]byte(ormTx.SourceMuxID)); err != nil {
186                 return nil, "", errors.Wrap(err, "Unmarshal muxID")
187         }
188
189         for _, req := range ormTx.Reqs {
190                 // TODO:
191                 // getAsset from assetKeeper instead of preload asset, in order to save db query overload
192                 asset := &orm.Asset{}
193                 assetID := &vaporBc.AssetID{}
194                 if err := assetID.UnmarshalText([]byte(asset.AssetID)); err != nil {
195                         return nil, "", errors.Wrap(err, "Unmarshal muxID")
196                 }
197
198                 rawDefinitionByte, err := hex.DecodeString(asset.RawDefinitionByte)
199                 if err != nil {
200                         return nil, "", errors.Wrap(err, "decode rawDefinitionByte")
201                 }
202
203                 input := vaporTypes.NewCrossChainInput(nil, *muxID, *assetID, req.AssetAmount, req.SourcePos, w.fedProg, rawDefinitionByte)
204                 destTxData.Inputs = append(destTxData.Inputs, input)
205         }
206
207         // for?{
208
209         // txInput := btmTypes.NewSpendInput(nil, *utxoInfo.SourceID, *assetID, utxo.Amount, utxoInfo.SourcePos, cp)
210         // tx.Inputs = append(tx.Inputs, txInput)
211
212         // signInst := &SigningInstruction{}
213         // if utxo.Address == nil || utxo.Address.Wallet == nil {
214         //     return signInst, nil
215         // }
216
217         // path := pathForAddress(utxo.Address.Wallet.Idx, utxo.Address.Idx, utxo.Address.Change)
218         // for _, p := range path {
219         //     signInst.DerivationPath = append(signInst.DerivationPath, hex.EncodeToString(p))
220         // }
221
222         // xPubs, err := signersToXPubs(utxo.Address.Wallet.WalletSigners)
223         // if err != nil {
224         //     return nil, errors.Wrap(err, "signersToXPubs")
225         // }
226
227         // derivedXPubs := chainkd.DeriveXPubs(xPubs, path)
228         // derivedPKs := chainkd.XPubKeys(derivedXPubs)
229         // if len(derivedPKs) == 1 {
230         //     signInst.DataWitness = derivedPKs[0]
231         //     signInst.Pubkey = hex.EncodeToString(derivedPKs[0])
232         // } else if len(derivedPKs) > 1 {
233         //     if signInst.DataWitness, err = vmutil.P2SPMultiSigProgram(derivedPKs, int(utxo.Address.Wallet.M)); err != nil {
234         //         return nil, err
235         //     }
236         // }
237         // return signInst, nil
238
239         // signInsts = append(signInsts, signInst)
240
241         // }
242
243         // add the payment output && handle the fee
244         // if err := addOutput(txData, address, asset, amount, netParams); err != nil {
245         //     return nil, nil, errors.Wrap(err, "add payment output")
246         // }
247
248         destTx := vaporTypes.NewTx(*destTxData)
249         // addInputWitness(tx, signInsts)
250
251         if err := w.db.Where(ormTx).UpdateColumn(&orm.CrossTransaction{
252                 DestTxHash: sql.NullString{destTx.ID.String(), true},
253         }).Error; err != nil {
254                 return nil, "", err
255         }
256
257         return destTx, destTx.ID.String(), nil
258 }
259
260 // TODO:
261 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
262         mainchainTx := &btmTypes.Tx{}
263
264         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
265                 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
266         }).Error; err != nil {
267                 return nil, "", err
268         }
269
270         return mainchainTx, mainchainTx.ID.String(), nil
271 }
272
273 // TODO:
274 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
275         if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
276                 return errors.New("cross-chain tx status error")
277         }
278
279         return nil
280 }
281
282 // TODO:
283 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
284 }
285
286 // TODO:
287 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
288         return false
289 }
290
291 func (w *warder) isLeader() bool {
292         return w.position == 1
293 }
294
295 func (w *warder) submitTx(destTx interface{}) (string, error) {
296         switch tx := destTx.(type) {
297         case *btmTypes.Tx:
298                 return w.mainchainNode.SubmitTx(tx)
299         case *vaporTypes.Tx:
300                 return w.sidechainNode.SubmitTx(tx)
301         default:
302                 return "", errors.New("unknown destTx type")
303         }
304 }
305
306 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
307         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
308                 Status: common.CrossTxSubmittedStatus,
309         }).Error; err != nil {
310                 return err
311         }
312
313         for _, remote := range w.remotes {
314                 remote.NotifySubmission(tx)
315         }
316         return nil
317 }