m.Handle("/get-transaction", jsonHandler(a.getTransaction))
m.Handle("/list-transactions", jsonHandler(a.listTransactions))
+ m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
+ m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
+
m.Handle("/list-balances", jsonHandler(a.listBalances))
m.Handle("/list-unspent-outputs", jsonHandler(a.listUnspentOutputs))
m.Handle("/submit-transaction", jsonHandler(a.submit))
m.Handle("/estimate-transaction-gas", jsonHandler(a.estimateTxGas))
- m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
- m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
+ m.Handle("/get-mempool-transaction", jsonHandler(a.getMemPoolTx))
+ m.Handle("/list-mempool-transactions", jsonHandler(a.listMemPoolTxs))
m.Handle("/decode-raw-transaction", jsonHandler(a.decodeRawTransaction))
m.Handle("/get-block-hash", jsonHandler(a.getBestBlockHash))
}
// POST /get-unconfirmed-transaction
-func (a *API) getUnconfirmedTx(ctx context.Context, filter struct {
+func (a *API) getUnconfirmedTx(ctx context.Context, txInfo struct {
+ TxID string `json:"tx_id"`
+}) Response {
+ transaction, err := a.wallet.GetUnconfirmedTxByTxID(txInfo.TxID)
+ if err != nil {
+ log.Errorf("getTransaction error: %v", err)
+ return NewErrorResponse(err)
+ }
+
+ return NewSuccessResponse(transaction)
+}
+
+// POST /list-unconfirmed-transactions
+func (a *API) listUnconfirmedTxs(ctx context.Context, filter struct {
+ AccountID string `json:"account_id"`
+}) Response {
+ transactions := []*query.AnnotatedTx{}
+ var err error
+
+ transactions, err = a.wallet.GetUnconfirmedTxsByAccountID(filter.AccountID)
+ if err != nil {
+ log.Errorf("listTransactions: %v", err)
+ return NewErrorResponse(err)
+ }
+
+ return NewSuccessResponse(transactions)
+}
+
+// POST /get-mempool-transaction
+func (a *API) getMemPoolTx(ctx context.Context, filter struct {
TxID chainjson.HexBytes `json:"tx_id"`
}) Response {
var tmpTxID [32]byte
TxIDs []bc.Hash `json:"tx_ids"`
}
-// POST /list-unconfirmed-transactions
-func (a *API) listUnconfirmedTxs(ctx context.Context) Response {
+// POST /list-mempool-transactions
+func (a *API) listMemPoolTxs(ctx context.Context) Response {
txIDs := []bc.Hash{}
txPool := a.chain.GetTxPool()
BytomcliCmd.AddCommand(getUnconfirmedTransactionCmd)
BytomcliCmd.AddCommand(listUnconfirmedTransactionsCmd)
+
+ BytomcliCmd.AddCommand(getMemPoolTransactionCmd)
+ BytomcliCmd.AddCommand(listMemPoolTransactionsCmd)
BytomcliCmd.AddCommand(decodeRawTransactionCmd)
BytomcliCmd.AddCommand(listUnspentOutputsCmd)
listTransactionsCmd.PersistentFlags().StringVar(&txID, "id", "", "transaction id")
listTransactionsCmd.PersistentFlags().StringVar(&account, "account_id", "", "account id")
listTransactionsCmd.PersistentFlags().BoolVar(&detail, "detail", false, "list transactions details")
+
+ listUnconfirmedTransactionsCmd.PersistentFlags().StringVar(&account, "account_id", "", "account id")
}
var (
Short: "list unconfirmed transactions hashes",
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
- data, exitCode := util.ClientCall("/list-unconfirmed-transactions")
+ filter := struct {
+ AccountID string `json:"account_id"`
+ }{AccountID: account}
+
+ data, exitCode := util.ClientCall("/list-unconfirmed-transactions", &filter)
+ if exitCode != util.Success {
+ os.Exit(exitCode)
+ }
+
+ printJSON(data)
+ },
+}
+
+var getMemPoolTransactionCmd = &cobra.Command{
+ Use: "get-mempool-transaction <hash>",
+ Short: "get mempool transaction by matching the given transaction hash",
+ Args: cobra.ExactArgs(1),
+ Run: func(cmd *cobra.Command, args []string) {
+ txID, err := hex.DecodeString(args[0])
+ if err != nil {
+ jww.ERROR.Println(err)
+ os.Exit(util.ErrLocalExe)
+ }
+
+ txInfo := &struct {
+ TxID chainjson.HexBytes `json:"tx_id"`
+ }{TxID: txID}
+
+ data, exitCode := util.ClientCall("/get-mempool-transaction", txInfo)
+ if exitCode != util.Success {
+ os.Exit(exitCode)
+ }
+
+ printJSON(data)
+ },
+}
+
+var listMemPoolTransactionsCmd = &cobra.Command{
+ Use: "list-mempool-transactions",
+ Short: "list mempool transactions hashes",
+ Args: cobra.NoArgs,
+ Run: func(cmd *cobra.Command, args []string) {
+ data, exitCode := util.ClientCall("/list-mempool-transactions")
if exitCode != util.Success {
os.Exit(exitCode)
}
maxBlocksPending = 1024
maxtxsPending = 32768
maxQuitReq = 256
+
+ // txChanSize is the size of channel listening to Txpool newTxCh.
+ maxTxChanSize = 1000
)
var (
"github.com/bytom/p2p/pex"
core "github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/types"
"github.com/bytom/version"
)
blockKeeper *blockKeeper
peers *peerSet
+ txCh chan *types.Tx
newBlockCh chan *bc.Hash
newPeerCh chan struct{}
txSyncCh chan *txsync
txPool: txPool,
chain: chain,
privKey: crypto.GenPrivKeyEd25519(),
- config: config,
- quitSync: make(chan struct{}),
+ peers: newPeerSet(),
+ txCh: make(chan *types.Tx, maxTxChanSize),
newBlockCh: newBlockCh,
newPeerCh: make(chan struct{}),
txSyncCh: make(chan *txsync),
dropPeerCh: make(chan *string, maxQuitReq),
- peers: newPeerSet(),
+ quitSync: make(chan struct{}),
+ config: config,
}
trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
}
func (sm *SyncManager) txBroadcastLoop() {
- newTxCh := sm.txPool.GetNewTxCh()
for {
select {
- case newTx := <-newTxCh:
+ case newTx := <-sm.txCh:
peers, err := sm.peers.BroadcastTx(newTx)
if err != nil {
log.Errorf("Broadcast new tx error. %v", err)
func (sm *SyncManager) Switch() *p2p.Switch {
return sm.sw
}
+
+//SetTxCh set SyncManager txCh
+func (sm *SyncManager) SetTxCh(txCh *types.Tx) {
+ sm.txCh <- txCh
+}
newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
+ go func() {
+ newTxCh := txPool.GetNewTxCh()
+ for {
+ select {
+ case newTx := <-newTxCh:
+ syncManager.SetTxCh(newTx)
+ if wallet != nil {
+ wallet.SetTxCh(newTx)
+ }
+ default:
+ }
+ }
+ }()
// run the profile server
profileHost := config.ProfListenAddress
--- /dev/null
+package wallet
+
+import (
+ "encoding/json"
+
+ log "github.com/sirupsen/logrus"
+
+ "fmt"
+ "github.com/bytom/blockchain/query"
+ "github.com/bytom/protocol/bc/types"
+)
+
+const (
+ //unconfirmedTxPrefix is txpool unconfirmed transactions prefix
+ unconfirmedTxPrefix = "UTXS:"
+)
+
+func calcUnconfirmedKey(formatKey string) []byte {
+ return []byte(unconfirmedTxPrefix + formatKey)
+}
+
+// SaveUnconfirmedTx save unconfirmed annotated transaction to the database
+func (w *Wallet) SaveUnconfirmedTx(tx *types.Tx) error {
+ annotatedTx := &query.AnnotatedTx{
+ ID: tx.ID,
+ Inputs: make([]*query.AnnotatedInput, 0, len(tx.Inputs)),
+ Outputs: make([]*query.AnnotatedOutput, 0, len(tx.Outputs)),
+ Size: tx.SerializedSize,
+ }
+
+ for i := range tx.Inputs {
+ annotatedTx.Inputs = append(annotatedTx.Inputs, w.BuildAnnotatedInput(tx, uint32(i)))
+ }
+ for i := range tx.Outputs {
+ annotatedTx.Outputs = append(annotatedTx.Outputs, w.BuildAnnotatedOutput(tx, i))
+ }
+
+ // annotate account and asset
+ annotatedTxs := []*query.AnnotatedTx{}
+ annotatedTxs = append(annotatedTxs, annotatedTx)
+ annotateTxsAccount(annotatedTxs, w.DB)
+ annotateTxsAsset(w, annotatedTxs)
+
+ rawTx, err := json.Marshal(annotatedTxs[0])
+ if err != nil {
+ log.WithField("err", err).Error("inserting unconfirmed annotated transaction to db")
+ return err
+ }
+
+ w.DB.Set(calcUnconfirmedKey(tx.ID.String()), rawTx)
+ return nil
+}
+
+// DeleteUnconfirmedTxs delete unconfirmed annotated transaction from the database
+func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) {
+ for _, txID := range txIDs {
+ if exist := w.DB.Get(calcUnconfirmedKey(txID)); exist != nil {
+ w.DB.Delete(calcUnconfirmedKey(txID))
+ }
+ }
+}
+
+// RescanWalletTxPool rescan txPool
+func (w *Wallet) RescanWalletTxPool() []string {
+ txIDs := []string{}
+
+ txPool := w.chain.GetTxPool()
+ txs := txPool.GetTransactions()
+ for _, txDesc := range txs {
+ txIDs = append(txIDs, txDesc.Tx.ID.String())
+ }
+
+ return txIDs
+}
+
+// GetUnconfirmedTxByTxID get unconfirmed transaction by txID
+func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) {
+ formatKey := w.DB.Get(calcUnconfirmedKey(txID))
+ if formatKey == nil {
+ return nil, fmt.Errorf("Not found unconfirmed transaction(tx_id=%s) ", txID)
+ }
+
+ annotatedTx := &query.AnnotatedTx{}
+ txInfo := w.DB.Get(calcAnnotatedKey(string(formatKey)))
+ if err := json.Unmarshal(txInfo, annotatedTx); err != nil {
+ return nil, err
+ }
+
+ return annotatedTx, nil
+}
+
+// GetUnconfirmedTxsByAccountID get account unconfirmed txs by account ID
+func (w *Wallet) GetUnconfirmedTxsByAccountID(accountID string) ([]*query.AnnotatedTx, error) {
+ annotatedTxs := []*query.AnnotatedTx{}
+
+ txIter := w.DB.IteratorPrefix([]byte(unconfirmedTxPrefix))
+ defer txIter.Release()
+ for txIter.Next() {
+ annotatedTx := &query.AnnotatedTx{}
+ if err := json.Unmarshal(txIter.Value(), &annotatedTx); err != nil {
+ return nil, err
+ }
+
+ if accountID == "" {
+ annotatedTxs = append(annotatedTxs, annotatedTx)
+ continue
+ }
+
+ if findTransactionsByAccount(annotatedTx, accountID) {
+ annotatedTxs = append(annotatedTxs, annotatedTx)
+ }
+ }
+
+ return annotatedTxs, nil
+}
"github.com/bytom/protocol/bc/types"
)
-//SINGLE single sign
-const SINGLE = 1
+const (
+ //SINGLE single sign
+ SINGLE = 1
+
+ // txChanSize is the size of channel listening to Txpool newTxCh.
+ maxTxChanSize = 1000
+)
var walletKey = []byte("walletInfo")
Hsm *pseudohsm.HSM
chain *protocol.Chain
rescanCh chan struct{}
+ txCh chan *types.Tx
}
//NewWallet return a new wallet instance
chain: chain,
Hsm: hsm,
rescanCh: make(chan struct{}, 1),
+ txCh: make(chan *types.Tx, maxTxChanSize),
}
if err := w.loadWalletInfo(); err != nil {
log.WithField("err", err).Error("walletUpdater stop")
return
}
+
+ // rescan txpool transaction and delete unconfirmed transactions from database
+ txIDs := w.RescanWalletTxPool()
+ w.DeleteUnconfirmedTxs(txIDs)
}
}
+//RescanBlocks provide a trigger to rescan blocks
func (w *Wallet) RescanBlocks() {
select {
case w.rescanCh <- struct{}{}:
block, _ := w.chain.GetBlockByHeight(0)
w.status.WorkHash = bc.Hash{}
w.AttachBlock(block)
+ case newTx := <-w.txCh:
+ w.SaveUnconfirmedTx(newTx)
default:
return
}
}
return nil
}
+
+//SetTxCh set wallet txCh
+func (w *Wallet) SetTxCh(txCh *types.Tx) {
+ w.txCh <- txCh
+}