OSDN Git Service

Add txfeed function (#103)
authoryahtoo <yahtoo.ma@gmail.com>
Wed, 15 Nov 2017 05:43:23 +0000 (13:43 +0800)
committerPaladz <yzhu101@uottawa.ca>
Wed, 15 Nov 2017 05:43:23 +0000 (13:43 +0800)
* Add txfeed function

* Add txfeed function

* Add txfeed function

* Add txfeed function

* Add txfeed function

* Add txfeed function

blockchain/query.go
blockchain/query/filter/parser.go
blockchain/query/filter/typecheck.go
blockchain/query/transactions.go
blockchain/reactor.go
blockchain/transact.go
blockchain/txfeed/txfeed.go
blockchain/txfeeds.go
cmd/bytomcli/main.go
node/node.go

index ab0d20f..617ff38 100644 (file)
@@ -170,31 +170,6 @@ func (bcr *BlockchainReactor) listTransactions(ctx context.Context, in requestQu
        }, nil
 }
 
-// listTxFeeds is an http handler for listing txfeeds. It does not take a filter.
-//
-// POST /list-transaction-feeds
-func (bcr *BlockchainReactor) listTxFeeds(ctx context.Context, in requestQuery) (page, error) {
-       limit := in.PageSize
-       if limit == 0 {
-               limit = defGenericPageSize
-       }
-
-       after := in.After
-
-       /*      txfeeds, after, err := bcr.txFeeds.Query(ctx, after, limit)
-               if err != nil {
-                       return page{}, errors.Wrap(err, "running txfeed query")
-               }
-       */
-       out := in
-       out.After = after
-       return page{
-               //              Items:    httpjson.Array(txfeeds),
-               //              LastPage: len(txfeeds) < limit,
-               Next: out,
-       }, nil
-}
-
 // POST /list-unspent-outputs
 func (bcr *BlockchainReactor) listUnspentOutputs(ctx context.Context, in requestQuery) interface{} {
 
index 10e2299..8aa9e9c 100644 (file)
@@ -35,7 +35,7 @@ func (p Predicate) MarshalText() ([]byte, error) {
 
 // Parse parses a predicate and returns an internal representation of the
 // predicate or an error if it fails to parse.
-/*func Parse(predicate string, tbl *SQLTable, vals []interface{}) (p Predicate, err error) {
+func Parse(predicate string, tbl *Table, vals []interface{}) (p Predicate, err error) {
        expr, parser, err := parse(predicate)
        if err != nil {
                return p, errors.WithDetail(ErrBadFilter, err.Error())
@@ -51,7 +51,6 @@ func (p Predicate) MarshalText() ([]byte, error) {
                Parameters:    parser.maxPlaceholder,
        }, nil
 }
-*/
 
 // Field is a type for simple expressions that simply access an attribute of
 // the queried object. They're used for GROUP BYs.
index a943fbb..e9c4678 100644 (file)
@@ -1,11 +1,32 @@
 package filter
 
 import (
-       //"errors"
        "fmt"
-       //"strings"
+
+       "github.com/bytom/errors"
 )
 
+//Column describe a column
+type Column struct {
+       Name string
+       Type Type
+}
+
+//Table describe a table
+type Table struct {
+       Name        string
+       Alias       string
+       Columns     map[string]*Column
+       ForeignKeys map[string]*ForeignKey
+}
+
+//ForeignKey describe a foreign key
+type ForeignKey struct {
+       Table         *Table
+       LocalColumn   string
+       ForeignColumn string
+}
+
 func isType(got Type, want Type) bool {
        return got == want || got == Any
 }
@@ -34,7 +55,7 @@ func valueTypes(vals []interface{}) ([]Type, error) {
 // typeCheck will statically type check expr with vals as the parameters
 // and using tbl to determine available attributes and environments. It
 // returns the inferred types of arbitrary json keys as a map.
-/*func typeCheck(expr expr, tbl *SQLTable, vals []interface{}) (map[string]Type, error) {
+func typeCheck(expr expr, tbl *Table, vals []interface{}) (map[string]Type, error) {
        valTypes, err := valueTypes(vals)
        if err != nil {
                return nil, err
@@ -54,7 +75,7 @@ func valueTypes(vals []interface{}) ([]Type, error) {
        return selectorTypes, nil
 }
 
-func typeCheckExpr(expr expr, tbl *SQLTable, valTypes []Type, selectorTypes map[string]Type) (typ Type, err error) {
+func typeCheckExpr(expr expr, tbl *Table, valTypes []Type, selectorTypes map[string]Type) (typ Type, err error) {
        if expr == nil { // no expr is a valid, bool type
                return Bool, nil
        }
@@ -182,7 +203,6 @@ func typeCheckExpr(expr expr, tbl *SQLTable, valTypes []Type, selectorTypes map[
                panic(fmt.Errorf("unrecognized expr type %T", expr))
        }
 }
-*/
 
 func assertType(expr expr, got, want Type, selectorTypes map[string]Type) (bool, error) {
        if !isType(got, want) { // type does not match
index 7453532..28b30f4 100644 (file)
@@ -1,22 +1,32 @@
 package query
 
 import (
-       //      "bytes"
-       //      "context"
-       //      "encoding/json"
        "fmt"
        "math"
-       //      "strconv"
 
-       //      "github.com/blockchain/blockchain/query/filter"
+       "github.com/bytom/blockchain/query/filter"
        "github.com/bytom/errors"
 )
 
+var filterTable = filter.Table{
+       Name:  "annotated_txs",
+       Alias: "txs",
+       Columns: map[string]*filter.Column{
+               "asset_id":           {Name: "assetid", Type: filter.String},
+               "amount_lower_limit": {Name: "amountlower", Type: filter.Integer},
+               "amount_upper_limit": {Name: "amountupper", Type: filter.Integer},
+               "trans_type":         {Name: "transtype", Type: filter.String},
+       },
+}
+
 var (
-       ErrBadAfter               = errors.New("malformed pagination parameter after")
+       //ErrBadAfter means malformed pagination parameter.
+       ErrBadAfter = errors.New("malformed pagination parameter after")
+       //ErrParameterCountMismatch means wrong number of parameters to query.
        ErrParameterCountMismatch = errors.New("wrong number of parameters to query")
 )
 
+//TxAfter means the last query block by a list-transactions query.
 type TxAfter struct {
        // FromBlockHeight and FromPosition uniquely identify the last transaction returned
        // by a list-transactions query.
@@ -36,6 +46,7 @@ func (after TxAfter) String() string {
        return fmt.Sprintf("%d:%d-%d", after.FromBlockHeight, after.FromPosition, after.StopBlockHeight)
 }
 
+//DecodeTxAfter decode tx from the last block.
 func DecodeTxAfter(str string) (c TxAfter, err error) {
        var from, pos, stop uint64
        _, err = fmt.Sscanf(str, "%d:%d-%d", &from, &pos, &stop)
@@ -50,157 +61,8 @@ func DecodeTxAfter(str string) (c TxAfter, err error) {
        return TxAfter{FromBlockHeight: from, FromPosition: uint32(pos), StopBlockHeight: stop}, nil
 }
 
-/*
+//ValidateTransactionFilter verify txfeed filter validity.
 func ValidateTransactionFilter(filt string) error {
-       _, err := filter.Parse(filt, transactionsTable, nil)
+       _, err := filter.Parse(filt, &filterTable, nil)
        return err
 }
-
-// LookupTxAfter looks up the transaction `after` for the provided time range.
-func (ind *Indexer) LookupTxAfter(ctx context.Context, begin, end uint64) (TxAfter, error) {
-       const q = `
-               SELECT COALESCE(MAX(height), 0), COALESCE(MIN(height), 0) FROM query_blocks
-               WHERE timestamp >= $1 AND timestamp <= $2
-       `
-
-       var from, stop uint64
-       err := ind.db.QueryRowContext(ctx, q, begin, end).Scan(&from, &stop)
-       if err != nil {
-               return TxAfter{}, errors.Wrap(err, "querying `query_blocks`")
-       }
-       return TxAfter{
-               FromBlockHeight: from,
-               FromPosition:    math.MaxInt32, // TODO(tessr): Support reversing direction.
-               StopBlockHeight: stop,
-       }, nil
-}
-
-// Transactions queries the blockchain for transactions matching the
-// filter predicate `filt`.
-func (ind *Indexer) Transactions(ctx context.Context, filt string, vals []interface{}, after TxAfter, limit int, asc bool) ([]*AnnotatedTx, *TxAfter, error) {
-       p, err := filter.Parse(filt, transactionsTable, vals)
-       if err != nil {
-               return nil, nil, err
-       }
-       if len(vals) != p.Parameters {
-               return nil, nil, ErrParameterCountMismatch
-       }
-       expr, err := filter.AsSQL(p, transactionsTable, vals)
-       if err != nil {
-               return nil, nil, errors.Wrap(err, "converting to SQL")
-       }
-
-       queryStr, queryArgs := constructTransactionsQuery(expr, vals, after, asc, limit)
-
-       if asc {
-               return ind.waitForAndFetchTransactions(ctx, queryStr, queryArgs, after, limit)
-       }
-       return ind.fetchTransactions(ctx, queryStr, queryArgs, after, limit)
-}
-
-// If asc is true, the transactions will be returned from "in front" of the `after`
-// param (e.g., the oldest transaction immediately after the `after` param,
-// followed by the second oldest, etc) in ascending order.
-func constructTransactionsQuery(expr string, vals []interface{}, after TxAfter, asc bool, limit int) (string, []interface{}) {
-       var buf bytes.Buffer
-
-       buf.WriteString("SELECT block_height, tx_pos, data FROM annotated_txs AS txs")
-       buf.WriteString(" WHERE ")
-
-       // add filter conditions
-       if len(expr) > 0 {
-               buf.WriteString(expr)
-               buf.WriteString(" AND ")
-       }
-
-       if asc {
-               // add time range & after conditions
-               buf.WriteString(fmt.Sprintf("(txs.block_height, txs.tx_pos) > ($%d, $%d) AND ", len(vals)+1, len(vals)+2))
-               buf.WriteString(fmt.Sprintf("txs.block_height <= $%d ", len(vals)+3))
-               vals = append(vals, after.FromBlockHeight, after.FromPosition, after.StopBlockHeight)
-
-               buf.WriteString("ORDER BY txs.block_height ASC, txs.tx_pos ASC ")
-       } else {
-               // add time range & after conditions
-               buf.WriteString(fmt.Sprintf("(txs.block_height, txs.tx_pos) < ($%d, $%d) AND ", len(vals)+1, len(vals)+2))
-               buf.WriteString(fmt.Sprintf("txs.block_height >= $%d ", len(vals)+3))
-               vals = append(vals, after.FromBlockHeight, after.FromPosition, after.StopBlockHeight)
-
-               buf.WriteString("ORDER BY txs.block_height DESC, txs.tx_pos DESC ")
-       }
-
-       buf.WriteString("LIMIT " + strconv.Itoa(limit))
-       return buf.String(), vals
-}
-
-func (ind *Indexer) fetchTransactions(ctx context.Context, queryStr string, queryArgs []interface{}, after TxAfter, limit int) ([]*AnnotatedTx, *TxAfter, error) {
-       rows, err := ind.db.QueryContext(ctx, queryStr, queryArgs...)
-       if err != nil {
-               return nil, nil, errors.Wrap(err, "executing txn query")
-       }
-       defer rows.Close()
-
-       txns := make([]*AnnotatedTx, 0, limit)
-       for rows.Next() {
-               var data []byte
-               err := rows.Scan(&after.FromBlockHeight, &after.FromPosition, &data)
-               if err != nil {
-                       return nil, nil, errors.Wrap(err, "scanning transaction row")
-               }
-               tx := new(AnnotatedTx)
-               err = json.Unmarshal(data, tx)
-               if err != nil {
-                       return nil, nil, errors.Wrap(err, "unmarshaling annotated transaction")
-               }
-               txns = append(txns, tx)
-       }
-       err = rows.Err()
-       if err != nil {
-               return nil, nil, errors.Wrap(err)
-       }
-       return txns, &after, nil
-}
-
-type fetchResp struct {
-       txns  []*AnnotatedTx
-       after *TxAfter
-       err   error
-}
-
-func (ind *Indexer) waitForAndFetchTransactions(ctx context.Context, queryStr string, queryArgs []interface{}, after TxAfter, limit int) ([]*AnnotatedTx, *TxAfter, error) {
-       resp := make(chan fetchResp, 1)
-       go func() {
-               var (
-                       txs []*AnnotatedTx
-                       aft *TxAfter
-                       err error
-               )
-
-               for h := ind.c.Height(); len(txs) == 0; h++ {
-                       <-ind.pinStore.PinWaiter(TxPinName, h)
-                       if err != nil {
-                               resp <- fetchResp{nil, nil, err}
-                               return
-                       }
-
-                       txs, aft, err = ind.fetchTransactions(ctx, queryStr, queryArgs, after, limit)
-                       if err != nil {
-                               resp <- fetchResp{nil, nil, err}
-                               return
-                       }
-
-                       if len(txs) > 0 {
-                               resp <- fetchResp{txs, aft, nil}
-                               return
-                       }
-               }
-       }()
-
-       select {
-       case <-ctx.Done():
-               return nil, nil, ctx.Err()
-       case r := <-resp:
-               return r.txns, r.after, r.err
-       }
-}
-*/
index 0970265..db96382 100644 (file)
@@ -42,24 +42,24 @@ const (
        crosscoreRPCPrefix          = "/rpc/"
 )
 
-// BlockchainReactor handles long-term catchup syncing.
+//BlockchainReactor handles long-term catchup syncing.
 type BlockchainReactor struct {
        p2p.BaseReactor
 
-       chain       *protocol.Chain
-       pinStore    *pin.Store
-       accounts    *account.Manager
-       assets      *asset.Registry
-       accesstoken *accesstoken.Token
-       txFeeds     *txfeed.TxFeed
-       blockKeeper *blockKeeper
-       txPool      *protocol.TxPool
-       hsm         *pseudohsm.HSM
-       mining      *cpuminer.CPUMiner
-       mux         *http.ServeMux
-       sw          *p2p.Switch
-       handler     http.Handler
-       evsw        types.EventSwitch
+       chain         *protocol.Chain
+       pinStore      *pin.Store
+       accounts      *account.Manager
+       assets        *asset.Registry
+       accesstoken   *accesstoken.Token
+       txFeedTracker *txfeed.Tracker
+       blockKeeper   *blockKeeper
+       txPool        *protocol.TxPool
+       hsm           *pseudohsm.HSM
+       mining        *cpuminer.CPUMiner
+       mux           *http.ServeMux
+       sw            *p2p.Switch
+       handler       http.Handler
+       evsw          types.EventSwitch
 }
 
 func batchRecover(ctx context.Context, v *interface{}) {
@@ -218,19 +218,20 @@ type page struct {
        LastPage bool         `json:"last_page"`
 }
 
-func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, pinStore *pin.Store) *BlockchainReactor {
+func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, pinStore *pin.Store, txfeeds *txfeed.Tracker) *BlockchainReactor {
        mining := cpuminer.NewCPUMiner(chain, accounts, txPool)
        bcR := &BlockchainReactor{
-               chain:       chain,
-               pinStore:    pinStore,
-               accounts:    accounts,
-               assets:      assets,
-               blockKeeper: newBlockKeeper(chain, sw),
-               txPool:      txPool,
-               mining:      mining,
-               mux:         http.NewServeMux(),
-               sw:          sw,
-               hsm:         hsm,
+               chain:         chain,
+               pinStore:      pinStore,
+               accounts:      accounts,
+               assets:        assets,
+               blockKeeper:   newBlockKeeper(chain, sw),
+               txPool:        txPool,
+               mining:        mining,
+               mux:           http.NewServeMux(),
+               sw:            sw,
+               hsm:           hsm,
+               txFeedTracker: txfeeds,
        }
        bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
        return bcR
@@ -334,6 +335,7 @@ func (bcR *BlockchainReactor) syncRoutine() {
        for {
                select {
                case newTx := <-newTxCh:
+                       bcR.txFeedTracker.TxFilter(newTx)
                        go bcR.BroadcastTransaction(newTx)
                case _ = <-statusUpdateTicker.C:
                        go bcR.BroadcastStatusResponse()
index f98f59a..af63fc3 100644 (file)
@@ -6,14 +6,14 @@ import (
        "sync"
        "time"
 
+       log "github.com/sirupsen/logrus"
+
        "github.com/bytom/blockchain/txbuilder"
+       chainjson "github.com/bytom/encoding/json"
        "github.com/bytom/errors"
        "github.com/bytom/net/http/httperror"
        "github.com/bytom/net/http/reqid"
        "github.com/bytom/protocol/bc/legacy"
-
-       chainjson "github.com/bytom/encoding/json"
-       log "github.com/sirupsen/logrus"
 )
 
 var defaultTxTTL = 5 * time.Minute
@@ -42,11 +42,7 @@ func (a *BlockchainReactor) actionDecoder(action string) (func([]byte) (txbuilde
        }
        return decoder, true
 }
-/*             {"actions": [
-                       {"type": "spend", "asset_id": "%s", "amount": 100},
-                       {"type": "control_account", "asset_id": "%s", "amount": 100, "account_id": "%s"}
-               ]}`
-*/
+
 func (a *BlockchainReactor) buildSingle(ctx context.Context, req *BuildRequest) (*txbuilder.Template, error) {
        err := a.filterAliases(ctx, req)
        if err != nil {
@@ -141,68 +137,6 @@ func (a *BlockchainReactor) submitSingle(ctx context.Context, tpl *txbuilder.Tem
        return map[string]string{"id": tpl.Transaction.ID.String()}, nil
 }
 
-/*
-// recordSubmittedTx records a lower bound height at which the tx
-// was first submitted to the tx pool. If this request fails for
-// some reason, a retry will know to look for the transaction in
-// blocks starting at this height.
-//
-// If the tx has already been submitted, it returns the existing
-// height.
-func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
-       const insertQ = `
-               INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
-               ON CONFLICT DO NOTHING
-       `
-       res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
-       if err != nil {
-               return 0, err
-       }
-       inserted, err := res.RowsAffected()
-       if err != nil {
-               return 0, err
-       }
-       if inserted == 1 {
-               return currentHeight, nil
-       }
-
-       // The insert didn't affect any rows, meaning there was already an entry
-       // for this transaction hash.
-       const selectQ = `
-               SELECT height FROM submitted_txs WHERE tx_hash = $1
-       `
-       var height uint64
-       err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
-       return height, err
-}
-*/
-
-/*
-// cleanUpSubmittedTxs will periodically delete records of submitted txs
-// older than a day. This function blocks and only exits when its context
-// is cancelled.
-func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
-       ticker := time.NewTicker(15 * time.Minute)
-       for {
-               select {
-               case <-ticker.C:
-                       // TODO(jackson): We could avoid expensive bulk deletes by partitioning
-                       // the table and DROP-ing tables of expired rows. Partitioning doesn't
-                       // play well with ON CONFLICT clauses though, so we would need to rework
-                       // how we guarantee uniqueness.
-                       const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
-                       _, err := db.Exec(ctx, q)
-                       if err != nil {
-                               log.Error(ctx, err)
-                       }
-               case <-ctx.Done():
-                       ticker.Stop()
-                       return
-               }
-       }
-}
-*/
-
 // finalizeTxWait calls FinalizeTx and then waits for confirmation of
 // the transaction.  A nil error return means the transaction is
 // confirmed on the blockchain.  ErrRejected means a conflicting tx is
@@ -215,11 +149,6 @@ func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbu
        generatorHeight := localHeight
 
        log.WithField("localHeight", localHeight).Info("Starting to finalize transaction")
-       // Remember this height in case we retry this submit call.
-       /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
-       if err != nil {
-               return errors.Wrap(err, "saving tx submitted height")
-       }*/
 
        err := txbuilder.FinalizeTx(ctx, a.chain, txTemplate.Transaction)
        if err != nil {
@@ -272,7 +201,6 @@ func (a *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *legacy.Tx,
 
                        // might still be in pool or might be rejected; we can't
                        // tell definitively until its max time elapses.
-
                        // Re-insert into the pool in case it was dropped.
                        err = txbuilder.FinalizeTx(ctx, a.chain, tx)
                        if err != nil {
index 0219387..c2c41d3 100644 (file)
 package txfeed
 
 import (
-       //      "bytes"
        "context"
-       //"database/sql"
+       "encoding/json"
+       "strconv"
+       "strings"
 
-       //      "github.com/bytom/blockchain/query"
-       //      "github.com/bytom/database/pg"
+       log "github.com/sirupsen/logrus"
+       dbm "github.com/tendermint/tmlibs/db"
+
+       "github.com/bytom/blockchain/query"
        "github.com/bytom/errors"
+       "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/legacy"
+       "github.com/bytom/protocol/vm/vmutil"
+)
+
+const (
+       //FilterNumMax max txfeed filter amount.
+       FilterNumMax = 1024
 )
 
-var ErrDuplicateAlias = errors.New("duplicate feed alias")
+var (
+       //ErrDuplicateAlias means error of duplicate feed alias.
+       ErrDuplicateAlias = errors.New("duplicate feed alias")
+       //ErrEmptyAlias means error of empty feed alias.
+       ErrEmptyAlias = errors.New("empty feed alias")
+       //ErrNumExceedlimit means txfeed filter number exceeds the limit.
+       ErrNumExceedlimit  = errors.New("txfeed exceed limit")
+       maxNewTxfeedChSize = 1000
+)
 
+//Tracker filter tracker object.
 type Tracker struct {
-       DB string
+       DB       dbm.DB
+       TxFeeds  []*TxFeed
+       chain    *protocol.Chain
+       txfeedCh chan *legacy.Tx
+}
+
+type rawOutput struct {
+       OutputID bc.Hash
+       bc.AssetAmount
+       ControlProgram []byte
+       txHash         bc.Hash
+       outputIndex    uint32
+       sourceID       bc.Hash
+       sourcePos      uint64
+       refData        bc.Hash
 }
 
+//TxFeed describe a filter
 type TxFeed struct {
-       ID     string  `json:"id,omitempty"`
-       Alias  *string `json:"alias"`
-       Filter string  `json:"filter,omitempty"`
-       After  string  `json:"after,omitempty"`
+       ID     string `json:"id,omitempty"`
+       Alias  string `json:"alias"`
+       Filter string `json:"filter,omitempty"`
+       Param  filter `json:"param,omitempty"`
 }
 
-func (t *Tracker) Create(ctx context.Context, alias, fil, after string, clientToken string) (*TxFeed, error) {
-       // Validate the filter.
-       /*      err := query.ValidateTransactionFilter(fil)
+type filter struct {
+       assetID          string `json:"assetid,omitempty"`
+       amountLowerLimit uint64 `json:"lowerlimit,omitempty"`
+       amountUpperLimit uint64 `json:"upperlimit,omitempty"`
+       transType        string `json:"transtype,omitempty"`
+}
+
+//NewTracker create new txfeed tracker.
+func NewTracker(db dbm.DB, chain *protocol.Chain) *Tracker {
+       s := &Tracker{
+               DB:       db,
+               TxFeeds:  make([]*TxFeed, 0, 10),
+               chain:    chain,
+               txfeedCh: make(chan *legacy.Tx, maxNewTxfeedChSize),
+       }
+
+       return s
+}
+
+func loadTxFeed(db dbm.DB, txFeeds []*TxFeed) ([]*TxFeed, error) {
+       iter := db.Iterator()
+       defer iter.Release()
+
+       for iter.Next() {
+               txFeed := &TxFeed{}
+               if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
+                       return nil, err
+               }
+               filter, err := parseFilter(txFeed.Filter)
                if err != nil {
                        return nil, err
                }
+               txFeed.Param = filter
+               txFeeds = append(txFeeds, txFeed)
+       }
+       return txFeeds, nil
+}
 
-               var ptrAlias *string
-               if alias != "" {
-                       ptrAlias = &alias
-               }
+func parseFilter(ft string) (filter, error) {
+       var res filter
 
-               feed := &TxFeed{
-                       Alias:  ptrAlias,
-                       Filter: fil,
-                       After:  after,
+       subFilter := strings.Split(ft, "AND")
+       for _, value := range subFilter {
+               param := getParam(value, "=")
+               if param == "" {
+                       continue
+               }
+               if strings.Contains(value, "asset_id") {
+                       res.assetID = param
+               }
+               if strings.Contains(value, "amount_lower_limit") {
+                       tmp, _ := strconv.ParseInt(param, 10, 64)
+                       res.amountLowerLimit = uint64(tmp)
+               }
+               if strings.Contains(value, "amount_upper_limit") {
+                       tmp, _ := strconv.ParseInt(param, 10, 64)
+                       res.amountUpperLimit = uint64(tmp)
+               }
+               if strings.Contains(value, "trans_type") {
+                       res.transType = param
                }
-               return insertTxFeed(ctx, t.DB, feed, clientToken)
+       }
+       return res, nil
+}
 
-       */
-       return nil, nil
+//TODO
+func getParam(str, substr string) string {
+       if result := strings.Index(str, substr); result >= 0 {
+               str := strings.Replace(str[result+1:], "'", "", -1)
+               str = strings.Replace(str, " ", "", -1)
+               return str
+       }
+       return ""
 }
 
-// insertTxFeed adds the txfeed to the database. If the txfeed has a client token,
-// and there already exists a txfeed with that client token, insertTxFeed will
-// lookup and return the existing txfeed instead.
+func parseTxfeed(db dbm.DB, filters []filter) error {
+       var txFeed TxFeed
+       var index int
 
-func insertTxFeed(ctx context.Context /* db pg.DB,*/, feed *TxFeed, clientToken string) (*TxFeed, error) {
-       /*      const q = `
-                       INSERT INTO txfeeds (alias, filter, after, client_token)
-                       VALUES ($1, $2, $3, $4)
-                       ON CONFLICT (client_token) DO NOTHING
-                       RETURNING id
-               `
+       iter := db.Iterator()
+       defer iter.Release()
 
-               var alias sql.NullString
-               if feed.Alias != nil {
-                       alias = sql.NullString{Valid: true, String: *feed.Alias}
-               }
+       for iter.Next() {
 
-               nullToken := sql.NullString{
-                       String: clientToken,
-                       Valid:  clientToken != "",
+               if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
+                       return err
                }
 
-               err := db.QueryRowContext(
-                       ctx, q, alias, feed.Filter, feed.After,
-                       nullToken).Scan(&feed.ID)
-
-               if pg.IsUniqueViolation(err) {
-                       return nil, errors.WithDetail(ErrDuplicateAlias, "a transaction feed with the provided alias already exists")
-               } else if err == sql.ErrNoRows && clientToken != "" {
-                       // There is already a txfeed with the provided client
-                       // token. We should return the existing txfeed
-                       feed, err = txfeedByClientToken(ctx, db, clientToken)
-                       if err != nil {
-                               return nil, errors.Wrap(err, "retrieving existing txfeed")
+               subFilter := strings.Split(txFeed.Filter, "AND")
+               for _, value := range subFilter {
+                       param := getParam(value, "=")
+                       if param == "" {
+                               continue
+                       }
+                       if strings.Contains(value, "asset_id") {
+                               filters[index].assetID = param
+                       }
+                       if strings.Contains(value, "amount_lower_limit") {
+                               tmp, _ := strconv.ParseInt(param, 10, 64)
+                               filters[index].amountLowerLimit = uint64(tmp)
+                       }
+                       if strings.Contains(value, "amount_upper_limit") {
+                               tmp, _ := strconv.ParseInt(param, 10, 64)
+                               filters[index].amountUpperLimit = uint64(tmp)
+                       }
+                       if strings.Contains(value, "trans_type") {
+                               filters[index].transType = param
                        }
-               } else if err != nil {
-                       return nil, err
                }
-       */
-       //      return feed, nil
-       return nil, nil
+               index++
+       }
+       return nil
 }
 
-func txfeedByClientToken(ctx context.Context /* db pg.DB,*/, clientToken string) (*TxFeed, error) {
-       /*      const q = `
-                       SELECT id, alias, filter, after
-                       FROM txfeeds
-                       WHERE client_token=$1
-               `
-
-               var (
-                       feed  TxFeed
-                       alias sql.NullString
-               )
-               err := db.QueryRowContext(ctx, q, clientToken).Scan(&feed.ID, &alias, &feed.Filter, &feed.After)
-               if err != nil {
-                       return nil, err
-               }
+//Prepare load and parse filters.
+func (t *Tracker) Prepare(ctx context.Context) error {
+       var err error
+       t.TxFeeds, err = loadTxFeed(t.DB, t.TxFeeds)
+       return err
+}
 
-               if alias.Valid {
-                       feed.Alias = &alias.String
-               }
-       */
-       //      return &feed, nil
-       return nil, nil
+//GetTxfeedCh return a txfeed channel.
+func (t *Tracker) GetTxfeedCh() chan *legacy.Tx {
+       return t.txfeedCh
 }
 
-func (t *Tracker) Find(ctx context.Context, id, alias string) (*TxFeed, error) {
-       /*      var q bytes.Buffer
+//Create create a txfeed filter.
+func (t *Tracker) Create(ctx context.Context, alias, fil string) error {
+       // Validate the filter.
 
-               q.WriteString(`
-                       SELECT id, alias, filter, after
-                       FROM txfeeds
-                       WHERE
-               `)
+       if err := query.ValidateTransactionFilter(fil); err != nil {
+               return err
+       }
 
-               if id != "" {
-                       q.WriteString(`id=$1`)
-               } else {
-                       q.WriteString(`alias=$1`)
-                       id = alias
-               }
+       if alias == "" {
+               return errors.WithDetail(ErrEmptyAlias, "a transaction feed with empty alias")
+       }
 
-               var (
-                       feed     TxFeed
-                       sqlAlias sql.NullString
-               )
+       if len(t.TxFeeds) >= FilterNumMax {
+               return errors.WithDetail(ErrNumExceedlimit, "txfeed number exceed limit")
+       }
 
-               err := t.DB.QueryRowContext(ctx, q.String(), id).Scan(&feed.ID, &sqlAlias, &feed.Filter, &feed.After)
-               if err == sql.ErrNoRows {
-                       err = errors.Sub(pg.ErrUserInputNotFound, err)
-                       err = errors.WithDetailf(err, "alias: %s", alias)
-                       return nil, err
-               }
-               if err != nil {
-                       return nil, err
+       for _, txfeed := range t.TxFeeds {
+               if txfeed.Alias == alias {
+                       return errors.WithDetail(ErrDuplicateAlias, "txfeed alias must unique")
                }
+       }
+
+       feed := &TxFeed{
+               Alias:  alias,
+               Filter: fil,
+       }
+
+       filter, err := parseFilter(feed.Filter)
+       if err != nil {
+               return err
+       }
+       feed.Param = filter
+       t.TxFeeds = append(t.TxFeeds, feed)
+       return insertTxFeed(t.DB, feed)
+}
 
-               if sqlAlias.Valid {
-                       feed.Alias = &sqlAlias.String
-               }
-       */
-       //      return &feed, nil
-       return nil, nil
+func deleteTxFeed(db dbm.DB, alias string) error {
+       key, err := json.Marshal(alias)
+       if err != nil {
+               return err
+       }
+       db.Delete(key)
+       return nil
 }
 
-func (t *Tracker) Delete(ctx context.Context, id, alias string) error {
-       /*      var q bytes.Buffer
+// insertTxFeed adds the txfeed to the database. If the txfeed has a client token,
+// and there already exists a txfeed with that client token, insertTxFeed will
+// lookup and return the existing txfeed instead.
+func insertTxFeed(db dbm.DB, feed *TxFeed) error {
+       // var err error
+       key, err := json.Marshal(feed.Alias)
+       if err != nil {
+               return err
+       }
+       value, err := json.Marshal(feed)
+       if err != nil {
+               return err
+       }
+
+       db.Set(key, value)
+       return nil
+}
 
-               q.WriteString(`DELETE FROM txfeeds WHERE `)
+//Get get txfeed filter with alias.
+func (t *Tracker) Get(ctx context.Context, alias string) (*TxFeed, error) {
+       if alias == "" {
+               return nil, errors.WithDetail(ErrEmptyAlias, "get transaction feed with empty alias")
+       }
 
-               if id != "" {
-                       q.WriteString(`id=$1`)
-               } else {
-                       q.WriteString(`alias=$1`)
-                       id = alias
+       for i, v := range t.TxFeeds {
+               if v.Alias == alias {
+                       return t.TxFeeds[i], nil
                }
+       }
+       return nil, nil
+}
 
-               res, err := t.DB.ExecContext(ctx, q.String(), id)
-               if err != nil {
-                       return err
-               }
+//Delete delete txfeed with alias.
+func (t *Tracker) Delete(ctx context.Context, alias string) error {
+       log.WithField("delete", alias).Info("delete txfeed")
 
-               affected, err := res.RowsAffected()
-               if err != nil {
-                       return err
-               }
+       if alias == "" {
+               return errors.WithDetail(ErrEmptyAlias, "del transaction feed with empty alias")
+       }
 
-               if affected == 0 {
-                       return errors.WithDetailf(pg.ErrUserInputNotFound, "could not find and delete txfeed with id/alias=%s", id)
+       for i, txfeed := range t.TxFeeds {
+               if txfeed.Alias == alias {
+                       t.TxFeeds = append(t.TxFeeds[:i], t.TxFeeds[i+1:]...)
+                       return deleteTxFeed(t.DB, alias)
                }
-       */
+       }
        return nil
 }
 
-func (t *Tracker) Update(ctx context.Context, id, alias, after, prev string) (*TxFeed, error) {
-       /*      var q bytes.Buffer
-
-               q.WriteString(`UPDATE txfeeds SET after=$1 WHERE `)
+func outputFilter(txfeed *TxFeed, value *query.AnnotatedOutput) bool {
+       assetidstr := value.AssetID.String()
+
+       if 0 != strings.Compare(txfeed.Param.assetID, assetidstr) && txfeed.Param.assetID != "" {
+               return false
+       }
+       if 0 != strings.Compare(txfeed.Param.transType, value.Type) && txfeed.Param.transType != "" {
+               return false
+       }
+       if txfeed.Param.amountLowerLimit > value.Amount && txfeed.Param.amountLowerLimit != 0 {
+               return false
+       }
+       if txfeed.Param.amountUpperLimit < value.Amount && txfeed.Param.amountUpperLimit != 0 {
+               return false
+       }
+
+       return true
+}
 
-               if id != "" {
-                       q.WriteString(`id=$2`)
-               } else {
-                       q.WriteString(`alias=$2`)
-                       id = alias
+//TxFilter filter tx from mempool.
+func (t *Tracker) TxFilter(tx *legacy.Tx) error {
+       var annotatedTx *query.AnnotatedTx
+       // Build the fully annotated transaction.
+       annotatedTx = buildAnnotatedTransaction(tx)
+       for _, output := range annotatedTx.Outputs {
+               for _, filter := range t.TxFeeds {
+                       if match := outputFilter(filter, output); !match {
+                               continue
+                       }
+                       b, err := json.Marshal(annotatedTx)
+                       if err != nil {
+                               return err
+                       }
+                       log.WithField("filter", string(b)).Info("find new tx match filter")
+                       t.txfeedCh <- tx
                }
+       }
+       return nil
+}
 
-               q.WriteString(` AND after=$3`)
+var emptyJSONObject = json.RawMessage(`{}`)
+
+func buildAnnotatedTransaction(orig *legacy.Tx) *query.AnnotatedTx {
+       tx := &query.AnnotatedTx{
+               ID:      orig.ID,
+               Inputs:  make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
+               Outputs: make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
+       }
+
+       for i := range orig.Inputs {
+               tx.Inputs = append(tx.Inputs, buildAnnotatedInput(orig, uint32(i)))
+       }
+       for i := range orig.Outputs {
+               tx.Outputs = append(tx.Outputs, buildAnnotatedOutput(orig, i))
+       }
+       return tx
+}
 
-               res, err := t.DB.ExecContext(ctx, q.String(), after, id, prev)
-               if err != nil {
-                       return nil, err
-               }
+func buildAnnotatedInput(tx *legacy.Tx, i uint32) *query.AnnotatedInput {
+       orig := tx.Inputs[i]
+       in := &query.AnnotatedInput{
+               AssetID:         orig.AssetID(),
+               Amount:          orig.Amount(),
+               AssetDefinition: &emptyJSONObject,
+               AssetTags:       &emptyJSONObject,
+               ReferenceData:   &emptyJSONObject,
+       }
+
+       id := tx.Tx.InputIDs[i]
+       e := tx.Entries[id]
+       switch e := e.(type) {
+       case *bc.Spend:
+               in.Type = "spend"
+               in.ControlProgram = orig.ControlProgram()
+               in.SpentOutputID = e.SpentOutputId
+       case *bc.Issuance:
+               in.Type = "issue"
+               in.IssuanceProgram = orig.IssuanceProgram()
+       }
+
+       return in
+}
 
-               affected, err := res.RowsAffected()
-               if err != nil {
-                       return nil, err
-               }
+func buildAnnotatedOutput(tx *legacy.Tx, idx int) *query.AnnotatedOutput {
+       orig := tx.Outputs[idx]
+       outid := tx.OutputID(idx)
+       out := &query.AnnotatedOutput{
+               OutputID:        *outid,
+               Position:        idx,
+               AssetID:         *orig.AssetId,
+               AssetDefinition: &emptyJSONObject,
+               AssetTags:       &emptyJSONObject,
+               Amount:          orig.Amount,
+               ControlProgram:  orig.ControlProgram,
+               ReferenceData:   &emptyJSONObject,
+       }
+
+       if vmutil.IsUnspendable(out.ControlProgram) {
+               out.Type = "retire"
+       } else {
+               out.Type = "control"
+       }
+       return out
+}
 
-               if affected == 0 {
-                       return nil, errors.WithDetailf(pg.ErrUserInputNotFound, "could not find txfeed with id/alias=%s and prev=%s", id, prev)
+// localAnnotator depends on the asset and account annotators and
+// must be run after them.
+func localAnnotator(tx *query.AnnotatedTx) {
+       for _, in := range tx.Inputs {
+               if in.AccountID != "" {
+                       tx.IsLocal, in.IsLocal = true, true
+               }
+               if in.Type == "issue" && in.AssetIsLocal {
+                       tx.IsLocal, in.IsLocal = true, true
                }
+       }
 
-               return &TxFeed{
-                       ID:    id,
-                       Alias: &alias,
-                       After: after,
-               }, nil
-       */
-       /*      return &TxFeed{
-                       ID:     nil,
-                       Alias   nil,
-                       After   nil,
+       for _, out := range tx.Outputs {
+               if out.AccountID != "" {
+                       tx.IsLocal, out.IsLocal = true, true
                }
-       */return nil, nil
+       }
 }
index a2add5a..9b54522 100644 (file)
@@ -2,67 +2,73 @@ package blockchain
 
 import (
        "context"
+       "encoding/json"
+
+       log "github.com/sirupsen/logrus"
 
        "github.com/bytom/blockchain/query"
        "github.com/bytom/blockchain/txfeed"
-       "github.com/bytom/errors"
-       "github.com/bytom/net/http/httpjson"
 )
 
 // POST /create-txfeed
-func (a *BlockchainReactor) createTxFeed(ctx context.Context, in struct {
+func (bcr *BlockchainReactor) createTxFeed(ctx context.Context, in struct {
        Alias  string
        Filter string
+}) error {
+       if err := bcr.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
+               log.WithField("error", err).Error("Add TxFeed Failed")
+               return err
+       }
+       return nil
+}
+
+func (bcr *BlockchainReactor) getTxFeedByAlias(ctx context.Context, filter string) ([]*txfeed.TxFeed, error) {
+       txFeed := &txfeed.TxFeed{}
+       txFeeds := []*txfeed.TxFeed{}
+
+       jf, err := json.Marshal(filter)
+       if err != nil {
+               return nil, err
+       }
+
+       value := bcr.txFeedTracker.DB.Get(jf)
+       if value == nil {
+               return nil, nil
+       }
 
-       // ClientToken is the application's unique token for the txfeed. Every txfeed
-       // should have a unique client token. The client token is used to ensure
-       // idempotency of create txfeed requests. Duplicate create txfeed requests
-       // with the same client_token will only create one txfeed.
-       ClientToken string `json:"client_token"`
-}) (*txfeed.TxFeed, error) {
-       //      after := fmt.Sprintf("%d:%d-%d", a.chain.Height(), math.MaxInt32, uint64(math.MaxInt64))
-       //      return a.txFeeds.Create(ctx, in.Alias, in.Filter, after, in.ClientToken)
-       return nil, nil
+       if err := json.Unmarshal(value, txFeed); err != nil {
+               return nil, err
+       }
+       txFeeds = append(txFeeds, txFeed)
+       return txFeeds, nil
 }
 
 // POST /get-transaction-feed
-func (a *BlockchainReactor) getTxFeed(ctx context.Context, in struct {
-       ID    string `json:"id,omitempty"`
-       Alias string `json:"alias,omitempty"`
-}) (*txfeed.TxFeed, error) {
-       //      return a.txFeeds.Find(ctx, in.ID, in.Alias)
-       return nil, nil
+func (bcr *BlockchainReactor) getTxFeed(ctx context.Context, in requestQuery) interface{} {
+       txfeeds, err := bcr.getTxFeedByAlias(ctx, in.Filter)
+       if err != nil {
+               return err
+       }
+       return txfeeds
+
 }
 
 // POST /delete-transaction-feed
-func (a *BlockchainReactor) deleteTxFeed(ctx context.Context, in struct {
-       ID    string `json:"id,omitempty"`
+func (bcr *BlockchainReactor) deleteTxFeed(ctx context.Context, in struct {
        Alias string `json:"alias,omitempty"`
 }) error {
-       //      return a.txFeeds.Delete(ctx, in.ID, in.Alias)
-       return nil
+       return bcr.txFeedTracker.Delete(ctx, in.Alias)
 }
 
 // POST /update-transaction-feed
-func (a *BlockchainReactor) updateTxFeed(ctx context.Context, in struct {
-       ID    string `json:"id,omitempty"`
-       Alias string `json:"alias,omitempty"`
-       Prev  string `json:"previous_after"`
-       After string `json:"after"`
-}) (*txfeed.TxFeed, error) {
-       // TODO(tessr): Consider moving this function into the txfeed package.
-       // (It's currently outside the txfeed package to avoid a dependecy cycle
-       // between txfeed and query.)
-       bad, err := txAfterIsBefore(in.After, in.Prev)
-       if err != nil {
-               return nil, err
-       }
-
-       if bad {
-               return nil, errors.WithDetail(httpjson.ErrBadRequest, "new After cannot be before Prev")
+func (bcr *BlockchainReactor) updateTxFeed(ctx context.Context, in struct {
+       Alias  string
+       Filter string
+}) error {
+       if err := bcr.txFeedTracker.Delete(ctx, in.Alias); err != nil {
+               return err
        }
-       //      return a.txFeeds.Update(ctx, in.ID, in.Alias, in.After, in.Prev)
-       return nil, nil
+       return bcr.txFeedTracker.Create(ctx, in.Alias, in.Filter)
 }
 
 // txAfterIsBefore returns true if a is before b. It returns an error if either
@@ -82,3 +88,28 @@ func txAfterIsBefore(a, b string) (bool, error) {
                (aAfter.FromBlockHeight == bAfter.FromBlockHeight &&
                        aAfter.FromPosition < bAfter.FromPosition), nil
 }
+
+func (bcr *BlockchainReactor) getTxFeeds() ([]*txfeed.TxFeed, error) {
+       txFeeds := make([]*txfeed.TxFeed, 0)
+       iter := bcr.txFeedTracker.DB.Iterator()
+       defer iter.Release()
+
+       for iter.Next() {
+               txFeed := &txfeed.TxFeed{}
+               if err := json.Unmarshal(iter.Value(), txFeed); err != nil {
+                       return nil, err
+               }
+               txFeeds = append(txFeeds, txFeed)
+       }
+       return txFeeds, nil
+}
+
+// listTxFeeds is an http handler for listing txfeeds. It does not take a filter.
+// POST /list-transaction-feeds
+func (bcr *BlockchainReactor) listTxFeeds(ctx context.Context, in requestQuery) interface{} {
+       txfeeds, err := bcr.getTxFeeds()
+       if err != nil {
+               return err
+       }
+       return txfeeds
+}
index 2b8f9d2..8f3aaf5 100644 (file)
@@ -668,61 +668,73 @@ func createAccountReceiver(client *rpc.Client, args []string) {
 }
 
 func createTxFeed(client *rpc.Client, args []string) {
-       if len(args) != 1 {
-               fatalln("error:createTxFeed take no arguments")
+       if len(args) != 2 {
+               fatalln("error:createTxFeed need arguments")
        }
        type In struct {
-               Alias       string
-               Filter      string
-               ClientToken string `json:"client_token"`
+               Alias  string
+               Filter string
        }
        var in In
-       in.Alias = "asdfgh"
-       in.Filter = "zxcvbn"
-       in.ClientToken = args[0]
-       client.Call(context.Background(), "/create-transaction-feed", &[]In{in}, nil)
+       in.Alias = args[0]
+       in.Filter = args[1]
+
+       client.Call(context.Background(), "/create-transaction-feed", in, nil)
 }
 
 func getTxFeed(client *rpc.Client, args []string) {
-       if len(args) != 0 {
-               fatalln("error:getTxFeed not use args")
+       if len(args) != 1 {
+               fatalln("error:getTxFeed use args alias")
        }
-       type In struct {
-               ID    string `json:"id,omitempty"`
-               Alias string `json:"alias,omitempty"`
+       type requestQuery struct {
+               Filter       string        `json:"filter,omitempty"`
+               FilterParams []interface{} `json:"filter_params,omitempty"`
+               SumBy        []string      `json:"sum_by,omitempty"`
+               PageSize     int           `json:"page_size"`
+               AscLongPoll  bool          `json:"ascending_with_long_poll,omitempty"`
+               Timeout      json.Duration `json:"timeout"`
+               After        string        `json:"after"`
+               StartTimeMS  uint64        `json:"start_time,omitempty"`
+               EndTimeMS    uint64        `json:"end_time,omitempty"`
+               TimestampMS  uint64        `json:"timestamp,omitempty"`
+               Type         string        `json:"type"`
+               Aliases      []string      `json:"aliases,omitempty"`
+       }
+       var in requestQuery
+       in.Filter = args[0]
+       responses := make([]interface{}, 0)
+       client.Call(context.Background(), "/get-transaction-feed", in, &responses)
+       if len(responses) > 0 {
+               for i, item := range responses {
+                       fmt.Println(i, "-----", item)
+               }
        }
-       var in In
-       in.Alias = "qwerty"
-       in.ID = "123456"
-       client.Call(context.Background(), "/get-transaction-feed", &[]In{in}, nil)
 }
 
 func updateTxFeed(client *rpc.Client, args []string) {
-       if len(args) != 0 {
-               fatalln("error:updateTxFeed not use args")
+       if len(args) != 2 {
+               fatalln("error:createTxFeed need arguments")
        }
        type In struct {
-               ID    string `json:"id,omitempty"`
-               Alias string `json:"alias,omitempty"`
+               Alias  string
+               Filter string
        }
        var in In
-       in.ID = "123456"
-       in.Alias = "qwerty"
-       client.Call(context.Background(), "/update-transaction-feed", &[]In{in}, nil)
+       in.Alias = args[0]
+       in.Filter = args[1]
+       client.Call(context.Background(), "/update-transaction-feed", in, nil)
 }
 
 func deleteTxFeed(client *rpc.Client, args []string) {
-       if len(args) != 0 {
-               fatalln("error:deleteTxFeed not use args")
+       if len(args) != 1 {
+               fatalln("error:deleteTxFeed use args alias")
        }
        type In struct {
-               ID    string `json:"id,omitempty"`
                Alias string `json:"alias,omitempty"`
        }
        var in In
-       in.ID = "123456"
-       in.Alias = "qwerty"
-       client.Call(context.Background(), "/delete-transaction-feed", &[]In{in}, nil)
+       in.Alias = args[0]
+       client.Call(context.Background(), "/delete-transaction-feed", in, nil)
 }
 
 func listAccounts(client *rpc.Client, args []string) {
@@ -748,11 +760,11 @@ func listAccounts(client *rpc.Client, args []string) {
        responses := make([]interface{}, 0)
 
        client.Call(context.Background(), "/list-accounts", in, &responses)
-       if len(responses) > 0 {
-               for i, item := range responses {
-                       fmt.Println(i, "-----", item)
-               }
-       }
+       // if len(responses) > 0 {
+       //      for i, item := range responses {
+       //              fmt.Println(i, "-----", item)
+       //      }
+       // }
 }
 
 func listAssets(client *rpc.Client, args []string) {
@@ -785,6 +797,7 @@ func listAssets(client *rpc.Client, args []string) {
 }
 
 func listTxFeeds(client *rpc.Client, args []string) {
+       fmt.Println("listTxFeeds")
        if len(args) != 0 {
                fatalln("error:listTxFeeds not use args")
        }
@@ -803,10 +816,16 @@ func listTxFeeds(client *rpc.Client, args []string) {
                Aliases      []string      `json:"aliases,omitempty"`
        }
        var in requestQuery
-       after := in.After
-       out := in
-       out.After = after
-       client.Call(context.Background(), "/list-transactions-feeds", &[]requestQuery{in}, nil)
+
+       responses := make([]interface{}, 0)
+
+       client.Call(context.Background(), "/list-transaction-feeds", in, &responses)
+       if len(responses) > 0 {
+               for i, item := range responses {
+                       fmt.Println(i, "-----", item)
+               }
+       }
+
 }
 
 func listTransactions(client *rpc.Client, args []string) {
index 94b7baf..bd417b3 100644 (file)
@@ -24,6 +24,7 @@ import (
        "github.com/bytom/blockchain/pin"
        "github.com/bytom/blockchain/pseudohsm"
        "github.com/bytom/blockchain/txdb"
+       "github.com/bytom/blockchain/txfeed"
        cfg "github.com/bytom/config"
        "github.com/bytom/consensus"
        "github.com/bytom/env"
@@ -193,6 +194,7 @@ func NewNode(config *cfg.Config) *Node {
        var accounts *account.Manager = nil
        var assets *asset.Registry = nil
        var pinStore *pin.Store = nil
+       var txFeed *txfeed.Tracker = nil
 
        if config.Wallet.Enable {
                accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
@@ -220,6 +222,15 @@ func NewNode(config *cfg.Config) *Node {
 
                assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
                assets = asset.NewRegistry(assetsDB, chain)
+
+               txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
+               txFeed = txfeed.NewTracker(txFeedDB, chain)
+
+               if err = txFeed.Prepare(ctx); err != nil {
+                       log.WithField("error", err).Error("start txfeed")
+                       return nil
+               }
+
        }
        //Todo HSM
        /*
@@ -237,7 +248,7 @@ func NewNode(config *cfg.Config) *Node {
        if err != nil {
                cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
        }
-       bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore)
+       bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore, txFeed)
 
        sw.AddReactor("BLOCKCHAIN", bcReactor)