OSDN Git Service

update
[bytom/vapor.git] / federation / warder.go
1 package federation
2
3 import (
4         "database/sql"
5         "time"
6
7         btmTypes "github.com/bytom/protocol/bc/types"
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/errors"
13         "github.com/vapor/federation/common"
14         "github.com/vapor/federation/config"
15         "github.com/vapor/federation/database/orm"
16         "github.com/vapor/federation/service"
17         vaporTypes "github.com/vapor/protocol/bc/types"
18 )
19
20 type warder struct {
21         position       uint8
22         xpub           chainkd.XPub
23         colletInterval time.Duration
24         db             *gorm.DB
25         txCh           chan *orm.CrossTransaction
26         mainchainNode  *service.Node
27         sidechainNode  *service.Node
28         remotes        []*service.Warder
29 }
30
31 func NewWarder(cfg *config.Config, db *gorm.DB, txCh chan *orm.CrossTransaction) *warder {
32         local, remotes := parseWarders(cfg)
33         return &warder{
34                 position:       local.Position,
35                 xpub:           local.XPub,
36                 colletInterval: time.Duration(cfg.CollectMinutes) * time.Minute,
37                 db:             db,
38                 txCh:           txCh,
39                 mainchainNode:  service.NewNode(cfg.Mainchain.Upstream),
40                 sidechainNode:  service.NewNode(cfg.Sidechain.Upstream),
41                 remotes:        remotes,
42         }
43 }
44
45 func parseWarders(cfg *config.Config) (*service.Warder, []*service.Warder) {
46         var local *service.Warder
47         var remotes []*service.Warder
48         for _, warderCfg := range cfg.Warders {
49                 if warderCfg.IsLocal {
50                         local = service.NewWarder(&warderCfg)
51                 } else {
52                         remoteWarder := service.NewWarder(&warderCfg)
53                         remotes = append(remotes, remoteWarder)
54                 }
55         }
56
57         if local == nil {
58                 log.Fatal("none local warder set")
59         }
60
61         return local, remotes
62 }
63
64 func (w *warder) Run() {
65         go w.collectPendingTx()
66         go w.processCrossTxRoutine()
67 }
68
69 func (w *warder) collectPendingTx() {
70         ticker := time.NewTicker(w.colletInterval)
71         for ; true; <-ticker.C {
72                 txs := []*orm.CrossTransaction{}
73                 if err := w.db.Preload("Chain").Preload("Reqs").
74                         // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly,
75                         // otherwise the field "status" will be ignored
76                         Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
77                         Find(&txs).Error; err == gorm.ErrRecordNotFound {
78                         continue
79                 } else if err != nil {
80                         log.Warnln("collectPendingTx", err)
81                 }
82
83                 for _, tx := range txs {
84                         w.txCh <- tx
85                 }
86         }
87 }
88
89 func (w *warder) processCrossTxRoutine() {
90         for ormTx := range w.txCh {
91                 if err := w.validateCrossTx(ormTx); err != nil {
92                         log.Warnln("invalid cross-chain tx", ormTx)
93                         continue
94                 }
95
96                 destTx, destTxID, err := w.proposeDestTx(ormTx)
97                 if err != nil {
98                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
99                         continue
100                 }
101
102                 if err := w.signDestTx(destTx, ormTx); err != nil {
103                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
104                         continue
105                 }
106
107                 for _, remote := range w.remotes {
108                         signs, err := remote.RequestSign(destTx, ormTx)
109                         if err != nil {
110                                 log.WithFields(log.Fields{"err": err, "remote": remote, "cross-chain tx": ormTx}).Warnln("RequestSign")
111                                 continue
112                         }
113
114                         w.attachSignsForTx(destTx, ormTx, remote.Position, signs)
115                 }
116
117                 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
118                         submittedTxID, err := w.submitTx(destTx)
119                         if err != nil {
120                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
121                                 continue
122                         }
123
124                         if submittedTxID != destTxID {
125                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "builtTx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
126                                 continue
127
128                         }
129
130                         if err := w.updateSubmission(ormTx); err != nil {
131                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
132                                 continue
133                         }
134                 }
135         }
136 }
137
138 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
139         switch tx.Status {
140         case common.CrossTxRejectedStatus:
141                 return errors.New("cross-chain tx rejected")
142         case common.CrossTxSubmittedStatus:
143                 return errors.New("cross-chain tx submitted")
144         case common.CrossTxCompletedStatus:
145                 return errors.New("cross-chain tx completed")
146         }
147
148         crossTxReqs := []*orm.CrossTransactionReq{}
149         if err := w.db.Where(&orm.CrossTransactionReq{CrossTransactionID: tx.ID}).Find(&crossTxReqs).Error; err != nil {
150                 return err
151         }
152
153         if len(crossTxReqs) != len(tx.Reqs) {
154                 return errors.New("cross-chain requests count mismatch")
155         }
156
157         return nil
158 }
159
160 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
161         switch tx.Chain.Name {
162         case "bytom":
163                 return w.buildSidechainTx(tx)
164         case "vapor":
165                 return w.buildMainchainTx(tx)
166         default:
167                 return nil, "", errors.New("unknown source chain")
168         }
169 }
170
171 // TODO:
172 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
173         sidechainTx := &vaporTypes.Tx{}
174
175         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
176                 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
177         }).Error; err != nil {
178                 return nil, "", err
179         }
180
181         return sidechainTx, sidechainTx.ID.String(), nil
182 }
183
184 // TODO:
185 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
186         mainchainTx := &btmTypes.Tx{}
187
188         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
189                 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
190         }).Error; err != nil {
191                 return nil, "", err
192         }
193
194         return mainchainTx, mainchainTx.ID.String(), nil
195 }
196
197 // TODO: sign it
198 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
199         if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
200                 return errors.New("cross-chain tx status error")
201         }
202
203         return nil
204 }
205
206 // TODO:
207 func (w *warder) attachSignsForTx(destTx interface{}, ormTx *orm.CrossTransaction, position uint8, signs string) {
208 }
209
210 // TODO:
211 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
212         return false
213 }
214
215 // TODO:
216 func (w *warder) isLeader() bool {
217         return false
218 }
219
220 func (w *warder) submitTx(destTx interface{}) (string, error) {
221         switch tx := destTx.(type) {
222         case *btmTypes.Tx:
223                 return w.mainchainNode.SubmitTx(tx)
224         case *vaporTypes.Tx:
225                 return w.sidechainNode.SubmitTx(tx)
226         default:
227                 return "", errors.New("unknown destTx type")
228         }
229 }
230
231 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
232         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
233                 Status: common.CrossTxSubmittedStatus,
234         }).Error; err != nil {
235                 return err
236         }
237
238         for _, remote := range w.remotes {
239                 remote.NotifySubmission(tx)
240         }
241         return nil
242 }