OSDN Git Service

dododo
[bytom/vapor.git] / federation / warder.go
1 package federation
2
3 import (
4         "database/sql"
5         "time"
6
7         btmTypes "github.com/bytom/protocol/bc/types"
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/errors"
12         "github.com/vapor/federation/common"
13         "github.com/vapor/federation/config"
14         "github.com/vapor/federation/database/orm"
15         "github.com/vapor/federation/service"
16         vaporTypes "github.com/vapor/protocol/bc/types"
17 )
18
19 type warder struct {
20         colletInterval time.Duration
21         db             *gorm.DB
22         txCh           chan *orm.CrossTransaction
23         mainchainNode  *service.Node
24         sidechainNode  *service.Node
25         others         []*service.Warder
26 }
27
28 func NewWarder(cfg *config.Config, db *gorm.DB, txCh chan *orm.CrossTransaction) *warder {
29         return &warder{
30                 colletInterval: time.Duration(cfg.CollectMinutes) * time.Minute,
31                 db:             db,
32                 txCh:           txCh,
33                 mainchainNode:  service.NewNode(cfg.Mainchain.Upstream),
34                 sidechainNode:  service.NewNode(cfg.Sidechain.Upstream),
35                 others:         parseOtherWarders(cfg),
36         }
37 }
38
39 func parseOtherWarders(cfg *config.Config) []*service.Warder {
40         var others []*service.Warder
41         for _, warderCfg := range cfg.Warders {
42                 if !warderCfg.IsLocal {
43                         anotherWarder := service.NewWarder(&warderCfg)
44                         others = append(others, anotherWarder)
45                 }
46         }
47         return others
48 }
49
50 func (w *warder) Run() {
51         go w.collectPendingTx()
52         go w.processCrossTxRoutine()
53 }
54
55 func (w *warder) collectPendingTx() {
56         ticker := time.NewTicker(w.colletInterval)
57         for ; true; <-ticker.C {
58                 txs := []*orm.CrossTransaction{}
59                 if err := w.db.Preload("Chain").Preload("Reqs").
60                         // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly
61                         // otherwise the field "status" is ignored
62                         Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
63                         Find(&txs).Error; err == gorm.ErrRecordNotFound {
64                         continue
65                 } else if err != nil {
66                         log.Warnln("collectPendingTx", err)
67                 }
68
69                 for _, tx := range txs {
70                         w.txCh <- tx
71                 }
72         }
73 }
74
75 func (w *warder) processCrossTxRoutine() {
76         for ormTx := range w.txCh {
77                 if err := w.validateCrossTx(ormTx); err != nil {
78                         log.Warnln("invalid cross-chain tx", ormTx)
79                         continue
80                 }
81
82                 destTx, destTxID, err := w.proposeDestTx(ormTx)
83                 if err != nil {
84                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("proposeDestTx")
85                         continue
86                 }
87
88                 if err := w.signDestTx(destTx, ormTx); err != nil {
89                         log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("signDestTx")
90                         continue
91                 }
92
93                 // TODO: elect signer & request sign
94
95                 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
96                         submittedTxID, err := w.submitTx(destTx)
97                         if err != nil {
98                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "dest tx": destTx}).Warnln("submitTx")
99                                 continue
100                         }
101
102                         if submittedTxID != destTxID {
103                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx, "built tx ID": destTxID, "submittedTx ID": submittedTxID}).Warnln("submitTx ID mismatch")
104                                 continue
105
106                         }
107
108                         // TODO: what to update? what about others?
109                         if err := w.updateSubmission(ormTx); err != nil {
110                                 log.WithFields(log.Fields{"err": err, "cross-chain tx": ormTx}).Warnln("updateSubmission")
111                                 continue
112                         }
113                 }
114         }
115 }
116
117 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
118         switch tx.Status {
119         case common.CrossTxRejectedStatus:
120                 return errors.New("cross-chain tx rejected")
121         case common.CrossTxSubmittedStatus:
122                 return errors.New("cross-chain tx submitted")
123         case common.CrossTxCompletedStatus:
124                 return errors.New("cross-chain tx completed")
125         }
126
127         crossTxReqs := []*orm.CrossTransactionReq{}
128         if err := w.db.Where(&orm.CrossTransactionReq{CrossTransactionID: tx.ID}).Find(&crossTxReqs).Error; err != nil {
129                 return err
130         }
131
132         if len(crossTxReqs) != len(tx.Reqs) {
133                 return errors.New("cross-chain requests count mismatch")
134         }
135
136         return nil
137 }
138
139 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
140         switch tx.Chain.Name {
141         case "bytom":
142                 return w.buildSidechainTx(tx)
143         case "vapor":
144                 return w.buildMainchainTx(tx)
145         default:
146                 return nil, "", errors.New("unknown source chain")
147         }
148 }
149
150 // TODO: build it
151 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (*vaporTypes.Tx, string, error) {
152         sidechainTx := &vaporTypes.Tx{}
153
154         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
155                 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
156         }).Error; err != nil {
157                 return nil, "", err
158         }
159
160         return sidechainTx, sidechainTx.ID.String(), nil
161 }
162
163 // TODO: build it
164 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (*btmTypes.Tx, string, error) {
165         mainchainTx := &btmTypes.Tx{}
166
167         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
168                 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
169         }).Error; err != nil {
170                 return nil, "", err
171         }
172
173         return mainchainTx, mainchainTx.ID.String(), nil
174 }
175
176 // TODO: sign it
177 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
178         if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
179                 return errors.New("cross-chain tx status error")
180         }
181
182         return nil
183 }
184
185 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
186         return false
187 }
188
189 func (w *warder) isLeader() bool {
190         return false
191 }
192
193 func (w *warder) submitTx(destTx interface{}) (string, error) {
194         switch tx := destTx.(type) {
195         case *btmTypes.Tx:
196                 return w.mainchainNode.SubmitTx(tx)
197         case *vaporTypes.Tx:
198                 return w.sidechainNode.SubmitTx(tx)
199         default:
200                 return "", errors.New("unknown destTx type")
201         }
202 }
203
204 // TODO:
205 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
206         return nil
207 }