newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
+
+ // get transaction from txPool and send it to syncManager and wallet
go func() {
newTxCh := txPool.GetNewTxCh()
for {
import (
"encoding/json"
+ "strings"
log "github.com/sirupsen/logrus"
- "fmt"
"github.com/bytom/blockchain/query"
"github.com/bytom/protocol/bc/types"
)
}
w.DB.Set(calcUnconfirmedKey(tx.ID.String()), rawTx)
+ log.Infof("insert unconfirmed tx=%s into db", tx.ID.String())
return nil
}
-// DeleteUnconfirmedTxs delete unconfirmed annotated transaction from the database
-func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) {
- for _, txID := range txIDs {
- if exist := w.DB.Get(calcUnconfirmedKey(txID)); exist != nil {
- w.DB.Delete(calcUnconfirmedKey(txID))
+// DeleteUnconfirmedTxs delete unconfirmed annotated transactions from the database when these transactions are not existed in txpool
+func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) error {
+ var TxIDsStr string
+ for i, txID := range txIDs {
+ if i == 0 {
+ TxIDsStr += txID
}
+ TxIDsStr = TxIDsStr + ":" + txID
}
+
+ txIter := w.DB.IteratorPrefix([]byte(unconfirmedTxPrefix))
+ defer txIter.Release()
+ for txIter.Next() {
+ annotatedTx := &query.AnnotatedTx{}
+ if err := json.Unmarshal(txIter.Value(), &annotatedTx); err != nil {
+ return err
+ }
+
+ if !strings.Contains(TxIDsStr, annotatedTx.ID.String()) {
+ w.DB.Delete(calcUnconfirmedKey(annotatedTx.ID.String()))
+ log.Infof("delete unconfirmed tx=%s from db", annotatedTx.ID.String())
+ }
+ }
+
+ return nil
}
// RescanWalletTxPool rescan txPool
// GetUnconfirmedTxByTxID get unconfirmed transaction by txID
func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) {
- formatKey := w.DB.Get(calcUnconfirmedKey(txID))
- if formatKey == nil {
- return nil, fmt.Errorf("Not found unconfirmed transaction(tx_id=%s) ", txID)
- }
-
annotatedTx := &query.AnnotatedTx{}
- txInfo := w.DB.Get(calcAnnotatedKey(string(formatKey)))
+ txInfo := w.DB.Get(calcUnconfirmedKey(txID))
if err := json.Unmarshal(txInfo, annotatedTx); err != nil {
return nil, err
}
}
go w.walletUpdater()
+ go w.walletTxPoolUpdater()
return w, nil
}
}
if err := w.DetachBlock(block); err != nil {
- log.WithField("err", err).Error("walletUpdater detachBlock")
+ log.WithField("err", err).Error("walletUpdater detachBlock stop")
return
}
}
}
if err := w.AttachBlock(block); err != nil {
- log.WithField("err", err).Error("walletUpdater stop")
+ log.WithField("err", err).Error("walletUpdater AttachBlock stop")
return
}
-
- // rescan txpool transaction and delete unconfirmed transactions from database
- txIDs := w.RescanWalletTxPool()
- w.DeleteUnconfirmedTxs(txIDs)
}
}
block, _ := w.chain.GetBlockByHeight(0)
w.status.WorkHash = bc.Hash{}
w.AttachBlock(block)
- case newTx := <-w.txCh:
- w.SaveUnconfirmedTx(newTx)
default:
- return
+ //return
+ }
+}
+
+func (w *Wallet) walletTxPoolUpdater() {
+ for {
+ // rescan txpool transaction and delete unconfirmed transactions from database
+ txIDs := w.RescanWalletTxPool()
+ if err := w.DeleteUnconfirmedTxs(txIDs); err != nil {
+ log.WithField("err", err).Error("DeleteUnconfirmedTxs unmarshal error")
+ return
+ }
+
+ select {
+ case newTx := <-w.txCh:
+ w.SaveUnconfirmedTx(newTx)
+ default:
+ }
}
}