OSDN Git Service

feat: add build crosschain input (#91)
[bytom/vapor.git] / blockchain / txfeed / txfeed.go
1 package txfeed
2
3 import (
4         "context"
5         "encoding/json"
6         "strconv"
7         "strings"
8
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/blockchain/query"
12         dbm "github.com/vapor/database/leveldb"
13         "github.com/vapor/errors"
14         "github.com/vapor/protocol"
15         "github.com/vapor/protocol/bc"
16         "github.com/vapor/protocol/bc/types"
17         "github.com/vapor/protocol/vm/vmutil"
18 )
19
20 const (
21         //FilterNumMax max txfeed filter amount.
22         FilterNumMax = 1024
23         logModule    = "txfeed"
24 )
25
26 var (
27         //ErrDuplicateAlias means error of duplicate feed alias.
28         ErrDuplicateAlias = errors.New("duplicate feed alias")
29         //ErrEmptyAlias means error of empty feed alias.
30         ErrEmptyAlias = errors.New("empty feed alias")
31         //ErrNumExceedlimit means txfeed filter number exceeds the limit.
32         ErrNumExceedlimit  = errors.New("txfeed exceed limit")
33         maxNewTxfeedChSize = 1000
34 )
35
36 //Tracker filter tracker object.
37 type Tracker struct {
38         DB       dbm.DB
39         TxFeeds  []*TxFeed
40         chain    *protocol.Chain
41         txfeedCh chan *types.Tx
42 }
43
44 type rawOutput struct {
45         OutputID bc.Hash
46         bc.AssetAmount
47         ControlProgram []byte
48         txHash         bc.Hash
49         outputIndex    uint32
50         sourceID       bc.Hash
51         sourcePos      uint64
52         refData        bc.Hash
53 }
54
55 //TxFeed describe a filter
56 type TxFeed struct {
57         ID     string `json:"id,omitempty"`
58         Alias  string `json:"alias"`
59         Filter string `json:"filter,omitempty"`
60         Param  filter `json:"param,omitempty"`
61 }
62
63 type filter struct {
64         AssetID          string `json:"assetid,omitempty"`
65         AmountLowerLimit uint64 `json:"lowerlimit,omitempty"`
66         AmountUpperLimit uint64 `json:"upperlimit,omitempty"`
67         TransType        string `json:"transtype,omitempty"`
68 }
69
70 //NewTracker create new txfeed tracker.
71 func NewTracker(db dbm.DB, chain *protocol.Chain) *Tracker {
72         s := &Tracker{
73                 DB:       db,
74                 TxFeeds:  make([]*TxFeed, 0, 10),
75                 chain:    chain,
76                 txfeedCh: make(chan *types.Tx, maxNewTxfeedChSize),
77         }
78
79         return s
80 }
81
82 func loadTxFeed(db dbm.DB, txFeeds []*TxFeed) ([]*TxFeed, error) {
83         iter := db.Iterator()
84         defer iter.Release()
85
86         for iter.Next() {
87                 txFeed := &TxFeed{}
88                 if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
89                         return nil, err
90                 }
91                 filter, err := parseFilter(txFeed.Filter)
92                 if err != nil {
93                         return nil, err
94                 }
95                 txFeed.Param = filter
96                 txFeeds = append(txFeeds, txFeed)
97         }
98         return txFeeds, nil
99 }
100
101 func parseFilter(ft string) (filter, error) {
102         var res filter
103
104         subFilter := strings.Split(ft, "AND")
105         for _, value := range subFilter {
106                 param := getParam(value, "=")
107                 if param == "" {
108                         continue
109                 }
110                 if strings.Contains(value, "asset_id") {
111                         res.AssetID = param
112                 }
113                 if strings.Contains(value, "amount_lower_limit") {
114                         tmp, _ := strconv.ParseInt(param, 10, 64)
115                         res.AmountLowerLimit = uint64(tmp)
116                 }
117                 if strings.Contains(value, "amount_upper_limit") {
118                         tmp, _ := strconv.ParseInt(param, 10, 64)
119                         res.AmountUpperLimit = uint64(tmp)
120                 }
121                 if strings.Contains(value, "trans_type") {
122                         res.TransType = param
123                 }
124         }
125         return res, nil
126 }
127
128 //TODO
129 func getParam(str, substr string) string {
130         if result := strings.Index(str, substr); result >= 0 {
131                 str := strings.Replace(str[result+1:], "'", "", -1)
132                 str = strings.Replace(str, " ", "", -1)
133                 return str
134         }
135         return ""
136 }
137
138 func parseTxfeed(db dbm.DB, filters []filter) error {
139         var txFeed TxFeed
140         var index int
141
142         iter := db.Iterator()
143         defer iter.Release()
144
145         for iter.Next() {
146
147                 if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
148                         return err
149                 }
150
151                 subFilter := strings.Split(txFeed.Filter, "AND")
152                 for _, value := range subFilter {
153                         param := getParam(value, "=")
154                         if param == "" {
155                                 continue
156                         }
157                         if strings.Contains(value, "asset_id") {
158                                 filters[index].AssetID = param
159                         }
160                         if strings.Contains(value, "amount_lower_limit") {
161                                 tmp, _ := strconv.ParseInt(param, 10, 64)
162                                 filters[index].AmountLowerLimit = uint64(tmp)
163                         }
164                         if strings.Contains(value, "amount_upper_limit") {
165                                 tmp, _ := strconv.ParseInt(param, 10, 64)
166                                 filters[index].AmountUpperLimit = uint64(tmp)
167                         }
168                         if strings.Contains(value, "trans_type") {
169                                 filters[index].TransType = param
170                         }
171                 }
172                 index++
173         }
174         return nil
175 }
176
177 //Prepare load and parse filters.
178 func (t *Tracker) Prepare(ctx context.Context) error {
179         var err error
180         t.TxFeeds, err = loadTxFeed(t.DB, t.TxFeeds)
181         return err
182 }
183
184 //GetTxfeedCh return a txfeed channel.
185 func (t *Tracker) GetTxfeedCh() chan *types.Tx {
186         return t.txfeedCh
187 }
188
189 //Create create a txfeed filter.
190 func (t *Tracker) Create(ctx context.Context, alias, fil string) error {
191         // Validate the filter.
192
193         if err := query.ValidateTransactionFilter(fil); err != nil {
194                 return err
195         }
196
197         if alias == "" {
198                 return errors.WithDetail(ErrEmptyAlias, "a transaction feed with empty alias")
199         }
200
201         if len(t.TxFeeds) >= FilterNumMax {
202                 return errors.WithDetail(ErrNumExceedlimit, "txfeed number exceed limit")
203         }
204
205         for _, txfeed := range t.TxFeeds {
206                 if txfeed.Alias == alias {
207                         return errors.WithDetail(ErrDuplicateAlias, "txfeed alias must unique")
208                 }
209         }
210
211         feed := &TxFeed{
212                 Alias:  alias,
213                 Filter: fil,
214         }
215
216         filter, err := parseFilter(feed.Filter)
217         if err != nil {
218                 return err
219         }
220         feed.Param = filter
221         t.TxFeeds = append(t.TxFeeds, feed)
222         return insertTxFeed(t.DB, feed)
223 }
224
225 func deleteTxFeed(db dbm.DB, alias string) error {
226         key, err := json.Marshal(alias)
227         if err != nil {
228                 return err
229         }
230         db.Delete(key)
231         return nil
232 }
233
234 // insertTxFeed adds the txfeed to the database. If the txfeed has a client token,
235 // and there already exists a txfeed with that client token, insertTxFeed will
236 // lookup and return the existing txfeed instead.
237 func insertTxFeed(db dbm.DB, feed *TxFeed) error {
238         // var err error
239         key, err := json.Marshal(feed.Alias)
240         if err != nil {
241                 return err
242         }
243         value, err := json.Marshal(feed)
244         if err != nil {
245                 return err
246         }
247
248         db.Set(key, value)
249         return nil
250 }
251
252 //Get get txfeed filter with alias.
253 func (t *Tracker) Get(ctx context.Context, alias string) (*TxFeed, error) {
254         if alias == "" {
255                 return nil, errors.WithDetail(ErrEmptyAlias, "get transaction feed with empty alias")
256         }
257
258         for i, v := range t.TxFeeds {
259                 if v.Alias == alias {
260                         return t.TxFeeds[i], nil
261                 }
262         }
263         return nil, nil
264 }
265
266 //Delete delete txfeed with alias.
267 func (t *Tracker) Delete(ctx context.Context, alias string) error {
268         log.WithFields(log.Fields{"module": logModule, "delete": alias}).Info("delete txfeed")
269
270         if alias == "" {
271                 return errors.WithDetail(ErrEmptyAlias, "del transaction feed with empty alias")
272         }
273
274         for i, txfeed := range t.TxFeeds {
275                 if txfeed.Alias == alias {
276                         t.TxFeeds = append(t.TxFeeds[:i], t.TxFeeds[i+1:]...)
277                         return deleteTxFeed(t.DB, alias)
278                 }
279         }
280         return nil
281 }
282
283 func outputFilter(txfeed *TxFeed, value *query.AnnotatedOutput) bool {
284         assetidstr := value.AssetID.String()
285
286         if txfeed.Param.AssetID != assetidstr && txfeed.Param.AssetID != "" {
287                 return false
288         }
289         if txfeed.Param.TransType != value.Type && txfeed.Param.TransType != "" {
290                 return false
291         }
292         if txfeed.Param.AmountLowerLimit > value.Amount && txfeed.Param.AmountLowerLimit != 0 {
293                 return false
294         }
295         if txfeed.Param.AmountUpperLimit < value.Amount && txfeed.Param.AmountUpperLimit != 0 {
296                 return false
297         }
298
299         return true
300 }
301
302 //TxFilter filter tx from mempool.
303 func (t *Tracker) TxFilter(tx *types.Tx) error {
304         var annotatedTx *query.AnnotatedTx
305         // Build the fully annotated transaction.
306         annotatedTx = buildAnnotatedTransaction(tx)
307         for _, output := range annotatedTx.Outputs {
308                 for _, filter := range t.TxFeeds {
309                         if match := outputFilter(filter, output); !match {
310                                 continue
311                         }
312                         b, err := json.Marshal(annotatedTx)
313                         if err != nil {
314                                 return err
315                         }
316                         log.WithFields(log.Fields{"module:": logModule, "filter": string(b)}).Info("find new tx match filter")
317                         t.txfeedCh <- tx
318                 }
319         }
320         return nil
321 }
322
323 var emptyJSONObject = json.RawMessage(`{}`)
324
325 func buildAnnotatedTransaction(orig *types.Tx) *query.AnnotatedTx {
326         tx := &query.AnnotatedTx{
327                 ID:      orig.ID,
328                 Inputs:  make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
329                 Outputs: make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
330         }
331
332         for i := range orig.Inputs {
333                 tx.Inputs = append(tx.Inputs, buildAnnotatedInput(orig, uint32(i)))
334         }
335         for i := range orig.Outputs {
336                 tx.Outputs = append(tx.Outputs, buildAnnotatedOutput(orig, i))
337         }
338         return tx
339 }
340
341 func buildAnnotatedInput(tx *types.Tx, i uint32) *query.AnnotatedInput {
342         orig := tx.Inputs[i]
343         in := &query.AnnotatedInput{
344                 AssetID:         orig.AssetID(),
345                 Amount:          orig.Amount(),
346                 AssetDefinition: &emptyJSONObject,
347         }
348
349         id := tx.Tx.InputIDs[i]
350         e := tx.Entries[id]
351         switch e := e.(type) {
352         case *bc.Spend:
353                 in.Type = "spend"
354                 in.ControlProgram = orig.ControlProgram()
355                 in.SpentOutputID = e.SpentOutputId
356
357         case *bc.CrossChainInput:
358                 in.Type = "cross_chain_in"
359                 in.ControlProgram = orig.ControlProgram()
360                 in.SpentOutputID = e.MainchainOutputId
361         }
362
363         return in
364 }
365
366 func buildAnnotatedOutput(tx *types.Tx, idx int) *query.AnnotatedOutput {
367         orig := tx.Outputs[idx]
368         outid := tx.OutputID(idx)
369         out := &query.AnnotatedOutput{
370                 OutputID:        *outid,
371                 Position:        idx,
372                 AssetID:         *orig.AssetAmount().AssetId,
373                 AssetDefinition: &emptyJSONObject,
374                 Amount:          orig.AssetAmount().Amount,
375                 ControlProgram:  orig.ControlProgram(),
376         }
377
378         if vmutil.IsUnspendable(out.ControlProgram) {
379                 out.Type = "retire"
380         } else {
381                 out.Type = "control"
382         }
383         return out
384 }