OSDN Git Service

add txCh for synmanager
authoroysheng <oysheng@bytom.io>
Thu, 31 May 2018 11:06:00 +0000 (19:06 +0800)
committeroysheng <oysheng@bytom.io>
Thu, 31 May 2018 11:06:00 +0000 (19:06 +0800)
add txCh for wallet
modify name get-unconfirmed-transaction to get-mempool-transaction
modify name list-unconfirmed-transactions to list-mempool-transactions
add new API get-unconfirmed-transaction and list-unconfirmed-transactions

api/api.go
api/query.go
cmd/bytomcli/commands/bytomcli.go
cmd/bytomcli/commands/transaction.go
netsync/block_keeper.go
netsync/handle.go
node/node.go
wallet/unconfirmed.go [new file with mode: 0644]
wallet/wallet.go

index 5a69318..461aabb 100644 (file)
@@ -192,6 +192,9 @@ func (a *API) buildHandler() {
                m.Handle("/get-transaction", jsonHandler(a.getTransaction))
                m.Handle("/list-transactions", jsonHandler(a.listTransactions))
 
+               m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
+               m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
+
                m.Handle("/list-balances", jsonHandler(a.listBalances))
                m.Handle("/list-unspent-outputs", jsonHandler(a.listUnspentOutputs))
 
@@ -218,8 +221,8 @@ func (a *API) buildHandler() {
        m.Handle("/submit-transaction", jsonHandler(a.submit))
        m.Handle("/estimate-transaction-gas", jsonHandler(a.estimateTxGas))
 
-       m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
-       m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
+       m.Handle("/get-mempool-transaction", jsonHandler(a.getMemPoolTx))
+       m.Handle("/list-mempool-transactions", jsonHandler(a.listMemPoolTxs))
        m.Handle("/decode-raw-transaction", jsonHandler(a.decodeRawTransaction))
 
        m.Handle("/get-block-hash", jsonHandler(a.getBestBlockHash))
index 0a566ac..e28e5c5 100644 (file)
@@ -108,7 +108,36 @@ func (a *API) listTransactions(ctx context.Context, filter struct {
 }
 
 // POST /get-unconfirmed-transaction
-func (a *API) getUnconfirmedTx(ctx context.Context, filter struct {
+func (a *API) getUnconfirmedTx(ctx context.Context, txInfo struct {
+       TxID string `json:"tx_id"`
+}) Response {
+       transaction, err := a.wallet.GetUnconfirmedTxByTxID(txInfo.TxID)
+       if err != nil {
+               log.Errorf("getTransaction error: %v", err)
+               return NewErrorResponse(err)
+       }
+
+       return NewSuccessResponse(transaction)
+}
+
+// POST /list-unconfirmed-transactions
+func (a *API) listUnconfirmedTxs(ctx context.Context, filter struct {
+       AccountID string `json:"account_id"`
+}) Response {
+       transactions := []*query.AnnotatedTx{}
+       var err error
+
+       transactions, err = a.wallet.GetUnconfirmedTxsByAccountID(filter.AccountID)
+       if err != nil {
+               log.Errorf("listTransactions: %v", err)
+               return NewErrorResponse(err)
+       }
+
+       return NewSuccessResponse(transactions)
+}
+
+// POST /get-mempool-transaction
+func (a *API) getMemPoolTx(ctx context.Context, filter struct {
        TxID chainjson.HexBytes `json:"tx_id"`
 }) Response {
        var tmpTxID [32]byte
@@ -146,8 +175,8 @@ type unconfirmedTxsResp struct {
        TxIDs []bc.Hash `json:"tx_ids"`
 }
 
-// POST /list-unconfirmed-transactions
-func (a *API) listUnconfirmedTxs(ctx context.Context) Response {
+// POST /list-mempool-transactions
+func (a *API) listMemPoolTxs(ctx context.Context) Response {
        txIDs := []bc.Hash{}
 
        txPool := a.chain.GetTxPool()
index dd1abf9..9f45d87 100644 (file)
@@ -128,6 +128,9 @@ func AddCommands() {
 
        BytomcliCmd.AddCommand(getUnconfirmedTransactionCmd)
        BytomcliCmd.AddCommand(listUnconfirmedTransactionsCmd)
+
+       BytomcliCmd.AddCommand(getMemPoolTransactionCmd)
+       BytomcliCmd.AddCommand(listMemPoolTransactionsCmd)
        BytomcliCmd.AddCommand(decodeRawTransactionCmd)
 
        BytomcliCmd.AddCommand(listUnspentOutputsCmd)
index 38b5101..7ac034b 100644 (file)
@@ -30,6 +30,8 @@ func init() {
        listTransactionsCmd.PersistentFlags().StringVar(&txID, "id", "", "transaction id")
        listTransactionsCmd.PersistentFlags().StringVar(&account, "account_id", "", "account id")
        listTransactionsCmd.PersistentFlags().BoolVar(&detail, "detail", false, "list transactions details")
+
+       listUnconfirmedTransactionsCmd.PersistentFlags().StringVar(&account, "account_id", "", "account id")
 }
 
 var (
@@ -366,7 +368,49 @@ var listUnconfirmedTransactionsCmd = &cobra.Command{
        Short: "list unconfirmed transactions hashes",
        Args:  cobra.NoArgs,
        Run: func(cmd *cobra.Command, args []string) {
-               data, exitCode := util.ClientCall("/list-unconfirmed-transactions")
+               filter := struct {
+                       AccountID string `json:"account_id"`
+               }{AccountID: account}
+
+               data, exitCode := util.ClientCall("/list-unconfirmed-transactions", &filter)
+               if exitCode != util.Success {
+                       os.Exit(exitCode)
+               }
+
+               printJSON(data)
+       },
+}
+
+var getMemPoolTransactionCmd = &cobra.Command{
+       Use:   "get-mempool-transaction <hash>",
+       Short: "get mempool transaction by matching the given transaction hash",
+       Args:  cobra.ExactArgs(1),
+       Run: func(cmd *cobra.Command, args []string) {
+               txID, err := hex.DecodeString(args[0])
+               if err != nil {
+                       jww.ERROR.Println(err)
+                       os.Exit(util.ErrLocalExe)
+               }
+
+               txInfo := &struct {
+                       TxID chainjson.HexBytes `json:"tx_id"`
+               }{TxID: txID}
+
+               data, exitCode := util.ClientCall("/get-mempool-transaction", txInfo)
+               if exitCode != util.Success {
+                       os.Exit(exitCode)
+               }
+
+               printJSON(data)
+       },
+}
+
+var listMemPoolTransactionsCmd = &cobra.Command{
+       Use:   "list-mempool-transactions",
+       Short: "list mempool transactions hashes",
+       Args:  cobra.NoArgs,
+       Run: func(cmd *cobra.Command, args []string) {
+               data, exitCode := util.ClientCall("/list-mempool-transactions")
                if exitCode != util.Success {
                        os.Exit(exitCode)
                }
index 5badaf2..dbb938a 100644 (file)
@@ -21,6 +21,9 @@ const (
        maxBlocksPending = 1024
        maxtxsPending    = 32768
        maxQuitReq       = 256
+
+       // txChanSize is the size of channel listening to Txpool newTxCh.
+       maxTxChanSize = 1000
 )
 
 var (
index ac1619b..40d7a47 100644 (file)
@@ -14,6 +14,7 @@ import (
        "github.com/bytom/p2p/pex"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/types"
        "github.com/bytom/version"
 )
 
@@ -29,6 +30,7 @@ type SyncManager struct {
        blockKeeper *blockKeeper
        peers       *peerSet
 
+       txCh          chan *types.Tx
        newBlockCh    chan *bc.Hash
        newPeerCh     chan struct{}
        txSyncCh      chan *txsync
@@ -45,13 +47,14 @@ func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool,
                txPool:     txPool,
                chain:      chain,
                privKey:    crypto.GenPrivKeyEd25519(),
-               config:     config,
-               quitSync:   make(chan struct{}),
+               peers:      newPeerSet(),
+               txCh:       make(chan *types.Tx, maxTxChanSize),
                newBlockCh: newBlockCh,
                newPeerCh:  make(chan struct{}),
                txSyncCh:   make(chan *txsync),
                dropPeerCh: make(chan *string, maxQuitReq),
-               peers:      newPeerSet(),
+               quitSync:   make(chan struct{}),
+               config:     config,
        }
 
        trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
@@ -146,10 +149,9 @@ func (sm *SyncManager) Stop() {
 }
 
 func (sm *SyncManager) txBroadcastLoop() {
-       newTxCh := sm.txPool.GetNewTxCh()
        for {
                select {
-               case newTx := <-newTxCh:
+               case newTx := <-sm.txCh:
                        peers, err := sm.peers.BroadcastTx(newTx)
                        if err != nil {
                                log.Errorf("Broadcast new tx error. %v", err)
@@ -216,3 +218,8 @@ func (sm *SyncManager) Peers() *peerSet {
 func (sm *SyncManager) Switch() *p2p.Switch {
        return sm.sw
 }
+
+//SetTxCh set SyncManager txCh
+func (sm *SyncManager) SetTxCh(txCh *types.Tx) {
+       sm.txCh <- txCh
+}
index b941475..f3a46f5 100644 (file)
@@ -124,6 +124,19 @@ func NewNode(config *cfg.Config) *Node {
        newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
        syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
+       go func() {
+               newTxCh := txPool.GetNewTxCh()
+               for {
+                       select {
+                       case newTx := <-newTxCh:
+                               syncManager.SetTxCh(newTx)
+                               if wallet != nil {
+                                       wallet.SetTxCh(newTx)
+                               }
+                       default:
+                       }
+               }
+       }()
 
        // run the profile server
        profileHost := config.ProfListenAddress
diff --git a/wallet/unconfirmed.go b/wallet/unconfirmed.go
new file mode 100644 (file)
index 0000000..26b81cf
--- /dev/null
@@ -0,0 +1,115 @@
+package wallet
+
+import (
+       "encoding/json"
+
+       log "github.com/sirupsen/logrus"
+
+       "fmt"
+       "github.com/bytom/blockchain/query"
+       "github.com/bytom/protocol/bc/types"
+)
+
+const (
+       //unconfirmedTxPrefix is txpool unconfirmed transactions prefix
+       unconfirmedTxPrefix = "UTXS:"
+)
+
+func calcUnconfirmedKey(formatKey string) []byte {
+       return []byte(unconfirmedTxPrefix + formatKey)
+}
+
+// SaveUnconfirmedTx save unconfirmed annotated transaction to the database
+func (w *Wallet) SaveUnconfirmedTx(tx *types.Tx) error {
+       annotatedTx := &query.AnnotatedTx{
+               ID:      tx.ID,
+               Inputs:  make([]*query.AnnotatedInput, 0, len(tx.Inputs)),
+               Outputs: make([]*query.AnnotatedOutput, 0, len(tx.Outputs)),
+               Size:    tx.SerializedSize,
+       }
+
+       for i := range tx.Inputs {
+               annotatedTx.Inputs = append(annotatedTx.Inputs, w.BuildAnnotatedInput(tx, uint32(i)))
+       }
+       for i := range tx.Outputs {
+               annotatedTx.Outputs = append(annotatedTx.Outputs, w.BuildAnnotatedOutput(tx, i))
+       }
+
+       // annotate account and asset
+       annotatedTxs := []*query.AnnotatedTx{}
+       annotatedTxs = append(annotatedTxs, annotatedTx)
+       annotateTxsAccount(annotatedTxs, w.DB)
+       annotateTxsAsset(w, annotatedTxs)
+
+       rawTx, err := json.Marshal(annotatedTxs[0])
+       if err != nil {
+               log.WithField("err", err).Error("inserting unconfirmed annotated transaction to db")
+               return err
+       }
+
+       w.DB.Set(calcUnconfirmedKey(tx.ID.String()), rawTx)
+       return nil
+}
+
+// DeleteUnconfirmedTxs delete unconfirmed annotated transaction from the database
+func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) {
+       for _, txID := range txIDs {
+               if exist := w.DB.Get(calcUnconfirmedKey(txID)); exist != nil {
+                       w.DB.Delete(calcUnconfirmedKey(txID))
+               }
+       }
+}
+
+// RescanWalletTxPool rescan txPool
+func (w *Wallet) RescanWalletTxPool() []string {
+       txIDs := []string{}
+
+       txPool := w.chain.GetTxPool()
+       txs := txPool.GetTransactions()
+       for _, txDesc := range txs {
+               txIDs = append(txIDs, txDesc.Tx.ID.String())
+       }
+
+       return txIDs
+}
+
+// GetUnconfirmedTxByTxID get unconfirmed transaction by txID
+func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) {
+       formatKey := w.DB.Get(calcUnconfirmedKey(txID))
+       if formatKey == nil {
+               return nil, fmt.Errorf("Not found unconfirmed transaction(tx_id=%s) ", txID)
+       }
+
+       annotatedTx := &query.AnnotatedTx{}
+       txInfo := w.DB.Get(calcAnnotatedKey(string(formatKey)))
+       if err := json.Unmarshal(txInfo, annotatedTx); err != nil {
+               return nil, err
+       }
+
+       return annotatedTx, nil
+}
+
+// GetUnconfirmedTxsByAccountID get account unconfirmed txs by account ID
+func (w *Wallet) GetUnconfirmedTxsByAccountID(accountID string) ([]*query.AnnotatedTx, error) {
+       annotatedTxs := []*query.AnnotatedTx{}
+
+       txIter := w.DB.IteratorPrefix([]byte(unconfirmedTxPrefix))
+       defer txIter.Release()
+       for txIter.Next() {
+               annotatedTx := &query.AnnotatedTx{}
+               if err := json.Unmarshal(txIter.Value(), &annotatedTx); err != nil {
+                       return nil, err
+               }
+
+               if accountID == "" {
+                       annotatedTxs = append(annotatedTxs, annotatedTx)
+                       continue
+               }
+
+               if findTransactionsByAccount(annotatedTx, accountID) {
+                       annotatedTxs = append(annotatedTxs, annotatedTx)
+               }
+       }
+
+       return annotatedTxs, nil
+}
index b80cf1b..ce2ab80 100644 (file)
@@ -14,8 +14,13 @@ import (
        "github.com/bytom/protocol/bc/types"
 )
 
-//SINGLE single sign
-const SINGLE = 1
+const (
+       //SINGLE single sign
+       SINGLE = 1
+
+       // txChanSize is the size of channel listening to Txpool newTxCh.
+       maxTxChanSize = 1000
+)
 
 var walletKey = []byte("walletInfo")
 
@@ -36,6 +41,7 @@ type Wallet struct {
        Hsm        *pseudohsm.HSM
        chain      *protocol.Chain
        rescanCh   chan struct{}
+       txCh       chan *types.Tx
 }
 
 //NewWallet return a new wallet instance
@@ -47,6 +53,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry,
                chain:      chain,
                Hsm:        hsm,
                rescanCh:   make(chan struct{}, 1),
+               txCh:       make(chan *types.Tx, maxTxChanSize),
        }
 
        if err := w.loadWalletInfo(); err != nil {
@@ -160,9 +167,14 @@ func (w *Wallet) walletUpdater() {
                        log.WithField("err", err).Error("walletUpdater stop")
                        return
                }
+
+               // rescan txpool transaction and delete unconfirmed transactions from database
+               txIDs := w.RescanWalletTxPool()
+               w.DeleteUnconfirmedTxs(txIDs)
        }
 }
 
+//RescanBlocks provide a trigger to rescan blocks
 func (w *Wallet) RescanBlocks() {
        select {
        case w.rescanCh <- struct{}{}:
@@ -177,6 +189,8 @@ func (w *Wallet) getRescanNotification() {
                block, _ := w.chain.GetBlockByHeight(0)
                w.status.WorkHash = bc.Hash{}
                w.AttachBlock(block)
+       case newTx := <-w.txCh:
+               w.SaveUnconfirmedTx(newTx)
        default:
                return
        }
@@ -190,3 +204,8 @@ func (w *Wallet) createProgram(account *account.Account, XPub *pseudohsm.XPub, i
        }
        return nil
 }
+
+//SetTxCh set wallet txCh
+func (w *Wallet) SetTxCh(txCh *types.Tx) {
+       w.txCh <- txCh
+}