OSDN Git Service

add goroutine for wallet txpool updater
authoroysheng <oysheng@bytom.io>
Fri, 1 Jun 2018 08:55:38 +0000 (16:55 +0800)
committeroysheng <oysheng@bytom.io>
Fri, 1 Jun 2018 08:55:38 +0000 (16:55 +0800)
modify delete unconfirmed transactions function

node/node.go
wallet/unconfirmed.go
wallet/wallet.go

index f3a46f5..591c658 100644 (file)
@@ -124,6 +124,8 @@ func NewNode(config *cfg.Config) *Node {
        newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
        syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
+
+       // get transaction from txPool and send it to syncManager and wallet
        go func() {
                newTxCh := txPool.GetNewTxCh()
                for {
index 26b81cf..cc68422 100644 (file)
@@ -2,10 +2,10 @@ package wallet
 
 import (
        "encoding/json"
+       "strings"
 
        log "github.com/sirupsen/logrus"
 
-       "fmt"
        "github.com/bytom/blockchain/query"
        "github.com/bytom/protocol/bc/types"
 )
@@ -48,16 +48,35 @@ func (w *Wallet) SaveUnconfirmedTx(tx *types.Tx) error {
        }
 
        w.DB.Set(calcUnconfirmedKey(tx.ID.String()), rawTx)
+       log.Infof("insert unconfirmed tx=%s into db", tx.ID.String())
        return nil
 }
 
-// DeleteUnconfirmedTxs delete unconfirmed annotated transaction from the database
-func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) {
-       for _, txID := range txIDs {
-               if exist := w.DB.Get(calcUnconfirmedKey(txID)); exist != nil {
-                       w.DB.Delete(calcUnconfirmedKey(txID))
+// DeleteUnconfirmedTxs delete unconfirmed annotated transactions from the database when these transactions are not existed in txpool
+func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) error {
+       var TxIDsStr string
+       for i, txID := range txIDs {
+               if i == 0 {
+                       TxIDsStr += txID
                }
+               TxIDsStr = TxIDsStr + ":" + txID
        }
+
+       txIter := w.DB.IteratorPrefix([]byte(unconfirmedTxPrefix))
+       defer txIter.Release()
+       for txIter.Next() {
+               annotatedTx := &query.AnnotatedTx{}
+               if err := json.Unmarshal(txIter.Value(), &annotatedTx); err != nil {
+                       return err
+               }
+
+               if !strings.Contains(TxIDsStr, annotatedTx.ID.String()) {
+                       w.DB.Delete(calcUnconfirmedKey(annotatedTx.ID.String()))
+                       log.Infof("delete unconfirmed tx=%s from db", annotatedTx.ID.String())
+               }
+       }
+
+       return nil
 }
 
 // RescanWalletTxPool rescan txPool
@@ -75,13 +94,8 @@ func (w *Wallet) RescanWalletTxPool() []string {
 
 // GetUnconfirmedTxByTxID get unconfirmed transaction by txID
 func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) {
-       formatKey := w.DB.Get(calcUnconfirmedKey(txID))
-       if formatKey == nil {
-               return nil, fmt.Errorf("Not found unconfirmed transaction(tx_id=%s) ", txID)
-       }
-
        annotatedTx := &query.AnnotatedTx{}
-       txInfo := w.DB.Get(calcAnnotatedKey(string(formatKey)))
+       txInfo := w.DB.Get(calcUnconfirmedKey(txID))
        if err := json.Unmarshal(txInfo, annotatedTx); err != nil {
                return nil, err
        }
index ce2ab80..e3f87af 100644 (file)
@@ -61,6 +61,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry,
        }
 
        go w.walletUpdater()
+       go w.walletTxPoolUpdater()
 
        return w, nil
 }
@@ -152,7 +153,7 @@ func (w *Wallet) walletUpdater() {
                        }
 
                        if err := w.DetachBlock(block); err != nil {
-                               log.WithField("err", err).Error("walletUpdater detachBlock")
+                               log.WithField("err", err).Error("walletUpdater detachBlock stop")
                                return
                        }
                }
@@ -164,13 +165,9 @@ func (w *Wallet) walletUpdater() {
                }
 
                if err := w.AttachBlock(block); err != nil {
-                       log.WithField("err", err).Error("walletUpdater stop")
+                       log.WithField("err", err).Error("walletUpdater AttachBlock stop")
                        return
                }
-
-               // rescan txpool transaction and delete unconfirmed transactions from database
-               txIDs := w.RescanWalletTxPool()
-               w.DeleteUnconfirmedTxs(txIDs)
        }
 }
 
@@ -189,10 +186,25 @@ func (w *Wallet) getRescanNotification() {
                block, _ := w.chain.GetBlockByHeight(0)
                w.status.WorkHash = bc.Hash{}
                w.AttachBlock(block)
-       case newTx := <-w.txCh:
-               w.SaveUnconfirmedTx(newTx)
        default:
-               return
+               //return
+       }
+}
+
+func (w *Wallet) walletTxPoolUpdater() {
+       for {
+               // rescan txpool transaction and delete unconfirmed transactions from database
+               txIDs := w.RescanWalletTxPool()
+               if err := w.DeleteUnconfirmedTxs(txIDs); err != nil {
+                       log.WithField("err", err).Error("DeleteUnconfirmedTxs unmarshal error")
+                       return
+               }
+
+               select {
+               case newTx := <-w.txCh:
+                       w.SaveUnconfirmedTx(newTx)
+               default:
+               }
        }
 }