9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/blockchain/query"
12 dbm "github.com/vapor/database/leveldb"
13 "github.com/vapor/errors"
14 "github.com/vapor/protocol"
15 "github.com/vapor/protocol/bc"
16 "github.com/vapor/protocol/bc/types"
17 "github.com/vapor/protocol/vm/vmutil"
21 // FilterNumMax is the maximum number of txfeed filters.
27 // ErrDuplicateAlias is the error for a duplicate feed alias.
28 ErrDuplicateAlias = errors.New("duplicate feed alias")
29 // ErrEmptyAlias is the error for an empty feed alias.
30 ErrEmptyAlias = errors.New("empty feed alias")
31 // ErrNumExceedlimit means the number of txfeed filters exceeds the limit.
32 ErrNumExceedlimit = errors.New("txfeed exceed limit")
33 maxNewTxfeedChSize = 1000
36 // Tracker is the txfeed filter tracker object.
41 txfeedCh chan *types.Tx
44 type rawOutput struct {
55 // TxFeed describes a filter.
57 ID string `json:"id,omitempty"`
58 Alias string `json:"alias"`
59 Filter string `json:"filter,omitempty"`
60 Param filter `json:"param,omitempty"`
64 AssetID string `json:"assetid,omitempty"`
65 AmountLowerLimit uint64 `json:"lowerlimit,omitempty"`
66 AmountUpperLimit uint64 `json:"upperlimit,omitempty"`
67 TransType string `json:"transtype,omitempty"`
70 // NewTracker creates a new txfeed tracker.
71 func NewTracker(db dbm.DB, chain *protocol.Chain) *Tracker {
74 TxFeeds: make([]*TxFeed, 0, 10),
76 txfeedCh: make(chan *types.Tx, maxNewTxfeedChSize),
82 func loadTxFeed(db dbm.DB, txFeeds []*TxFeed) ([]*TxFeed, error) {
88 if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
91 filter, err := parseFilter(txFeed.Filter)
96 txFeeds = append(txFeeds, txFeed)
101 func parseFilter(ft string) (filter, error) {
104 subFilter := strings.Split(ft, "AND")
105 for _, value := range subFilter {
106 param := getParam(value, "=")
110 if strings.Contains(value, "asset_id") {
113 if strings.Contains(value, "amount_lower_limit") {
114 tmp, _ := strconv.ParseInt(param, 10, 64)
115 res.AmountLowerLimit = uint64(tmp)
117 if strings.Contains(value, "amount_upper_limit") {
118 tmp, _ := strconv.ParseInt(param, 10, 64)
119 res.AmountUpperLimit = uint64(tmp)
121 if strings.Contains(value, "trans_type") {
122 res.TransType = param
129 func getParam(str, substr string) string {
130 if result := strings.Index(str, substr); result >= 0 {
131 str := strings.Replace(str[result+1:], "'", "", -1)
132 str = strings.Replace(str, " ", "", -1)
138 func parseTxfeed(db dbm.DB, filters []filter) error {
142 iter := db.Iterator()
147 if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
151 subFilter := strings.Split(txFeed.Filter, "AND")
152 for _, value := range subFilter {
153 param := getParam(value, "=")
157 if strings.Contains(value, "asset_id") {
158 filters[index].AssetID = param
160 if strings.Contains(value, "amount_lower_limit") {
161 tmp, _ := strconv.ParseInt(param, 10, 64)
162 filters[index].AmountLowerLimit = uint64(tmp)
164 if strings.Contains(value, "amount_upper_limit") {
165 tmp, _ := strconv.ParseInt(param, 10, 64)
166 filters[index].AmountUpperLimit = uint64(tmp)
168 if strings.Contains(value, "trans_type") {
169 filters[index].TransType = param
177 // Prepare loads and parses the filters.
178 func (t *Tracker) Prepare(ctx context.Context) error {
180 t.TxFeeds, err = loadTxFeed(t.DB, t.TxFeeds)
184 // GetTxfeedCh returns a txfeed channel.
185 func (t *Tracker) GetTxfeedCh() chan *types.Tx {
189 // Create creates a txfeed filter.
190 func (t *Tracker) Create(ctx context.Context, alias, fil string) error {
191 // Validate the filter.
193 if err := query.ValidateTransactionFilter(fil); err != nil {
198 return errors.WithDetail(ErrEmptyAlias, "a transaction feed with empty alias")
201 if len(t.TxFeeds) >= FilterNumMax {
202 return errors.WithDetail(ErrNumExceedlimit, "txfeed number exceed limit")
205 for _, txfeed := range t.TxFeeds {
206 if txfeed.Alias == alias {
207 return errors.WithDetail(ErrDuplicateAlias, "txfeed alias must unique")
216 filter, err := parseFilter(feed.Filter)
221 t.TxFeeds = append(t.TxFeeds, feed)
222 return insertTxFeed(t.DB, feed)
225 func deleteTxFeed(db dbm.DB, alias string) error {
226 key, err := json.Marshal(alias)
234 // insertTxFeed adds the txfeed to the database. The record key is the
235 // JSON-encoded alias and the record value is the JSON-encoded feed, so a feed
236 // written under an existing alias shares that alias's key.
237 func insertTxFeed(db dbm.DB, feed *TxFeed) error {
239 key, err := json.Marshal(feed.Alias)
243 value, err := json.Marshal(feed)
252 // Get returns the txfeed filter with the given alias.
253 func (t *Tracker) Get(ctx context.Context, alias string) (*TxFeed, error) {
255 return nil, errors.WithDetail(ErrEmptyAlias, "get transaction feed with empty alias")
258 for i, v := range t.TxFeeds {
259 if v.Alias == alias {
260 return t.TxFeeds[i], nil
266 // Delete deletes the txfeed with the given alias.
267 func (t *Tracker) Delete(ctx context.Context, alias string) error {
268 log.WithFields(log.Fields{"module": logModule, "delete": alias}).Info("delete txfeed")
271 return errors.WithDetail(ErrEmptyAlias, "del transaction feed with empty alias")
274 for i, txfeed := range t.TxFeeds {
275 if txfeed.Alias == alias {
276 t.TxFeeds = append(t.TxFeeds[:i], t.TxFeeds[i+1:]...)
277 return deleteTxFeed(t.DB, alias)
283 func outputFilter(txfeed *TxFeed, value *query.AnnotatedOutput) bool {
284 assetidstr := value.AssetID.String()
286 if txfeed.Param.AssetID != assetidstr && txfeed.Param.AssetID != "" {
289 if txfeed.Param.TransType != value.Type && txfeed.Param.TransType != "" {
292 if txfeed.Param.AmountLowerLimit > value.Amount && txfeed.Param.AmountLowerLimit != 0 {
295 if txfeed.Param.AmountUpperLimit < value.Amount && txfeed.Param.AmountUpperLimit != 0 {
302 // TxFilter filters tx from mempool.
303 func (t *Tracker) TxFilter(tx *types.Tx) error {
304 var annotatedTx *query.AnnotatedTx
305 // Build the fully annotated transaction.
306 annotatedTx = buildAnnotatedTransaction(tx)
307 for _, output := range annotatedTx.Outputs {
308 for _, filter := range t.TxFeeds {
309 if match := outputFilter(filter, output); !match {
312 b, err := json.Marshal(annotatedTx)
316 log.WithFields(log.Fields{"module:": logModule, "filter": string(b)}).Info("find new tx match filter")
323 var emptyJSONObject = json.RawMessage(`{}`)
325 func buildAnnotatedTransaction(orig *types.Tx) *query.AnnotatedTx {
326 tx := &query.AnnotatedTx{
328 Inputs: make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
329 Outputs: make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
332 for i := range orig.Inputs {
333 tx.Inputs = append(tx.Inputs, buildAnnotatedInput(orig, uint32(i)))
335 for i := range orig.Outputs {
336 tx.Outputs = append(tx.Outputs, buildAnnotatedOutput(orig, i))
341 func buildAnnotatedInput(tx *types.Tx, i uint32) *query.AnnotatedInput {
343 in := &query.AnnotatedInput{
344 AssetID: orig.AssetID(),
345 Amount: orig.Amount(),
346 AssetDefinition: &emptyJSONObject,
349 id := tx.Tx.InputIDs[i]
351 switch e := e.(type) {
354 in.ControlProgram = orig.ControlProgram()
355 in.SpentOutputID = e.SpentOutputId
358 in.IssuanceProgram = orig.IssuanceProgram()
364 func buildAnnotatedOutput(tx *types.Tx, idx int) *query.AnnotatedOutput {
365 orig := tx.Outputs[idx]
366 outid := tx.OutputID(idx)
367 out := &query.AnnotatedOutput{
370 AssetID: *orig.AssetId,
371 AssetDefinition: &emptyJSONObject,
373 ControlProgram: orig.ControlProgram,
376 if vmutil.IsUnspendable(out.ControlProgram) {