OSDN Git Service

dododo
[bytom/vapor.git] / federation / warder.go
1 package federation
2
3 import (
4         "database/sql"
5         "time"
6
7         btmTypes "github.com/bytom/protocol/bc/types"
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/errors"
13         "github.com/vapor/federation/common"
14         "github.com/vapor/federation/config"
15         "github.com/vapor/federation/database/orm"
16         "github.com/vapor/federation/service"
17         vaporTypes "github.com/vapor/protocol/bc/types"
18 )
19
20 var collectInterval = 5 * time.Second
21
22 var xprvStr = "d20e3d81ba2c5509619fbc276d7cd8b94f52a1dce1291ae9e6b28d4a48ee67d8ac5826ba65c9da0b035845b7cb379e816c529194c7e369492d8828dee5ede3e2"
23
24 func string2xprv(str string) (xprv chainkd.XPrv) {
25         if err := xprv.UnmarshalText([]byte(str)); err != nil {
26                 log.Panicf("fail to convert xprv string")
27         }
28         return xprv
29 }
30
31 type warder struct {
32         position       uint8
33         xpub           chainkd.XPub
34         xprv           chainkd.XPrv
35         colletInterval time.Duration
36         db             *gorm.DB
37         txCh           chan *orm.CrossTransaction
38         mainchainNode  *service.Node
39         sidechainNode  *service.Node
40         remotes        []*service.Warder
41 }
42
43 func NewWarder(cfg *config.Config, db *gorm.DB) *warder {
44         local, remotes := parseWarders(cfg)
45         return &warder{
46                 position: local.Position,
47                 xpub:     local.XPub,
48                 // TODO:
49                 xprv:          string2xprv(xprvStr),
50                 db:            db,
51                 txCh:          make(chan *orm.CrossTransaction),
52                 mainchainNode: service.NewNode(cfg.Mainchain.Upstream),
53                 sidechainNode: service.NewNode(cfg.Sidechain.Upstream),
54                 remotes:       remotes,
55         }
56 }
57
58 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
59         var local *service.Warder
60         var remotes []*service.Warder
61         for _, warderCfg := range cfg.Warders {
62                 if warderCfg.IsLocal {
63                         local = service.NewWarder(&warderCfg)
64                 } else {
65                         remoteWarder := service.NewWarder(&warderCfg)
66                         remotes = append(remotes, remoteWarder)
67                 }
68         }
69
70         if local == nil {
71                 log.Fatal("none local warder set")
72         }
73
74         return local, remotes
75 }
76
77 func (w *warder) Run() {
78         go w.collectPendingTx()
79         go w.processCrossTxRoutine()
80 }
81
82 func (w *warder) collectPendingTx() {
83         ticker := time.NewTicker(collectInterval)
84         for ; true; <-ticker.C {
85                 txs := []*orm.CrossTransaction{}
86                 if err := w.db.Preload("Chain").Preload("Reqs").
87                         // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
88                         // otherwise the field "status" will be ignored
89                         Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
90                         Find(&txs).Error; err == gorm.ErrRecordNotFound {
91                         continue
92                 } else if err != nil {
93                         log.Warnln("collectPendingTx", err)
94                 }
95
96                 for _, tx := range txs {
97                         w.txCh <- tx
98                 }
99         }
100 }
101
102 func (w *warder) processCrossTxRoutine() {
103         for ormTx := range w.txCh {
104                 if err := w.validateCrossTx(ormTx); err != nil {
105                         log.Warnln("invalid cross-chain tx", ormTx)
106                         continue
107                 }
108
109                 destTx, destTxID, err := w.proposeDestTx(ormTx)
110                 if err != nil {
111                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
112                         continue
113                 }
114
115                 if err := w.signDestTx(destTx, ormTx); err != nil {
116                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
117                         continue
118                 }
119
120                 for _, remote := range w.remotes {
121                         signs, err := remote.RequestSign(destTx, ormTx)
122                         if err != nil {
123                                 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
124                                 continue
125                         }
126
127                         w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
128                 }
129
130                 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
131                         submittedTxID, err := w.submitTx(destTx)
132                         if err != nil {
133                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
134                                 continue
135                         }
136
137                         if submittedTxID != destTxID {
138                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
139                                 continue
140                         }
141
142                         if err := w.updateSubmission(ormTx); err != nil {
143                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
144                                 continue
145                         }
146                 }
147         }
148 }
149
150 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
151         switch tx.Status {
152         case common.CrossTxRejectedStatus:
153                 return errors.New("cross-chain tx rejected")
154         case common.CrossTxSubmittedStatus:
155                 return errors.New("cross-chain tx submitted")
156         case common.CrossTxCompletedStatus:
157                 return errors.New("cross-chain tx completed")
158         default:
159                 return nil
160         }
161 }
162
163 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
164         switch tx.Chain.Name {
165         case "bytom":
166                 return w.buildSidechainTx(tx)
167         case "vapor":
168                 return w.buildMainchainTx(tx)
169         default:
170                 return nil, "", errors.New("unknown source chain")
171         }
172 }
173
174 // TODO:
175 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
176         sidechainTx := &vaporTypes.Tx{}
177
178         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
179                 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
180         }).Error; err != nil {
181                 return nil, "", err
182         }
183
184         return sidechainTx, sidechainTx.ID.String(), nil
185 }
186
187 // TODO:
188 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
189         mainchainTx := &btmTypes.Tx{}
190
191         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
192                 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
193         }).Error; err != nil {
194                 return nil, "", err
195         }
196
197         return mainchainTx, mainchainTx.ID.String(), nil
198 }
199
200 // TODO:
201 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
202         if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
203                 return errors.New("cross-chain tx status error")
204         }
205
206         return nil
207 }
208
209 // TODO:
210 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
211 }
212
213 // TODO:
214 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
215         return false
216 }
217
218 // TODO:
219 func (w *warder) isLeader() bool {
220         return false
221 }
222
223 func (w *warder) submitTx(destTx interface{}) (string, error) {
224         switch tx := destTx.(type) {
225         case *btmTypes.Tx:
226                 return w.mainchainNode.SubmitTx(tx)
227         case *vaporTypes.Tx:
228                 return w.sidechainNode.SubmitTx(tx)
229         default:
230                 return "", errors.New("unknown destTx type")
231         }
232 }
233
234 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
235         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
236                 Status: common.CrossTxSubmittedStatus,
237         }).Error; err != nil {
238                 return err
239         }
240
241         for _, remote := range w.remotes {
242                 remote.NotifySubmission(tx)
243         }
244         return nil
245 }