}, nil
}
-// listTxFeeds is an http handler for listing txfeeds. It does not take a filter.
-//
-// POST /list-transaction-feeds
-func (bcr *BlockchainReactor) listTxFeeds(ctx context.Context, in requestQuery) (page, error) {
- limit := in.PageSize
- if limit == 0 {
- limit = defGenericPageSize
- }
-
- after := in.After
-
- /* txfeeds, after, err := bcr.txFeeds.Query(ctx, after, limit)
- if err != nil {
- return page{}, errors.Wrap(err, "running txfeed query")
- }
- */
- out := in
- out.After = after
- return page{
- // Items: httpjson.Array(txfeeds),
- // LastPage: len(txfeeds) < limit,
- Next: out,
- }, nil
-}
-
// POST /list-unspent-outputs
func (bcr *BlockchainReactor) listUnspentOutputs(ctx context.Context, in requestQuery) interface{} {
// Parse parses a predicate and returns an internal representation of the
// predicate or an error if it fails to parse.
-/*func Parse(predicate string, tbl *SQLTable, vals []interface{}) (p Predicate, err error) {
+func Parse(predicate string, tbl *Table, vals []interface{}) (p Predicate, err error) {
expr, parser, err := parse(predicate)
if err != nil {
return p, errors.WithDetail(ErrBadFilter, err.Error())
Parameters: parser.maxPlaceholder,
}, nil
}
-*/
// Field is a type for simple expressions that simply access an attribute of
// the queried object. They're used for GROUP BYs.
package filter
import (
- //"errors"
"fmt"
- //"strings"
+
+ "github.com/bytom/errors"
)
+//Column describe a column
+type Column struct {
+ Name string
+ Type Type
+}
+
+//Table describe a table
+type Table struct {
+ Name string
+ Alias string
+ Columns map[string]*Column
+ ForeignKeys map[string]*ForeignKey
+}
+
+//ForeignKey describe a foreign key
+type ForeignKey struct {
+ Table *Table
+ LocalColumn string
+ ForeignColumn string
+}
+
func isType(got Type, want Type) bool {
return got == want || got == Any
}
// typeCheck will statically type check expr with vals as the parameters
// and using tbl to determine available attributes and environments. It
// returns the inferred types of arbitrary json keys as a map.
-/*func typeCheck(expr expr, tbl *SQLTable, vals []interface{}) (map[string]Type, error) {
+func typeCheck(expr expr, tbl *Table, vals []interface{}) (map[string]Type, error) {
valTypes, err := valueTypes(vals)
if err != nil {
return nil, err
return selectorTypes, nil
}
-func typeCheckExpr(expr expr, tbl *SQLTable, valTypes []Type, selectorTypes map[string]Type) (typ Type, err error) {
+func typeCheckExpr(expr expr, tbl *Table, valTypes []Type, selectorTypes map[string]Type) (typ Type, err error) {
if expr == nil { // no expr is a valid, bool type
return Bool, nil
}
panic(fmt.Errorf("unrecognized expr type %T", expr))
}
}
-*/
func assertType(expr expr, got, want Type, selectorTypes map[string]Type) (bool, error) {
if !isType(got, want) { // type does not match
package query
import (
- // "bytes"
- // "context"
- // "encoding/json"
"fmt"
"math"
- // "strconv"
- // "github.com/blockchain/blockchain/query/filter"
+ "github.com/bytom/blockchain/query/filter"
"github.com/bytom/errors"
)
+var filterTable = filter.Table{
+ Name: "annotated_txs",
+ Alias: "txs",
+ Columns: map[string]*filter.Column{
+ "asset_id": {Name: "assetid", Type: filter.String},
+ "amount_lower_limit": {Name: "amountlower", Type: filter.Integer},
+ "amount_upper_limit": {Name: "amountupper", Type: filter.Integer},
+ "trans_type": {Name: "transtype", Type: filter.String},
+ },
+}
+
var (
- ErrBadAfter = errors.New("malformed pagination parameter after")
+ //ErrBadAfter means malformed pagination parameter.
+ ErrBadAfter = errors.New("malformed pagination parameter after")
+ //ErrParameterCountMismatch means wrong number of parameters to query.
ErrParameterCountMismatch = errors.New("wrong number of parameters to query")
)
+//TxAfter means the last query block by a list-transactions query.
type TxAfter struct {
// FromBlockHeight and FromPosition uniquely identify the last transaction returned
// by a list-transactions query.
return fmt.Sprintf("%d:%d-%d", after.FromBlockHeight, after.FromPosition, after.StopBlockHeight)
}
+//DecodeTxAfter decode tx from the last block.
func DecodeTxAfter(str string) (c TxAfter, err error) {
var from, pos, stop uint64
_, err = fmt.Sscanf(str, "%d:%d-%d", &from, &pos, &stop)
return TxAfter{FromBlockHeight: from, FromPosition: uint32(pos), StopBlockHeight: stop}, nil
}
-/*
+//ValidateTransactionFilter verify txfeed filter validity.
func ValidateTransactionFilter(filt string) error {
- _, err := filter.Parse(filt, transactionsTable, nil)
+ _, err := filter.Parse(filt, &filterTable, nil)
return err
}
-
-// LookupTxAfter looks up the transaction `after` for the provided time range.
-func (ind *Indexer) LookupTxAfter(ctx context.Context, begin, end uint64) (TxAfter, error) {
- const q = `
- SELECT COALESCE(MAX(height), 0), COALESCE(MIN(height), 0) FROM query_blocks
- WHERE timestamp >= $1 AND timestamp <= $2
- `
-
- var from, stop uint64
- err := ind.db.QueryRowContext(ctx, q, begin, end).Scan(&from, &stop)
- if err != nil {
- return TxAfter{}, errors.Wrap(err, "querying `query_blocks`")
- }
- return TxAfter{
- FromBlockHeight: from,
- FromPosition: math.MaxInt32, // TODO(tessr): Support reversing direction.
- StopBlockHeight: stop,
- }, nil
-}
-
-// Transactions queries the blockchain for transactions matching the
-// filter predicate `filt`.
-func (ind *Indexer) Transactions(ctx context.Context, filt string, vals []interface{}, after TxAfter, limit int, asc bool) ([]*AnnotatedTx, *TxAfter, error) {
- p, err := filter.Parse(filt, transactionsTable, vals)
- if err != nil {
- return nil, nil, err
- }
- if len(vals) != p.Parameters {
- return nil, nil, ErrParameterCountMismatch
- }
- expr, err := filter.AsSQL(p, transactionsTable, vals)
- if err != nil {
- return nil, nil, errors.Wrap(err, "converting to SQL")
- }
-
- queryStr, queryArgs := constructTransactionsQuery(expr, vals, after, asc, limit)
-
- if asc {
- return ind.waitForAndFetchTransactions(ctx, queryStr, queryArgs, after, limit)
- }
- return ind.fetchTransactions(ctx, queryStr, queryArgs, after, limit)
-}
-
-// If asc is true, the transactions will be returned from "in front" of the `after`
-// param (e.g., the oldest transaction immediately after the `after` param,
-// followed by the second oldest, etc) in ascending order.
-func constructTransactionsQuery(expr string, vals []interface{}, after TxAfter, asc bool, limit int) (string, []interface{}) {
- var buf bytes.Buffer
-
- buf.WriteString("SELECT block_height, tx_pos, data FROM annotated_txs AS txs")
- buf.WriteString(" WHERE ")
-
- // add filter conditions
- if len(expr) > 0 {
- buf.WriteString(expr)
- buf.WriteString(" AND ")
- }
-
- if asc {
- // add time range & after conditions
- buf.WriteString(fmt.Sprintf("(txs.block_height, txs.tx_pos) > ($%d, $%d) AND ", len(vals)+1, len(vals)+2))
- buf.WriteString(fmt.Sprintf("txs.block_height <= $%d ", len(vals)+3))
- vals = append(vals, after.FromBlockHeight, after.FromPosition, after.StopBlockHeight)
-
- buf.WriteString("ORDER BY txs.block_height ASC, txs.tx_pos ASC ")
- } else {
- // add time range & after conditions
- buf.WriteString(fmt.Sprintf("(txs.block_height, txs.tx_pos) < ($%d, $%d) AND ", len(vals)+1, len(vals)+2))
- buf.WriteString(fmt.Sprintf("txs.block_height >= $%d ", len(vals)+3))
- vals = append(vals, after.FromBlockHeight, after.FromPosition, after.StopBlockHeight)
-
- buf.WriteString("ORDER BY txs.block_height DESC, txs.tx_pos DESC ")
- }
-
- buf.WriteString("LIMIT " + strconv.Itoa(limit))
- return buf.String(), vals
-}
-
-func (ind *Indexer) fetchTransactions(ctx context.Context, queryStr string, queryArgs []interface{}, after TxAfter, limit int) ([]*AnnotatedTx, *TxAfter, error) {
- rows, err := ind.db.QueryContext(ctx, queryStr, queryArgs...)
- if err != nil {
- return nil, nil, errors.Wrap(err, "executing txn query")
- }
- defer rows.Close()
-
- txns := make([]*AnnotatedTx, 0, limit)
- for rows.Next() {
- var data []byte
- err := rows.Scan(&after.FromBlockHeight, &after.FromPosition, &data)
- if err != nil {
- return nil, nil, errors.Wrap(err, "scanning transaction row")
- }
- tx := new(AnnotatedTx)
- err = json.Unmarshal(data, tx)
- if err != nil {
- return nil, nil, errors.Wrap(err, "unmarshaling annotated transaction")
- }
- txns = append(txns, tx)
- }
- err = rows.Err()
- if err != nil {
- return nil, nil, errors.Wrap(err)
- }
- return txns, &after, nil
-}
-
-type fetchResp struct {
- txns []*AnnotatedTx
- after *TxAfter
- err error
-}
-
-func (ind *Indexer) waitForAndFetchTransactions(ctx context.Context, queryStr string, queryArgs []interface{}, after TxAfter, limit int) ([]*AnnotatedTx, *TxAfter, error) {
- resp := make(chan fetchResp, 1)
- go func() {
- var (
- txs []*AnnotatedTx
- aft *TxAfter
- err error
- )
-
- for h := ind.c.Height(); len(txs) == 0; h++ {
- <-ind.pinStore.PinWaiter(TxPinName, h)
- if err != nil {
- resp <- fetchResp{nil, nil, err}
- return
- }
-
- txs, aft, err = ind.fetchTransactions(ctx, queryStr, queryArgs, after, limit)
- if err != nil {
- resp <- fetchResp{nil, nil, err}
- return
- }
-
- if len(txs) > 0 {
- resp <- fetchResp{txs, aft, nil}
- return
- }
- }
- }()
-
- select {
- case <-ctx.Done():
- return nil, nil, ctx.Err()
- case r := <-resp:
- return r.txns, r.after, r.err
- }
-}
-*/
crosscoreRPCPrefix = "/rpc/"
)
-// BlockchainReactor handles long-term catchup syncing.
+//BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
- chain *protocol.Chain
- pinStore *pin.Store
- accounts *account.Manager
- assets *asset.Registry
- accesstoken *accesstoken.Token
- txFeeds *txfeed.TxFeed
- blockKeeper *blockKeeper
- txPool *protocol.TxPool
- hsm *pseudohsm.HSM
- mining *cpuminer.CPUMiner
- mux *http.ServeMux
- sw *p2p.Switch
- handler http.Handler
- evsw types.EventSwitch
+ chain *protocol.Chain
+ pinStore *pin.Store
+ accounts *account.Manager
+ assets *asset.Registry
+ accesstoken *accesstoken.Token
+ txFeedTracker *txfeed.Tracker
+ blockKeeper *blockKeeper
+ txPool *protocol.TxPool
+ hsm *pseudohsm.HSM
+ mining *cpuminer.CPUMiner
+ mux *http.ServeMux
+ sw *p2p.Switch
+ handler http.Handler
+ evsw types.EventSwitch
}
func batchRecover(ctx context.Context, v *interface{}) {
LastPage bool `json:"last_page"`
}
-func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, pinStore *pin.Store) *BlockchainReactor {
+func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, pinStore *pin.Store, txfeeds *txfeed.Tracker) *BlockchainReactor {
mining := cpuminer.NewCPUMiner(chain, accounts, txPool)
bcR := &BlockchainReactor{
- chain: chain,
- pinStore: pinStore,
- accounts: accounts,
- assets: assets,
- blockKeeper: newBlockKeeper(chain, sw),
- txPool: txPool,
- mining: mining,
- mux: http.NewServeMux(),
- sw: sw,
- hsm: hsm,
+ chain: chain,
+ pinStore: pinStore,
+ accounts: accounts,
+ assets: assets,
+ blockKeeper: newBlockKeeper(chain, sw),
+ txPool: txPool,
+ mining: mining,
+ mux: http.NewServeMux(),
+ sw: sw,
+ hsm: hsm,
+ txFeedTracker: txfeeds,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
for {
select {
case newTx := <-newTxCh:
+ bcR.txFeedTracker.TxFilter(newTx)
go bcR.BroadcastTransaction(newTx)
case _ = <-statusUpdateTicker.C:
go bcR.BroadcastStatusResponse()
"sync"
"time"
+ log "github.com/sirupsen/logrus"
+
"github.com/bytom/blockchain/txbuilder"
+ chainjson "github.com/bytom/encoding/json"
"github.com/bytom/errors"
"github.com/bytom/net/http/httperror"
"github.com/bytom/net/http/reqid"
"github.com/bytom/protocol/bc/legacy"
-
- chainjson "github.com/bytom/encoding/json"
- log "github.com/sirupsen/logrus"
)
var defaultTxTTL = 5 * time.Minute
}
return decoder, true
}
-/* {"actions": [
- {"type": "spend", "asset_id": "%s", "amount": 100},
- {"type": "control_account", "asset_id": "%s", "amount": 100, "account_id": "%s"}
- ]}`
-*/
+
func (a *BlockchainReactor) buildSingle(ctx context.Context, req *BuildRequest) (*txbuilder.Template, error) {
err := a.filterAliases(ctx, req)
if err != nil {
return map[string]string{"id": tpl.Transaction.ID.String()}, nil
}
-/*
-// recordSubmittedTx records a lower bound height at which the tx
-// was first submitted to the tx pool. If this request fails for
-// some reason, a retry will know to look for the transaction in
-// blocks starting at this height.
-//
-// If the tx has already been submitted, it returns the existing
-// height.
-func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
- const insertQ = `
- INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
- ON CONFLICT DO NOTHING
- `
- res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
- if err != nil {
- return 0, err
- }
- inserted, err := res.RowsAffected()
- if err != nil {
- return 0, err
- }
- if inserted == 1 {
- return currentHeight, nil
- }
-
- // The insert didn't affect any rows, meaning there was already an entry
- // for this transaction hash.
- const selectQ = `
- SELECT height FROM submitted_txs WHERE tx_hash = $1
- `
- var height uint64
- err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
- return height, err
-}
-*/
-
-/*
-// cleanUpSubmittedTxs will periodically delete records of submitted txs
-// older than a day. This function blocks and only exits when its context
-// is cancelled.
-func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
- ticker := time.NewTicker(15 * time.Minute)
- for {
- select {
- case <-ticker.C:
- // TODO(jackson): We could avoid expensive bulk deletes by partitioning
- // the table and DROP-ing tables of expired rows. Partitioning doesn't
- // play well with ON CONFLICT clauses though, so we would need to rework
- // how we guarantee uniqueness.
- const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
- _, err := db.Exec(ctx, q)
- if err != nil {
- log.Error(ctx, err)
- }
- case <-ctx.Done():
- ticker.Stop()
- return
- }
- }
-}
-*/
-
// finalizeTxWait calls FinalizeTx and then waits for confirmation of
// the transaction. A nil error return means the transaction is
// confirmed on the blockchain. ErrRejected means a conflicting tx is
generatorHeight := localHeight
log.WithField("localHeight", localHeight).Info("Starting to finalize transaction")
- // Remember this height in case we retry this submit call.
- /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
- if err != nil {
- return errors.Wrap(err, "saving tx submitted height")
- }*/
err := txbuilder.FinalizeTx(ctx, a.chain, txTemplate.Transaction)
if err != nil {
// might still be in pool or might be rejected; we can't
// tell definitively until its max time elapses.
-
// Re-insert into the pool in case it was dropped.
err = txbuilder.FinalizeTx(ctx, a.chain, tx)
if err != nil {
package txfeed
import (
- // "bytes"
"context"
- //"database/sql"
+ "encoding/json"
+ "strconv"
+ "strings"
- // "github.com/bytom/blockchain/query"
- // "github.com/bytom/database/pg"
+ log "github.com/sirupsen/logrus"
+ dbm "github.com/tendermint/tmlibs/db"
+
+ "github.com/bytom/blockchain/query"
"github.com/bytom/errors"
+ "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/legacy"
+ "github.com/bytom/protocol/vm/vmutil"
+)
+
+const (
+ //FilterNumMax max txfeed filter amount.
+ FilterNumMax = 1024
)
-var ErrDuplicateAlias = errors.New("duplicate feed alias")
+var (
+ //ErrDuplicateAlias means error of duplicate feed alias.
+ ErrDuplicateAlias = errors.New("duplicate feed alias")
+ //ErrEmptyAlias means error of empty feed alias.
+ ErrEmptyAlias = errors.New("empty feed alias")
+ //ErrNumExceedlimit means txfeed filter number exceeds the limit.
+ ErrNumExceedlimit = errors.New("txfeed exceed limit")
+ maxNewTxfeedChSize = 1000
+)
+//Tracker filter tracker object.
type Tracker struct {
- DB string
+ DB dbm.DB
+ TxFeeds []*TxFeed
+ chain *protocol.Chain
+ txfeedCh chan *legacy.Tx
+}
+
+type rawOutput struct {
+ OutputID bc.Hash
+ bc.AssetAmount
+ ControlProgram []byte
+ txHash bc.Hash
+ outputIndex uint32
+ sourceID bc.Hash
+ sourcePos uint64
+ refData bc.Hash
}
+//TxFeed describe a filter
type TxFeed struct {
- ID string `json:"id,omitempty"`
- Alias *string `json:"alias"`
- Filter string `json:"filter,omitempty"`
- After string `json:"after,omitempty"`
+ ID string `json:"id,omitempty"`
+ Alias string `json:"alias"`
+ Filter string `json:"filter,omitempty"`
+ Param filter `json:"param,omitempty"`
}
-func (t *Tracker) Create(ctx context.Context, alias, fil, after string, clientToken string) (*TxFeed, error) {
- // Validate the filter.
- /* err := query.ValidateTransactionFilter(fil)
+type filter struct {
+ assetID string `json:"assetid,omitempty"`
+ amountLowerLimit uint64 `json:"lowerlimit,omitempty"`
+ amountUpperLimit uint64 `json:"upperlimit,omitempty"`
+ transType string `json:"transtype,omitempty"`
+}
+
+//NewTracker create new txfeed tracker.
+func NewTracker(db dbm.DB, chain *protocol.Chain) *Tracker {
+ s := &Tracker{
+ DB: db,
+ TxFeeds: make([]*TxFeed, 0, 10),
+ chain: chain,
+ txfeedCh: make(chan *legacy.Tx, maxNewTxfeedChSize),
+ }
+
+ return s
+}
+
+func loadTxFeed(db dbm.DB, txFeeds []*TxFeed) ([]*TxFeed, error) {
+ iter := db.Iterator()
+ defer iter.Release()
+
+ for iter.Next() {
+ txFeed := &TxFeed{}
+ if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
+ return nil, err
+ }
+ filter, err := parseFilter(txFeed.Filter)
if err != nil {
return nil, err
}
+ txFeed.Param = filter
+ txFeeds = append(txFeeds, txFeed)
+ }
+ return txFeeds, nil
+}
- var ptrAlias *string
- if alias != "" {
- ptrAlias = &alias
- }
+func parseFilter(ft string) (filter, error) {
+ var res filter
- feed := &TxFeed{
- Alias: ptrAlias,
- Filter: fil,
- After: after,
+ subFilter := strings.Split(ft, "AND")
+ for _, value := range subFilter {
+ param := getParam(value, "=")
+ if param == "" {
+ continue
+ }
+ if strings.Contains(value, "asset_id") {
+ res.assetID = param
+ }
+ if strings.Contains(value, "amount_lower_limit") {
+ tmp, _ := strconv.ParseInt(param, 10, 64)
+ res.amountLowerLimit = uint64(tmp)
+ }
+ if strings.Contains(value, "amount_upper_limit") {
+ tmp, _ := strconv.ParseInt(param, 10, 64)
+ res.amountUpperLimit = uint64(tmp)
+ }
+ if strings.Contains(value, "trans_type") {
+ res.transType = param
}
- return insertTxFeed(ctx, t.DB, feed, clientToken)
+ }
+ return res, nil
+}
- */
- return nil, nil
+//TODO
+func getParam(str, substr string) string {
+ if result := strings.Index(str, substr); result >= 0 {
+ str := strings.Replace(str[result+1:], "'", "", -1)
+ str = strings.Replace(str, " ", "", -1)
+ return str
+ }
+ return ""
}
-// insertTxFeed adds the txfeed to the database. If the txfeed has a client token,
-// and there already exists a txfeed with that client token, insertTxFeed will
-// lookup and return the existing txfeed instead.
+func parseTxfeed(db dbm.DB, filters []filter) error {
+ var txFeed TxFeed
+ var index int
-func insertTxFeed(ctx context.Context /* db pg.DB,*/, feed *TxFeed, clientToken string) (*TxFeed, error) {
- /* const q = `
- INSERT INTO txfeeds (alias, filter, after, client_token)
- VALUES ($1, $2, $3, $4)
- ON CONFLICT (client_token) DO NOTHING
- RETURNING id
- `
+ iter := db.Iterator()
+ defer iter.Release()
- var alias sql.NullString
- if feed.Alias != nil {
- alias = sql.NullString{Valid: true, String: *feed.Alias}
- }
+ for iter.Next() {
- nullToken := sql.NullString{
- String: clientToken,
- Valid: clientToken != "",
+ if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
+ return err
}
- err := db.QueryRowContext(
- ctx, q, alias, feed.Filter, feed.After,
- nullToken).Scan(&feed.ID)
-
- if pg.IsUniqueViolation(err) {
- return nil, errors.WithDetail(ErrDuplicateAlias, "a transaction feed with the provided alias already exists")
- } else if err == sql.ErrNoRows && clientToken != "" {
- // There is already a txfeed with the provided client
- // token. We should return the existing txfeed
- feed, err = txfeedByClientToken(ctx, db, clientToken)
- if err != nil {
- return nil, errors.Wrap(err, "retrieving existing txfeed")
+ subFilter := strings.Split(txFeed.Filter, "AND")
+ for _, value := range subFilter {
+ param := getParam(value, "=")
+ if param == "" {
+ continue
+ }
+ if strings.Contains(value, "asset_id") {
+ filters[index].assetID = param
+ }
+ if strings.Contains(value, "amount_lower_limit") {
+ tmp, _ := strconv.ParseInt(param, 10, 64)
+ filters[index].amountLowerLimit = uint64(tmp)
+ }
+ if strings.Contains(value, "amount_upper_limit") {
+ tmp, _ := strconv.ParseInt(param, 10, 64)
+ filters[index].amountUpperLimit = uint64(tmp)
+ }
+ if strings.Contains(value, "trans_type") {
+ filters[index].transType = param
}
- } else if err != nil {
- return nil, err
}
- */
- // return feed, nil
- return nil, nil
+ index++
+ }
+ return nil
}
-func txfeedByClientToken(ctx context.Context /* db pg.DB,*/, clientToken string) (*TxFeed, error) {
- /* const q = `
- SELECT id, alias, filter, after
- FROM txfeeds
- WHERE client_token=$1
- `
-
- var (
- feed TxFeed
- alias sql.NullString
- )
- err := db.QueryRowContext(ctx, q, clientToken).Scan(&feed.ID, &alias, &feed.Filter, &feed.After)
- if err != nil {
- return nil, err
- }
+//Prepare load and parse filters.
+func (t *Tracker) Prepare(ctx context.Context) error {
+ var err error
+ t.TxFeeds, err = loadTxFeed(t.DB, t.TxFeeds)
+ return err
+}
- if alias.Valid {
- feed.Alias = &alias.String
- }
- */
- // return &feed, nil
- return nil, nil
+//GetTxfeedCh return a txfeed channel.
+func (t *Tracker) GetTxfeedCh() chan *legacy.Tx {
+ return t.txfeedCh
}
-func (t *Tracker) Find(ctx context.Context, id, alias string) (*TxFeed, error) {
- /* var q bytes.Buffer
+//Create create a txfeed filter.
+func (t *Tracker) Create(ctx context.Context, alias, fil string) error {
+ // Validate the filter.
- q.WriteString(`
- SELECT id, alias, filter, after
- FROM txfeeds
- WHERE
- `)
+ if err := query.ValidateTransactionFilter(fil); err != nil {
+ return err
+ }
- if id != "" {
- q.WriteString(`id=$1`)
- } else {
- q.WriteString(`alias=$1`)
- id = alias
- }
+ if alias == "" {
+ return errors.WithDetail(ErrEmptyAlias, "a transaction feed with empty alias")
+ }
- var (
- feed TxFeed
- sqlAlias sql.NullString
- )
+ if len(t.TxFeeds) >= FilterNumMax {
+ return errors.WithDetail(ErrNumExceedlimit, "txfeed number exceed limit")
+ }
- err := t.DB.QueryRowContext(ctx, q.String(), id).Scan(&feed.ID, &sqlAlias, &feed.Filter, &feed.After)
- if err == sql.ErrNoRows {
- err = errors.Sub(pg.ErrUserInputNotFound, err)
- err = errors.WithDetailf(err, "alias: %s", alias)
- return nil, err
- }
- if err != nil {
- return nil, err
+ for _, txfeed := range t.TxFeeds {
+ if txfeed.Alias == alias {
+ return errors.WithDetail(ErrDuplicateAlias, "txfeed alias must unique")
}
+ }
+
+ feed := &TxFeed{
+ Alias: alias,
+ Filter: fil,
+ }
+
+ filter, err := parseFilter(feed.Filter)
+ if err != nil {
+ return err
+ }
+ feed.Param = filter
+ t.TxFeeds = append(t.TxFeeds, feed)
+ return insertTxFeed(t.DB, feed)
+}
- if sqlAlias.Valid {
- feed.Alias = &sqlAlias.String
- }
- */
- // return &feed, nil
- return nil, nil
+func deleteTxFeed(db dbm.DB, alias string) error {
+ key, err := json.Marshal(alias)
+ if err != nil {
+ return err
+ }
+ db.Delete(key)
+ return nil
}
-func (t *Tracker) Delete(ctx context.Context, id, alias string) error {
- /* var q bytes.Buffer
+// insertTxFeed adds the txfeed to the database. If the txfeed has a client token,
+// and there already exists a txfeed with that client token, insertTxFeed will
+// lookup and return the existing txfeed instead.
+func insertTxFeed(db dbm.DB, feed *TxFeed) error {
+ // var err error
+ key, err := json.Marshal(feed.Alias)
+ if err != nil {
+ return err
+ }
+ value, err := json.Marshal(feed)
+ if err != nil {
+ return err
+ }
+
+ db.Set(key, value)
+ return nil
+}
- q.WriteString(`DELETE FROM txfeeds WHERE `)
+//Get get txfeed filter with alias.
+func (t *Tracker) Get(ctx context.Context, alias string) (*TxFeed, error) {
+ if alias == "" {
+ return nil, errors.WithDetail(ErrEmptyAlias, "get transaction feed with empty alias")
+ }
- if id != "" {
- q.WriteString(`id=$1`)
- } else {
- q.WriteString(`alias=$1`)
- id = alias
+ for i, v := range t.TxFeeds {
+ if v.Alias == alias {
+ return t.TxFeeds[i], nil
}
+ }
+ return nil, nil
+}
- res, err := t.DB.ExecContext(ctx, q.String(), id)
- if err != nil {
- return err
- }
+//Delete delete txfeed with alias.
+func (t *Tracker) Delete(ctx context.Context, alias string) error {
+ log.WithField("delete", alias).Info("delete txfeed")
- affected, err := res.RowsAffected()
- if err != nil {
- return err
- }
+ if alias == "" {
+ return errors.WithDetail(ErrEmptyAlias, "del transaction feed with empty alias")
+ }
- if affected == 0 {
- return errors.WithDetailf(pg.ErrUserInputNotFound, "could not find and delete txfeed with id/alias=%s", id)
+ for i, txfeed := range t.TxFeeds {
+ if txfeed.Alias == alias {
+ t.TxFeeds = append(t.TxFeeds[:i], t.TxFeeds[i+1:]...)
+ return deleteTxFeed(t.DB, alias)
}
- */
+ }
return nil
}
-func (t *Tracker) Update(ctx context.Context, id, alias, after, prev string) (*TxFeed, error) {
- /* var q bytes.Buffer
-
- q.WriteString(`UPDATE txfeeds SET after=$1 WHERE `)
+func outputFilter(txfeed *TxFeed, value *query.AnnotatedOutput) bool {
+ assetidstr := value.AssetID.String()
+
+ if 0 != strings.Compare(txfeed.Param.assetID, assetidstr) && txfeed.Param.assetID != "" {
+ return false
+ }
+ if 0 != strings.Compare(txfeed.Param.transType, value.Type) && txfeed.Param.transType != "" {
+ return false
+ }
+ if txfeed.Param.amountLowerLimit > value.Amount && txfeed.Param.amountLowerLimit != 0 {
+ return false
+ }
+ if txfeed.Param.amountUpperLimit < value.Amount && txfeed.Param.amountUpperLimit != 0 {
+ return false
+ }
+
+ return true
+}
- if id != "" {
- q.WriteString(`id=$2`)
- } else {
- q.WriteString(`alias=$2`)
- id = alias
+//TxFilter filter tx from mempool.
+func (t *Tracker) TxFilter(tx *legacy.Tx) error {
+ var annotatedTx *query.AnnotatedTx
+ // Build the fully annotated transaction.
+ annotatedTx = buildAnnotatedTransaction(tx)
+ for _, output := range annotatedTx.Outputs {
+ for _, filter := range t.TxFeeds {
+ if match := outputFilter(filter, output); !match {
+ continue
+ }
+ b, err := json.Marshal(annotatedTx)
+ if err != nil {
+ return err
+ }
+ log.WithField("filter", string(b)).Info("find new tx match filter")
+ t.txfeedCh <- tx
}
+ }
+ return nil
+}
- q.WriteString(` AND after=$3`)
+var emptyJSONObject = json.RawMessage(`{}`)
+
+func buildAnnotatedTransaction(orig *legacy.Tx) *query.AnnotatedTx {
+ tx := &query.AnnotatedTx{
+ ID: orig.ID,
+ Inputs: make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
+ Outputs: make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
+ }
+
+ for i := range orig.Inputs {
+ tx.Inputs = append(tx.Inputs, buildAnnotatedInput(orig, uint32(i)))
+ }
+ for i := range orig.Outputs {
+ tx.Outputs = append(tx.Outputs, buildAnnotatedOutput(orig, i))
+ }
+ return tx
+}
- res, err := t.DB.ExecContext(ctx, q.String(), after, id, prev)
- if err != nil {
- return nil, err
- }
+func buildAnnotatedInput(tx *legacy.Tx, i uint32) *query.AnnotatedInput {
+ orig := tx.Inputs[i]
+ in := &query.AnnotatedInput{
+ AssetID: orig.AssetID(),
+ Amount: orig.Amount(),
+ AssetDefinition: &emptyJSONObject,
+ AssetTags: &emptyJSONObject,
+ ReferenceData: &emptyJSONObject,
+ }
+
+ id := tx.Tx.InputIDs[i]
+ e := tx.Entries[id]
+ switch e := e.(type) {
+ case *bc.Spend:
+ in.Type = "spend"
+ in.ControlProgram = orig.ControlProgram()
+ in.SpentOutputID = e.SpentOutputId
+ case *bc.Issuance:
+ in.Type = "issue"
+ in.IssuanceProgram = orig.IssuanceProgram()
+ }
+
+ return in
+}
- affected, err := res.RowsAffected()
- if err != nil {
- return nil, err
- }
+func buildAnnotatedOutput(tx *legacy.Tx, idx int) *query.AnnotatedOutput {
+ orig := tx.Outputs[idx]
+ outid := tx.OutputID(idx)
+ out := &query.AnnotatedOutput{
+ OutputID: *outid,
+ Position: idx,
+ AssetID: *orig.AssetId,
+ AssetDefinition: &emptyJSONObject,
+ AssetTags: &emptyJSONObject,
+ Amount: orig.Amount,
+ ControlProgram: orig.ControlProgram,
+ ReferenceData: &emptyJSONObject,
+ }
+
+ if vmutil.IsUnspendable(out.ControlProgram) {
+ out.Type = "retire"
+ } else {
+ out.Type = "control"
+ }
+ return out
+}
- if affected == 0 {
- return nil, errors.WithDetailf(pg.ErrUserInputNotFound, "could not find txfeed with id/alias=%s and prev=%s", id, prev)
+// localAnnotator depends on the asset and account annotators and
+// must be run after them.
+func localAnnotator(tx *query.AnnotatedTx) {
+ for _, in := range tx.Inputs {
+ if in.AccountID != "" {
+ tx.IsLocal, in.IsLocal = true, true
+ }
+ if in.Type == "issue" && in.AssetIsLocal {
+ tx.IsLocal, in.IsLocal = true, true
}
+ }
- return &TxFeed{
- ID: id,
- Alias: &alias,
- After: after,
- }, nil
- */
- /* return &TxFeed{
- ID: nil,
- Alias nil,
- After nil,
+ for _, out := range tx.Outputs {
+ if out.AccountID != "" {
+ tx.IsLocal, out.IsLocal = true, true
}
- */return nil, nil
+ }
}
import (
"context"
+ "encoding/json"
+
+ log "github.com/sirupsen/logrus"
"github.com/bytom/blockchain/query"
"github.com/bytom/blockchain/txfeed"
- "github.com/bytom/errors"
- "github.com/bytom/net/http/httpjson"
)
// POST /create-txfeed
-func (a *BlockchainReactor) createTxFeed(ctx context.Context, in struct {
+func (bcr *BlockchainReactor) createTxFeed(ctx context.Context, in struct {
Alias string
Filter string
+}) error {
+ if err := bcr.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
+ log.WithField("error", err).Error("Add TxFeed Failed")
+ return err
+ }
+ return nil
+}
+
+func (bcr *BlockchainReactor) getTxFeedByAlias(ctx context.Context, filter string) ([]*txfeed.TxFeed, error) {
+ txFeed := &txfeed.TxFeed{}
+ txFeeds := []*txfeed.TxFeed{}
+
+ jf, err := json.Marshal(filter)
+ if err != nil {
+ return nil, err
+ }
+
+ value := bcr.txFeedTracker.DB.Get(jf)
+ if value == nil {
+ return nil, nil
+ }
- // ClientToken is the application's unique token for the txfeed. Every txfeed
- // should have a unique client token. The client token is used to ensure
- // idempotency of create txfeed requests. Duplicate create txfeed requests
- // with the same client_token will only create one txfeed.
- ClientToken string `json:"client_token"`
-}) (*txfeed.TxFeed, error) {
- // after := fmt.Sprintf("%d:%d-%d", a.chain.Height(), math.MaxInt32, uint64(math.MaxInt64))
- // return a.txFeeds.Create(ctx, in.Alias, in.Filter, after, in.ClientToken)
- return nil, nil
+ if err := json.Unmarshal(value, txFeed); err != nil {
+ return nil, err
+ }
+ txFeeds = append(txFeeds, txFeed)
+ return txFeeds, nil
}
// POST /get-transaction-feed
-func (a *BlockchainReactor) getTxFeed(ctx context.Context, in struct {
- ID string `json:"id,omitempty"`
- Alias string `json:"alias,omitempty"`
-}) (*txfeed.TxFeed, error) {
- // return a.txFeeds.Find(ctx, in.ID, in.Alias)
- return nil, nil
+func (bcr *BlockchainReactor) getTxFeed(ctx context.Context, in requestQuery) interface{} {
+ txfeeds, err := bcr.getTxFeedByAlias(ctx, in.Filter)
+ if err != nil {
+ return err
+ }
+ return txfeeds
+
}
// POST /delete-transaction-feed
-func (a *BlockchainReactor) deleteTxFeed(ctx context.Context, in struct {
- ID string `json:"id,omitempty"`
+func (bcr *BlockchainReactor) deleteTxFeed(ctx context.Context, in struct {
Alias string `json:"alias,omitempty"`
}) error {
- // return a.txFeeds.Delete(ctx, in.ID, in.Alias)
- return nil
+ return bcr.txFeedTracker.Delete(ctx, in.Alias)
}
// POST /update-transaction-feed
-func (a *BlockchainReactor) updateTxFeed(ctx context.Context, in struct {
- ID string `json:"id,omitempty"`
- Alias string `json:"alias,omitempty"`
- Prev string `json:"previous_after"`
- After string `json:"after"`
-}) (*txfeed.TxFeed, error) {
- // TODO(tessr): Consider moving this function into the txfeed package.
- // (It's currently outside the txfeed package to avoid a dependecy cycle
- // between txfeed and query.)
- bad, err := txAfterIsBefore(in.After, in.Prev)
- if err != nil {
- return nil, err
- }
-
- if bad {
- return nil, errors.WithDetail(httpjson.ErrBadRequest, "new After cannot be before Prev")
+func (bcr *BlockchainReactor) updateTxFeed(ctx context.Context, in struct {
+ Alias string
+ Filter string
+}) error {
+ if err := bcr.txFeedTracker.Delete(ctx, in.Alias); err != nil {
+ return err
}
- // return a.txFeeds.Update(ctx, in.ID, in.Alias, in.After, in.Prev)
- return nil, nil
+ return bcr.txFeedTracker.Create(ctx, in.Alias, in.Filter)
}
// txAfterIsBefore returns true if a is before b. It returns an error if either
(aAfter.FromBlockHeight == bAfter.FromBlockHeight &&
aAfter.FromPosition < bAfter.FromPosition), nil
}
+
+func (bcr *BlockchainReactor) getTxFeeds() ([]*txfeed.TxFeed, error) {
+ txFeeds := make([]*txfeed.TxFeed, 0)
+ iter := bcr.txFeedTracker.DB.Iterator()
+ defer iter.Release()
+
+ for iter.Next() {
+ txFeed := &txfeed.TxFeed{}
+ if err := json.Unmarshal(iter.Value(), txFeed); err != nil {
+ return nil, err
+ }
+ txFeeds = append(txFeeds, txFeed)
+ }
+ return txFeeds, nil
+}
+
+// listTxFeeds is an http handler for listing txfeeds. It does not take a filter.
+// POST /list-transaction-feeds
+func (bcr *BlockchainReactor) listTxFeeds(ctx context.Context, in requestQuery) interface{} {
+ txfeeds, err := bcr.getTxFeeds()
+ if err != nil {
+ return err
+ }
+ return txfeeds
+}
}
func createTxFeed(client *rpc.Client, args []string) {
- if len(args) != 1 {
- fatalln("error:createTxFeed take no arguments")
+ if len(args) != 2 {
+ fatalln("error:createTxFeed need arguments")
}
type In struct {
- Alias string
- Filter string
- ClientToken string `json:"client_token"`
+ Alias string
+ Filter string
}
var in In
- in.Alias = "asdfgh"
- in.Filter = "zxcvbn"
- in.ClientToken = args[0]
- client.Call(context.Background(), "/create-transaction-feed", &[]In{in}, nil)
+ in.Alias = args[0]
+ in.Filter = args[1]
+
+ client.Call(context.Background(), "/create-transaction-feed", in, nil)
}
func getTxFeed(client *rpc.Client, args []string) {
- if len(args) != 0 {
- fatalln("error:getTxFeed not use args")
+ if len(args) != 1 {
+ fatalln("error:getTxFeed use args alias")
}
- type In struct {
- ID string `json:"id,omitempty"`
- Alias string `json:"alias,omitempty"`
+ type requestQuery struct {
+ Filter string `json:"filter,omitempty"`
+ FilterParams []interface{} `json:"filter_params,omitempty"`
+ SumBy []string `json:"sum_by,omitempty"`
+ PageSize int `json:"page_size"`
+ AscLongPoll bool `json:"ascending_with_long_poll,omitempty"`
+ Timeout json.Duration `json:"timeout"`
+ After string `json:"after"`
+ StartTimeMS uint64 `json:"start_time,omitempty"`
+ EndTimeMS uint64 `json:"end_time,omitempty"`
+ TimestampMS uint64 `json:"timestamp,omitempty"`
+ Type string `json:"type"`
+ Aliases []string `json:"aliases,omitempty"`
+ }
+ var in requestQuery
+ in.Filter = args[0]
+ responses := make([]interface{}, 0)
+ client.Call(context.Background(), "/get-transaction-feed", in, &responses)
+ if len(responses) > 0 {
+ for i, item := range responses {
+ fmt.Println(i, "-----", item)
+ }
}
- var in In
- in.Alias = "qwerty"
- in.ID = "123456"
- client.Call(context.Background(), "/get-transaction-feed", &[]In{in}, nil)
}
func updateTxFeed(client *rpc.Client, args []string) {
- if len(args) != 0 {
- fatalln("error:updateTxFeed not use args")
+ if len(args) != 2 {
+ fatalln("error:createTxFeed need arguments")
}
type In struct {
- ID string `json:"id,omitempty"`
- Alias string `json:"alias,omitempty"`
+ Alias string
+ Filter string
}
var in In
- in.ID = "123456"
- in.Alias = "qwerty"
- client.Call(context.Background(), "/update-transaction-feed", &[]In{in}, nil)
+ in.Alias = args[0]
+ in.Filter = args[1]
+ client.Call(context.Background(), "/update-transaction-feed", in, nil)
}
func deleteTxFeed(client *rpc.Client, args []string) {
- if len(args) != 0 {
- fatalln("error:deleteTxFeed not use args")
+ if len(args) != 1 {
+ fatalln("error:deleteTxFeed use args alias")
}
type In struct {
- ID string `json:"id,omitempty"`
Alias string `json:"alias,omitempty"`
}
var in In
- in.ID = "123456"
- in.Alias = "qwerty"
- client.Call(context.Background(), "/delete-transaction-feed", &[]In{in}, nil)
+ in.Alias = args[0]
+ client.Call(context.Background(), "/delete-transaction-feed", in, nil)
}
func listAccounts(client *rpc.Client, args []string) {
responses := make([]interface{}, 0)
client.Call(context.Background(), "/list-accounts", in, &responses)
- if len(responses) > 0 {
- for i, item := range responses {
- fmt.Println(i, "-----", item)
- }
- }
+ // if len(responses) > 0 {
+ // for i, item := range responses {
+ // fmt.Println(i, "-----", item)
+ // }
+ // }
}
func listAssets(client *rpc.Client, args []string) {
}
func listTxFeeds(client *rpc.Client, args []string) {
+ fmt.Println("listTxFeeds")
if len(args) != 0 {
fatalln("error:listTxFeeds not use args")
}
Aliases []string `json:"aliases,omitempty"`
}
var in requestQuery
- after := in.After
- out := in
- out.After = after
- client.Call(context.Background(), "/list-transactions-feeds", &[]requestQuery{in}, nil)
+
+ responses := make([]interface{}, 0)
+
+ client.Call(context.Background(), "/list-transaction-feeds", in, &responses)
+ if len(responses) > 0 {
+ for i, item := range responses {
+ fmt.Println(i, "-----", item)
+ }
+ }
+
}
func listTransactions(client *rpc.Client, args []string) {
"github.com/bytom/blockchain/pin"
"github.com/bytom/blockchain/pseudohsm"
"github.com/bytom/blockchain/txdb"
+ "github.com/bytom/blockchain/txfeed"
cfg "github.com/bytom/config"
"github.com/bytom/consensus"
"github.com/bytom/env"
var accounts *account.Manager = nil
var assets *asset.Registry = nil
var pinStore *pin.Store = nil
+ var txFeed *txfeed.Tracker = nil
if config.Wallet.Enable {
accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
assets = asset.NewRegistry(assetsDB, chain)
+
+ txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
+ txFeed = txfeed.NewTracker(txFeedDB, chain)
+
+ if err = txFeed.Prepare(ctx); err != nil {
+ log.WithField("error", err).Error("start txfeed")
+ return nil
+ }
+
}
//Todo HSM
/*
if err != nil {
cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
}
- bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore)
+ bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore, txFeed)
sw.AddReactor("BLOCKCHAIN", bcReactor)