OSDN Git Service

clean
[bytom/vapor.git] / federation / warder.go
1 package federation
2
3 import (
4         "database/sql"
5         "time"
6
7         btmTypes "github.com/bytom/protocol/bc/types"
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/errors"
12         "github.com/vapor/federation/common"
13         "github.com/vapor/federation/config"
14         "github.com/vapor/federation/database/orm"
15         "github.com/vapor/federation/service"
16         vaporTypes "github.com/vapor/protocol/bc/types"
17 )
18
19 type warder struct {
20         colletInterval time.Duration
21         db             *gorm.DB
22         txCh           chan *orm.CrossTransaction
23         mainchainNode  *service.Node
24         sidechainNode  *service.Node
25 }
26
27 func NewWarder(cfg *config.Config, db *gorm.DB, txCh chan *orm.CrossTransaction) *warder {
28         return &warder{
29                 colletInterval: time.Duration(cfg.CollectMinutes) * time.Minute,
30                 db:             db,
31                 txCh:           txCh,
32                 mainchainNode:  service.NewNode(cfg.Mainchain.Upstream),
33                 sidechainNode:  service.NewNode(cfg.Sidechain.Upstream),
34         }
35 }
36
37 func (w *warder) Run() {
38         go w.collectPendingTx()
39
40         for ormTx := range w.txCh {
41                 if err := w.validateCrossTx(ormTx); err != nil {
42                         log.Warnln("invalid cross-chain tx", ormTx)
43                         continue
44                 }
45
46                 destTx, destTxID, err := w.proposeDestTx(ormTx)
47                 if err != nil {
48                         log.WithFields(log.Fields{
49                                 "err":            err,
50                                 "cross-chain tx": ormTx,
51                         }).Warnln("proposeDestTx")
52                         continue
53                 }
54
55                 if err := w.signDestTx(destTx, ormTx); err != nil {
56                         log.WithFields(log.Fields{
57                                 "err":            err,
58                                 "cross-chain tx": ormTx,
59                         }).Warnln("signDestTx")
60                         continue
61                 }
62
63                 // TODO: elect signer & request sign
64
65                 // TODO: what if submit fail
66                 if w.isTxSignsReachQuorum(destTx) && w.isLeader() {
67                         submittedTxID, err := w.submitTx(destTx)
68                         if err != nil {
69                                 log.WithFields(log.Fields{
70                                         "err":            err,
71                                         "cross-chain tx": ormTx,
72                                         "dest tx":        destTx,
73                                 }).Warnln("submitTx")
74                                 continue
75                         }
76
77                         if submittedTxID != destTxID {
78                                 log.WithFields(log.Fields{
79                                         "err":            err,
80                                         "cross-chain tx": ormTx,
81                                         "built tx ID":    destTxID,
82                                         "submittedTx ID": submittedTxID,
83                                 }).Warnln("submitTx ID mismatch")
84                                 continue
85
86                         }
87
88                         // TODO: what to update? what about others?
89                         if err := w.updateSubmission(ormTx); err != nil {
90                                 log.WithFields(log.Fields{
91                                         "err":            err,
92                                         "cross-chain tx": ormTx,
93                                 }).Warnln("updateSubmission")
94                                 continue
95                         }
96                 }
97         }
98 }
99
100 func (w *warder) collectPendingTx() {
101         ticker := time.NewTicker(w.colletInterval)
102         for ; true; <-ticker.C {
103                 txs := []*orm.CrossTransaction{}
104                 if err := w.db.Preload("Chain").Preload("Reqs").
105                         // do not use "Where(&orm.CrossTransaction{Status: common.CrossTxPendingStatus})" directly
106                         // otherwise the field "status" is ignored
107                         Model(&orm.CrossTransaction{}).Where("status = ?", common.CrossTxPendingStatus).
108                         Find(&txs).Error; err == gorm.ErrRecordNotFound {
109                         continue
110                 } else if err != nil {
111                         log.Warnln("collectPendingTx", err)
112                 }
113
114                 for _, tx := range txs {
115                         w.txCh <- tx
116                 }
117         }
118 }
119
120 func (w *warder) validateCrossTx(tx *orm.CrossTransaction) error {
121         if tx.Status == common.CrossTxRejectedStatus {
122                 return errors.New("cross-chain tx rejeted")
123         }
124
125         if tx.Status == common.CrossTxRejectedStatus {
126                 return errors.New("cross-chain tx submitted")
127         }
128
129         crossTxReqs := []*orm.CrossTransactionReq{}
130         if err := w.db.Where(&orm.CrossTransactionReq{CrossTransactionID: tx.ID}).Find(&crossTxReqs).Error; err != nil {
131                 return err
132         }
133
134         if len(crossTxReqs) != len(tx.Reqs) {
135                 return errors.New("cross-chain requests count mismatch")
136         }
137
138         return nil
139 }
140
141 func (w *warder) proposeDestTx(tx *orm.CrossTransaction) (interface{}, string, error) {
142         switch tx.Chain.Name {
143         case "bytom":
144                 return w.buildSidechainTx(tx)
145         case "vapor":
146                 return w.buildMainchainTx(tx)
147         default:
148                 return nil, "", errors.New("unknown source chain")
149         }
150 }
151
152 // TODO: build it
153 func (w *warder) buildSidechainTx(tx *orm.CrossTransaction) (interface{}, string, error) {
154         sidechainTx := &vaporTypes.Tx{}
155
156         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
157                 DestTxHash: sql.NullString{sidechainTx.ID.String(), true},
158         }).Error; err != nil {
159                 return nil, "", err
160         }
161
162         return sidechainTx, sidechainTx.ID.String(), nil
163 }
164
165 // TODO: build it
166 func (w *warder) buildMainchainTx(tx *orm.CrossTransaction) (interface{}, string, error) {
167         mainchainTx := &btmTypes.Tx{}
168
169         if err := w.db.Where(tx).UpdateColumn(&orm.CrossTransaction{
170                 DestTxHash: sql.NullString{mainchainTx.ID.String(), true},
171         }).Error; err != nil {
172                 return nil, "", err
173         }
174
175         return mainchainTx, mainchainTx.ID.String(), nil
176 }
177
178 // TODO: sign it
179 func (w *warder) signDestTx(destTx interface{}, tx *orm.CrossTransaction) error {
180         if tx.Status != common.CrossTxPendingStatus || !tx.DestTxHash.Valid {
181                 return errors.New("cross-chain tx status error")
182         }
183
184         return nil
185 }
186
187 // TODO:
188 func (w *warder) isTxSignsReachQuorum(destTx interface{}) bool {
189         return false
190 }
191
192 // TODO:
193 func (w *warder) isLeader() bool {
194         return false
195 }
196
197 func (w *warder) submitTx(destTx interface{}) (string, error) {
198         switch tx := destTx.(type) {
199         case *btmTypes.Tx:
200                 return w.mainchainNode.SubmitTx(tx)
201
202         case *vaporTypes.Tx:
203                 return w.sidechainNode.SubmitTx(tx)
204
205         default:
206                 return "", errors.New("unknown destTx type")
207         }
208 }
209
210 // TODO:
211 func (w *warder) updateSubmission(tx *orm.CrossTransaction) error {
212         return nil
213 }