OSDN Git Service

clean
[bytom/vapor.git] / federation / warder.go
1 package federation
2
3 import (
4         "database/sql"
5         "encoding/hex"
6         "time"
7
8         btmTypes "github.com/bytom/protocol/bc/types"
9         "github.com/jinzhu/gorm"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/crypto/ed25519/chainkd"
13         "github.com/vapor/errors"
14         "github.com/vapor/federation/common"
15         "github.com/vapor/federation/config"
16         "github.com/vapor/federation/database"
17         "github.com/vapor/federation/database/orm"
18         "github.com/vapor/federation/service"
19         vaporBc "github.com/vapor/protocol/bc"
20         vaporTypes "github.com/vapor/protocol/bc/types"
21 )
22
23 var collectInterval = 5 * time.Second
24
25 type warder struct {
26         db            *gorm.DB
27         assetStore    *database.AssetStore
28         txCh          chan *orm.CrossTransaction
29         fedProg       []byte
30         position      uint8
31         xpub          chainkd.XPub
32         xprv          chainkd.XPrv
33         mainchainNode *service.Node
34         sidechainNode *service.Node
35         remotes       []*service.Warder
36 }
37
38 func NewWarder(db *gorm.DB, assetStore *database.AssetStore, cfg *config.Config) *warder {
39         local, remotes := parseWarders(cfg)
40         return &warder{
41                 db:            db,
42                 assetStore:    assetStore,
43                 txCh:          make(chan *orm.CrossTransaction),
44                 fedProg:       config.ParseFedProg(cfg.Warders, cfg.Quorum),
45                 position:      local.Position,
46                 xpub:          local.XPub,
47                 xprv:          string2xprv(xprvStr),
48                 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
49                 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
50                 remotes:       remotes,
51         }
52 }
53
54 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
55         var local *service.Warder
56         var remotes []*service.Warder
57         for _, warderCfg := range cfg.Warders {
58                 if warderCfg.IsLocal {
59                         local = service.NewWarder(&warderCfg)
60                 } else {
61                         remoteWarder := service.NewWarder(&warderCfg)
62                         remotes = append(remotes, remoteWarder)
63                 }
64         }
65
66         if local == nil {
67                 log.Fatal("none local warder set")
68         }
69
70         return local, remotes
71 }
72
73 func (w *warder) Run() {
74         go w.collectPendingTx()
75         go w.processCrossTxRoutine()
76 }
77
78 func (w *warder) collectPendingTx() {
79         ticker := time.NewTicker(collectInterval)
80         for ; true; <-ticker.C {
81                 txs := []*orm.CrossTransaction{}
82                 if err := w.db.Preload("Chain").Preload("Reqs").
83                         // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
84                         // otherwise the field "status" will be ignored
85                         Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
86                         Find(&txs).Error; err == gorm.ErrRecordNotFound {
87                         continue
88                 } else if err != nil {
89                         log.Warnln("collectPendingTx", err)
90                 }
91
92                 for _, tx := range txs {
93                         w.txCh <- tx
94                 }
95         }
96 }
97
98 func (w *warder) processCrossTxRoutine() {
99         for ormTx := range w.txCh {
100                 if err := w.validateCrossTx(ormTx); err != nil {
101                         log.Warnln("invalid cross-chain tx", ormTx)
102                         continue
103                 }
104
105                 destTx, destTxID, err := w.proposeDestTx(ormTx)
106                 if err != nil {
107                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
108                         continue
109                 }
110
111                 if err := w.initDestTxSigns(destTx, ormTx); err != nil {
112                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("initDestTxSigns")
113                         continue
114                 }
115
116                 if err := w.signDestTx(destTx, ormTx); err != nil {
117                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
118                         continue
119                 }
120
121                 for _, remote := range w.remotes {
122                         signs, err := remote.RequestSign(destTx, ormTx)
123                         if err != nil {
124                                 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
125                                 continue
126                         }
127
128                         w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
129                 }
130
131                 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
132                         submittedTxID, err := w.submitTx(destTx)
133                         if err != nil {
134                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
135                                 continue
136                         }
137
138                         if submittedTxID != destTxID {
139                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
140                                 continue
141                         }
142
143                         if err := w.updateSubmission(ormTx); err != nil {
144                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
145                                 continue
146                         }
147                 }
148         }
149 }
150
151 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
152         switch tx.Status {
153         case common.CrossTxRejectedStatus:
154                 return errors.New("cross-chain tx rejected")
155         case common.CrossTxSubmittedStatus:
156                 return errors.New("cross-chain tx submitted")
157         case common.CrossTxCompletedStatus:
158                 return errors.New("cross-chain tx completed")
159         default:
160                 return nil
161         }
162 }
163
164 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
165         switch tx.Chain.Name {
166         case "bytom":
167                 return w.buildSidechainTx(tx)
168         case "vapor":
169                 return w.buildMainchainTx(tx)
170         default:
171                 return nil, "", errors.New("unknown source chain")
172         }
173 }
174
175 func (w *warder) buildSidechainTx(ormTx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
176         destTxData := &vaporTypes.TxData{Version: 1, TimeRange: 0}
177         muxID := &vaporBc.Hash{}
178         if err := muxID.UnmarshalText([]byte(ormTx.SourceMuxID)); err != nil {
179                 return nil, "", errors.Wrap(err, "Unmarshal muxID")
180         }
181
182         for _, req := range ormTx.Reqs {
183                 // getAsset from assetStore instead of preload asset, in order to save db query overload
184                 asset, err := w.assetStore.GetByOrmID(req.AssetID)
185                 if err != nil {
186                         return nil, "", errors.Wrap(err, "get asset by ormAsset ID")
187                 }
188
189                 assetID := &vaporBc.AssetID{}
190                 if err := assetID.UnmarshalText([]byte(asset.AssetID)); err != nil {
191                         return nil, "", errors.Wrap(err, "Unmarshal muxID")
192                 }
193
194                 rawDefinitionByte, err := hex.DecodeString(asset.RawDefinitionByte)
195                 if err != nil {
196                         return nil, "", errors.Wrap(err, "decode rawDefinitionByte")
197                 }
198
199                 issuanceProgramByte, err := hex.DecodeString(asset.IssuanceProgram)
200                 if err != nil {
201                         return nil, "", errors.Wrap(err, "decode issuanceProgramByte")
202                 }
203
204                 input := vaporTypes.NewCrossChainInput(nil, *muxID, *assetID, req.AssetAmount, req.SourcePos, 1, rawDefinitionByte, issuanceProgramByte)
205                 destTxData.Inputs = append(destTxData.Inputs, input)
206
207                 controlProgram, err := hex.DecodeString(req.Script)
208                 if err != nil {
209                         return nil, "", errors.Wrap(err, "decode req.Script")
210                 }
211
212                 output := vaporTypes.NewIntraChainOutput(*assetID, req.AssetAmount, controlProgram)
213                 destTxData.Outputs = append(destTxData.Outputs, output)
214         }
215
216         destTx := vaporTypes.NewTx(*destTxData)
217         w.addInputWitness(destTx)
218
219         if err := w.db.Where(ormTx).UpdateColumn(&orm.CrossTransaction{
220                 DestTxHash: sql.NullString{destTx.ID.String(), true},
221         }).Error; err != nil {
222                 return nil, "", err
223         }
224
225         return destTx, destTx.ID.String(), nil
226 }
227
228 func (w *warder) buildMainchainTx(ormTx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
229         return nil, "", errors.New("buildMainchainTx not implemented yet")
230 }
231
232 // tx is a pointer to types.Tx, so the InputArguments can be set and be valid afterward
233 func (w *warder) addInputWitness(tx interface{}) {
234         witness := [][]byte{w.fedProg}
235         switch tx := tx.(type) {
236         case *vaporTypes.Tx:
237                 for i := range tx.Inputs {
238                         tx.SetInputArguments(uint32(i), witness)
239                 }
240
241         case *btmTypes.Tx:
242                 for i := range tx.Inputs {
243                         tx.SetInputArguments(uint32(i), witness)
244                 }
245         }
246 }
247
248 func (w *warder) initDestTxSigns(destTx interface{}, ormTx *orm.CrossTransaction) error {
249         crossTxSigns := []*orm.CrossTransactionSign{}
250         for i := 1; i <= len(w.remotes)+1; i++ {
251                 crossTxSigns = append(crossTxSigns, &orm.CrossTransactionSign{
252                         CrossTransactionID: ormTx.ID,
253                         WarderID:           uint8(i),
254                         Status:             common.CrossTxSignPendingStatus,
255                 })
256         }
257         return w.db.Create(crossTxSigns).Error
258 }
259
260 // TODO:
261 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
262         if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
263                 return errors.New("cross-chain tx status error")
264         }
265
266         return nil
267 }
268
269 // TODO:
270 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
271 }
272
273 // TODO:
274 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
275         return false
276 }
277
278 func (w *warder) isLeader() bool {
279         return w.position == 1
280 }
281
282 func (w *warder) submitTx(destTx interface{}) (string, error) {
283         switch tx := destTx.(type) {
284         case *btmTypes.Tx:
285                 return w.mainchainNode.SubmitTx(tx)
286         case *vaporTypes.Tx:
287                 return w.sidechainNode.SubmitTx(tx)
288         default:
289                 return "", errors.New("unknown destTx type")
290         }
291 }
292
293 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
294         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
295                 Status: common.CrossTxSubmittedStatus,
296         }).Error; err != nil {
297                 return err
298         }
299
300         for _, remote := range w.remotes {
301                 remote.NotifySubmission(tx)
302         }
303         return nil
304 }