OSDN Git Service

new repo
[bytom/vapor.git] / blockchain / txfeed / txfeed.go
1 package txfeed
2
3 import (
4         "context"
5         "encoding/json"
6         "strconv"
7         "strings"
8
9         log "github.com/sirupsen/logrus"
10         dbm "github.com/tendermint/tmlibs/db"
11
12         "github.com/vapor/blockchain/query"
13         "github.com/vapor/errors"
14         "github.com/vapor/protocol"
15         "github.com/vapor/protocol/bc"
16         "github.com/vapor/protocol/bc/types"
17         "github.com/vapor/protocol/vm/vmutil"
18 )
19
20 const (
21         //FilterNumMax max txfeed filter amount.
22         FilterNumMax = 1024
23 )
24
25 var (
26         //ErrDuplicateAlias means error of duplicate feed alias.
27         ErrDuplicateAlias = errors.New("duplicate feed alias")
28         //ErrEmptyAlias means error of empty feed alias.
29         ErrEmptyAlias = errors.New("empty feed alias")
30         //ErrNumExceedlimit means txfeed filter number exceeds the limit.
31         ErrNumExceedlimit  = errors.New("txfeed exceed limit")
32         maxNewTxfeedChSize = 1000
33 )
34
35 //Tracker filter tracker object.
36 type Tracker struct {
37         DB       dbm.DB
38         TxFeeds  []*TxFeed
39         chain    *protocol.Chain
40         txfeedCh chan *types.Tx
41 }
42
43 type rawOutput struct {
44         OutputID bc.Hash
45         bc.AssetAmount
46         ControlProgram []byte
47         txHash         bc.Hash
48         outputIndex    uint32
49         sourceID       bc.Hash
50         sourcePos      uint64
51         refData        bc.Hash
52 }
53
54 //TxFeed describe a filter
55 type TxFeed struct {
56         ID     string `json:"id,omitempty"`
57         Alias  string `json:"alias"`
58         Filter string `json:"filter,omitempty"`
59         Param  filter `json:"param,omitempty"`
60 }
61
62 type filter struct {
63         AssetID          string `json:"assetid,omitempty"`
64         AmountLowerLimit uint64 `json:"lowerlimit,omitempty"`
65         AmountUpperLimit uint64 `json:"upperlimit,omitempty"`
66         TransType        string `json:"transtype,omitempty"`
67 }
68
69 //NewTracker create new txfeed tracker.
70 func NewTracker(db dbm.DB, chain *protocol.Chain) *Tracker {
71         s := &Tracker{
72                 DB:       db,
73                 TxFeeds:  make([]*TxFeed, 0, 10),
74                 chain:    chain,
75                 txfeedCh: make(chan *types.Tx, maxNewTxfeedChSize),
76         }
77
78         return s
79 }
80
81 func loadTxFeed(db dbm.DB, txFeeds []*TxFeed) ([]*TxFeed, error) {
82         iter := db.Iterator()
83         defer iter.Release()
84
85         for iter.Next() {
86                 txFeed := &TxFeed{}
87                 if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
88                         return nil, err
89                 }
90                 filter, err := parseFilter(txFeed.Filter)
91                 if err != nil {
92                         return nil, err
93                 }
94                 txFeed.Param = filter
95                 txFeeds = append(txFeeds, txFeed)
96         }
97         return txFeeds, nil
98 }
99
100 func parseFilter(ft string) (filter, error) {
101         var res filter
102
103         subFilter := strings.Split(ft, "AND")
104         for _, value := range subFilter {
105                 param := getParam(value, "=")
106                 if param == "" {
107                         continue
108                 }
109                 if strings.Contains(value, "asset_id") {
110                         res.AssetID = param
111                 }
112                 if strings.Contains(value, "amount_lower_limit") {
113                         tmp, _ := strconv.ParseInt(param, 10, 64)
114                         res.AmountLowerLimit = uint64(tmp)
115                 }
116                 if strings.Contains(value, "amount_upper_limit") {
117                         tmp, _ := strconv.ParseInt(param, 10, 64)
118                         res.AmountUpperLimit = uint64(tmp)
119                 }
120                 if strings.Contains(value, "trans_type") {
121                         res.TransType = param
122                 }
123         }
124         return res, nil
125 }
126
127 //TODO
128 func getParam(str, substr string) string {
129         if result := strings.Index(str, substr); result >= 0 {
130                 str := strings.Replace(str[result+1:], "'", "", -1)
131                 str = strings.Replace(str, " ", "", -1)
132                 return str
133         }
134         return ""
135 }
136
137 func parseTxfeed(db dbm.DB, filters []filter) error {
138         var txFeed TxFeed
139         var index int
140
141         iter := db.Iterator()
142         defer iter.Release()
143
144         for iter.Next() {
145
146                 if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
147                         return err
148                 }
149
150                 subFilter := strings.Split(txFeed.Filter, "AND")
151                 for _, value := range subFilter {
152                         param := getParam(value, "=")
153                         if param == "" {
154                                 continue
155                         }
156                         if strings.Contains(value, "asset_id") {
157                                 filters[index].AssetID = param
158                         }
159                         if strings.Contains(value, "amount_lower_limit") {
160                                 tmp, _ := strconv.ParseInt(param, 10, 64)
161                                 filters[index].AmountLowerLimit = uint64(tmp)
162                         }
163                         if strings.Contains(value, "amount_upper_limit") {
164                                 tmp, _ := strconv.ParseInt(param, 10, 64)
165                                 filters[index].AmountUpperLimit = uint64(tmp)
166                         }
167                         if strings.Contains(value, "trans_type") {
168                                 filters[index].TransType = param
169                         }
170                 }
171                 index++
172         }
173         return nil
174 }
175
176 //Prepare load and parse filters.
177 func (t *Tracker) Prepare(ctx context.Context) error {
178         var err error
179         t.TxFeeds, err = loadTxFeed(t.DB, t.TxFeeds)
180         return err
181 }
182
183 //GetTxfeedCh return a txfeed channel.
184 func (t *Tracker) GetTxfeedCh() chan *types.Tx {
185         return t.txfeedCh
186 }
187
188 //Create create a txfeed filter.
189 func (t *Tracker) Create(ctx context.Context, alias, fil string) error {
190         // Validate the filter.
191
192         if err := query.ValidateTransactionFilter(fil); err != nil {
193                 return err
194         }
195
196         if alias == "" {
197                 return errors.WithDetail(ErrEmptyAlias, "a transaction feed with empty alias")
198         }
199
200         if len(t.TxFeeds) >= FilterNumMax {
201                 return errors.WithDetail(ErrNumExceedlimit, "txfeed number exceed limit")
202         }
203
204         for _, txfeed := range t.TxFeeds {
205                 if txfeed.Alias == alias {
206                         return errors.WithDetail(ErrDuplicateAlias, "txfeed alias must unique")
207                 }
208         }
209
210         feed := &TxFeed{
211                 Alias:  alias,
212                 Filter: fil,
213         }
214
215         filter, err := parseFilter(feed.Filter)
216         if err != nil {
217                 return err
218         }
219         feed.Param = filter
220         t.TxFeeds = append(t.TxFeeds, feed)
221         return insertTxFeed(t.DB, feed)
222 }
223
224 func deleteTxFeed(db dbm.DB, alias string) error {
225         key, err := json.Marshal(alias)
226         if err != nil {
227                 return err
228         }
229         db.Delete(key)
230         return nil
231 }
232
233 // insertTxFeed adds the txfeed to the database. If the txfeed has a client token,
234 // and there already exists a txfeed with that client token, insertTxFeed will
235 // lookup and return the existing txfeed instead.
236 func insertTxFeed(db dbm.DB, feed *TxFeed) error {
237         // var err error
238         key, err := json.Marshal(feed.Alias)
239         if err != nil {
240                 return err
241         }
242         value, err := json.Marshal(feed)
243         if err != nil {
244                 return err
245         }
246
247         db.Set(key, value)
248         return nil
249 }
250
251 //Get get txfeed filter with alias.
252 func (t *Tracker) Get(ctx context.Context, alias string) (*TxFeed, error) {
253         if alias == "" {
254                 return nil, errors.WithDetail(ErrEmptyAlias, "get transaction feed with empty alias")
255         }
256
257         for i, v := range t.TxFeeds {
258                 if v.Alias == alias {
259                         return t.TxFeeds[i], nil
260                 }
261         }
262         return nil, nil
263 }
264
265 //Delete delete txfeed with alias.
266 func (t *Tracker) Delete(ctx context.Context, alias string) error {
267         log.WithField("delete", alias).Info("delete txfeed")
268
269         if alias == "" {
270                 return errors.WithDetail(ErrEmptyAlias, "del transaction feed with empty alias")
271         }
272
273         for i, txfeed := range t.TxFeeds {
274                 if txfeed.Alias == alias {
275                         t.TxFeeds = append(t.TxFeeds[:i], t.TxFeeds[i+1:]...)
276                         return deleteTxFeed(t.DB, alias)
277                 }
278         }
279         return nil
280 }
281
282 func outputFilter(txfeed *TxFeed, value *query.AnnotatedOutput) bool {
283         assetidstr := value.AssetID.String()
284
285         if txfeed.Param.AssetID != assetidstr && txfeed.Param.AssetID != "" {
286                 return false
287         }
288         if txfeed.Param.TransType != value.Type && txfeed.Param.TransType != "" {
289                 return false
290         }
291         if txfeed.Param.AmountLowerLimit > value.Amount && txfeed.Param.AmountLowerLimit != 0 {
292                 return false
293         }
294         if txfeed.Param.AmountUpperLimit < value.Amount && txfeed.Param.AmountUpperLimit != 0 {
295                 return false
296         }
297
298         return true
299 }
300
301 //TxFilter filter tx from mempool.
302 func (t *Tracker) TxFilter(tx *types.Tx) error {
303         var annotatedTx *query.AnnotatedTx
304         // Build the fully annotated transaction.
305         annotatedTx = buildAnnotatedTransaction(tx)
306         for _, output := range annotatedTx.Outputs {
307                 for _, filter := range t.TxFeeds {
308                         if match := outputFilter(filter, output); !match {
309                                 continue
310                         }
311                         b, err := json.Marshal(annotatedTx)
312                         if err != nil {
313                                 return err
314                         }
315                         log.WithField("filter", string(b)).Info("find new tx match filter")
316                         t.txfeedCh <- tx
317                 }
318         }
319         return nil
320 }
321
322 var emptyJSONObject = json.RawMessage(`{}`)
323
324 func buildAnnotatedTransaction(orig *types.Tx) *query.AnnotatedTx {
325         tx := &query.AnnotatedTx{
326                 ID:      orig.ID,
327                 Inputs:  make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
328                 Outputs: make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
329         }
330
331         for i := range orig.Inputs {
332                 tx.Inputs = append(tx.Inputs, buildAnnotatedInput(orig, uint32(i)))
333         }
334         for i := range orig.Outputs {
335                 tx.Outputs = append(tx.Outputs, buildAnnotatedOutput(orig, i))
336         }
337         return tx
338 }
339
340 func buildAnnotatedInput(tx *types.Tx, i uint32) *query.AnnotatedInput {
341         orig := tx.Inputs[i]
342         in := &query.AnnotatedInput{
343                 AssetID:         orig.AssetID(),
344                 Amount:          orig.Amount(),
345                 AssetDefinition: &emptyJSONObject,
346         }
347
348         id := tx.Tx.InputIDs[i]
349         e := tx.Entries[id]
350         switch e := e.(type) {
351         case *bc.Spend:
352                 in.Type = "spend"
353                 in.ControlProgram = orig.ControlProgram()
354                 in.SpentOutputID = e.SpentOutputId
355         case *bc.Issuance:
356                 in.Type = "issue"
357                 in.IssuanceProgram = orig.IssuanceProgram()
358         }
359
360         return in
361 }
362
363 func buildAnnotatedOutput(tx *types.Tx, idx int) *query.AnnotatedOutput {
364         orig := tx.Outputs[idx]
365         outid := tx.OutputID(idx)
366         out := &query.AnnotatedOutput{
367                 OutputID:        *outid,
368                 Position:        idx,
369                 AssetID:         *orig.AssetId,
370                 AssetDefinition: &emptyJSONObject,
371                 Amount:          orig.Amount,
372                 ControlProgram:  orig.ControlProgram,
373         }
374
375         if vmutil.IsUnspendable(out.ControlProgram) {
376                 out.Type = "retire"
377         } else {
378                 out.Type = "control"
379         }
380         return out
381 }