From 1398e88c1a40335c621597e96a2b400fddf562b0 Mon Sep 17 00:00:00 2001 From: Yahtoo Ma Date: Mon, 27 Aug 2018 17:24:10 +0800 Subject: [PATCH] Del txpool --- api/api.go | 23 +--- api/query.go | 92 +++++++------- netsync/handle.go | 63 +++------- netsync/tx_keeper.go | 14 +-- node/node.go | 79 +++--------- protocol/protocol.go | 8 +- protocol/tx.go | 27 ---- protocol/txpool.go | 336 -------------------------------------------------- wallet/unconfirmed.go | 21 ++-- wallet/wallet.go | 9 ++ 10 files changed, 106 insertions(+), 566 deletions(-) delete mode 100644 protocol/txpool.go diff --git a/api/api.go b/api/api.go index bd854262..d8231866 100644 --- a/api/api.go +++ b/api/api.go @@ -17,8 +17,6 @@ import ( "github.com/bytom/dashboard" "github.com/bytom/equity" "github.com/bytom/errors" - "github.com/bytom/mining/cpuminer" - "github.com/bytom/mining/miningpool" "github.com/bytom/net/http/authn" "github.com/bytom/net/http/gzip" "github.com/bytom/net/http/httpjson" @@ -111,8 +109,6 @@ type API struct { server *http.Server handler http.Handler txFeedTracker *txfeed.Tracker - cpuMiner *cpuminer.CPUMiner - miningPool *miningpool.MiningPool } func (a *API) initServer(config *cfg.Config) { @@ -169,15 +165,13 @@ func (a *API) StartServer(address string) { } // NewAPI create and initialize the API -func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API { +func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API { api := &API{ sync: sync, wallet: wallet, chain: chain, accessTokens: token, txFeedTracker: txfeeds, - cpuMiner: cpuMiner, - miningPool: miningPool, } api.buildHandler() api.initServer(config) @@ -208,9 +202,6 @@ func (a *API) buildHandler() { m.Handle("/get-mining-address", jsonHandler(a.getMiningAddress)) m.Handle("/set-mining-address", jsonHandler(a.setMiningAddress)) - m.Handle("/get-coinbase-arbitrary", jsonHandler(a.getCoinbaseArbitrary)) - m.Handle("/set-coinbase-arbitrary", jsonHandler(a.setCoinbaseArbitrary)) - m.Handle("/create-asset", jsonHandler(a.createAsset)) m.Handle("/update-asset-alias", jsonHandler(a.updateAssetAlias)) m.Handle("/get-asset", jsonHandler(a.getAsset)) @@ -257,8 +248,8 @@ func (a *API) buildHandler() { m.Handle("/submit-transaction", jsonHandler(a.submit)) m.Handle("/estimate-transaction-gas", jsonHandler(a.estimateTxGas)) - m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx)) - m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs)) + //m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx)) + //m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs)) m.Handle("/decode-raw-transaction", jsonHandler(a.decodeRawTransaction)) m.Handle("/get-block", jsonHandler(a.getBlock)) @@ -268,14 +259,6 @@ func (a *API) buildHandler() { m.Handle("/get-difficulty", jsonHandler(a.getDifficulty)) m.Handle("/get-hash-rate", jsonHandler(a.getHashRate)) - m.Handle("/is-mining", jsonHandler(a.isMining)) - m.Handle("/set-mining", jsonHandler(a.setMining)) - - m.Handle("/get-work", jsonHandler(a.getWork)) - m.Handle("/get-work-json", jsonHandler(a.getWorkJSON)) - m.Handle("/submit-work", jsonHandler(a.submitWork)) - m.Handle("/submit-work-json", jsonHandler(a.submitWorkJSON)) - m.Handle("/verify-message", jsonHandler(a.verifyMessage)) m.Handle("/decode-program", jsonHandler(a.decodeProgram)) m.Handle("/compile", jsonHandler(a.compileEquity)) diff --git a/api/query.go b/api/query.go index 96a708b1..86c66fda 100644 --- a/api/query.go +++ b/api/query.go @@ -147,38 +147,38 @@ func (a *API) listTransactions(ctx context.Context, filter struct { } // POST /get-unconfirmed-transaction -func (a *API) getUnconfirmedTx(ctx context.Context, filter struct { - TxID chainjson.HexBytes `json:"tx_id"` -}) Response { - var tmpTxID [32]byte - copy(tmpTxID[:], filter.TxID[:]) - - txHash := bc.NewHash(tmpTxID) - txPool := a.chain.GetTxPool() - txDesc, err := txPool.GetTransaction(&txHash) - if err != nil { - return NewErrorResponse(err) - } - - tx := &BlockTx{ - ID: txDesc.Tx.ID, - Version: txDesc.Tx.Version, - Size: txDesc.Tx.SerializedSize, - TimeRange: txDesc.Tx.TimeRange, - Inputs: []*query.AnnotatedInput{}, - Outputs: []*query.AnnotatedOutput{}, - StatusFail: false, - } - - for i := range txDesc.Tx.Inputs { - tx.Inputs = append(tx.Inputs, a.wallet.BuildAnnotatedInput(txDesc.Tx, uint32(i))) - } - for i := range txDesc.Tx.Outputs { - tx.Outputs = append(tx.Outputs, a.wallet.BuildAnnotatedOutput(txDesc.Tx, i)) - } - - return NewSuccessResponse(tx) -} +//func (a *API) getUnconfirmedTx(ctx context.Context, filter struct { +// TxID chainjson.HexBytes `json:"tx_id"` +//}) Response { +// var tmpTxID [32]byte +// copy(tmpTxID[:], filter.TxID[:]) +// +// txHash := bc.NewHash(tmpTxID) +// txPool := a.chain.GetTxPool() +// txDesc, err := txPool.GetTransaction(&txHash) +// if err != nil { +// return NewErrorResponse(err) +// } +// +// tx := &BlockTx{ +// ID: txDesc.Tx.ID, +// Version: txDesc.Tx.Version, +// Size: txDesc.Tx.SerializedSize, +// TimeRange: txDesc.Tx.TimeRange, +// Inputs: []*query.AnnotatedInput{}, +// Outputs: []*query.AnnotatedOutput{}, +// StatusFail: false, +// } +// +// for i := range txDesc.Tx.Inputs { +// tx.Inputs = append(tx.Inputs, a.wallet.BuildAnnotatedInput(txDesc.Tx, uint32(i))) +// } +// for i := range txDesc.Tx.Outputs { +// tx.Outputs = append(tx.Outputs, a.wallet.BuildAnnotatedOutput(txDesc.Tx, i)) +// } +// +// return NewSuccessResponse(tx) +//} type unconfirmedTxsResp struct { Total uint64 `json:"total"` @@ -186,20 +186,20 @@ type unconfirmedTxsResp struct { } // POST /list-unconfirmed-transactions -func (a *API) listUnconfirmedTxs(ctx context.Context) Response { - txIDs := []bc.Hash{} - - txPool := a.chain.GetTxPool() - txs := txPool.GetTransactions() - for _, txDesc := range txs { - txIDs = append(txIDs, bc.Hash(txDesc.Tx.ID)) - } - - return NewSuccessResponse(&unconfirmedTxsResp{ - Total: uint64(len(txIDs)), - TxIDs: txIDs, - }) -} +//func (a *API) listUnconfirmedTxs(ctx context.Context) Response { +// txIDs := []bc.Hash{} +// +// txPool := a.chain.GetTxPool() +// txs := txPool.GetTransactions() +// for _, txDesc := range txs { +// txIDs = append(txIDs, bc.Hash(txDesc.Tx.ID)) +// } +// +// return NewSuccessResponse(&unconfirmedTxsResp{ +// Total: uint64(len(txIDs)), +// TxIDs: txIDs, +// }) +//} // RawTx is the tx struct for getRawTransaction type RawTx struct { diff --git a/netsync/handle.go b/netsync/handle.go index 85f0a168..41a3bc7d 100644 --- a/netsync/handle.go +++ b/netsync/handle.go @@ -18,7 +18,6 @@ import ( "github.com/bytom/consensus" "github.com/bytom/p2p" "github.com/bytom/p2p/discover" - core "github.com/bytom/protocol" "github.com/bytom/protocol/bc" "github.com/bytom/protocol/bc/types" "github.com/bytom/version" @@ -41,7 +40,6 @@ type Chain interface { GetHeaderByHeight(uint64) (*types.BlockHeader, error) InMainChain(bc.Hash) bool ProcessBlock(*types.Block) (bool, error) - ValidateTx(*types.Tx) (bool, error) } //SyncManager Sync Manager is responsible for the business layer information synchronization @@ -49,13 +47,13 @@ type SyncManager struct { sw *p2p.Switch genesisHash bc.Hash - privKey crypto.PrivKeyEd25519 // local node's p2p key - chain Chain - txPool *core.TxPool - blockKeeper *blockKeeper - peers *peerSet + privKey crypto.PrivKeyEd25519 // local node's p2p key + chain Chain + blockKeeper *blockKeeper + peers *peerSet newTxCh chan *types.Tx + txNotifyCh chan *types.Tx newBlockCh chan *bc.Hash newAddrCh chan *account.CtrlProgram spvAddresses []*account.CtrlProgram @@ -66,7 +64,7 @@ type SyncManager struct { } //NewSyncManager create a sync manager -func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) { +func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) { genesisHeader, err := chain.GetHeaderByHeight(0) if err != nil { return nil, err @@ -75,19 +73,19 @@ func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlo sw := p2p.NewSwitch(config) peers := newPeerSet(sw) manager := &SyncManager{ - sw: sw, - genesisHash: genesisHeader.Hash(), - txPool: txPool, - chain: chain, - privKey: crypto.GenPrivKeyEd25519(), - blockKeeper: newBlockKeeper(chain, peers), - peers: peers, - newTxCh: make(chan *types.Tx, maxTxChanSize), - newBlockCh: newBlockCh, - txSyncCh: make(chan *txSyncMsg), - quitSync: make(chan struct{}), - config: config, - newAddrCh: wallet.AccountMgr.NewAddrCh, + sw: sw, + genesisHash: genesisHeader.Hash(), + chain: chain, + privKey: crypto.GenPrivKeyEd25519(), + blockKeeper: newBlockKeeper(chain, peers), + peers: peers, + newTxCh: make(chan *types.Tx, maxTxChanSize), + txNotifyCh: wallet.GetTxCh(), + newBlockCh: newBlockCh, + txSyncCh: make(chan *txSyncMsg), + quitSync: make(chan struct{}), + config: config, + newAddrCh: wallet.AccountMgr.NewAddrCh, } manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram() protocolReactor := NewProtocolReactor(manager, manager.peers) @@ -295,10 +293,7 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message") return } - - if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false { - sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") - } + sm.txNotifyCh <- tx } func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) { @@ -308,12 +303,6 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai } switch msg := msg.(type) { - //case *GetBlockMessage: - // sm.handleGetBlockMsg(peer, msg) - // - //case *BlockMessage: - // sm.handleBlockMsg(peer, msg) - case *StatusRequestMessage: sm.handleStatusRequestMsg(basePeer) @@ -323,21 +312,9 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai case *TransactionMessage: sm.handleTransactionMsg(peer, msg) - //case *MineBlockMessage: - // sm.handleMineBlockMsg(peer, msg) - - //case *GetHeadersMessage: - // sm.handleGetHeadersMsg(peer, msg) - case *HeadersMessage: sm.handleHeadersMsg(peer, msg) - //case *GetBlocksMessage: - // sm.handleGetBlocksMsg(peer, msg) - - //case *BlocksMessage: - // sm.handleBlocksMsg(peer, msg) - case *MerkleBlockMessage: sm.handleMerkelBlockMsg(peer, msg) diff --git a/netsync/tx_keeper.go b/netsync/tx_keeper.go index eefe2b84..9246bac6 100644 --- a/netsync/tx_keeper.go +++ b/netsync/tx_keeper.go @@ -19,23 +19,11 @@ type txSyncMsg struct { txs []*types.Tx } -func (sm *SyncManager) syncTransactions(peerID string) { - pending := sm.txPool.GetTransactions() - if len(pending) == 0 { - return - } - - txs := make([]*types.Tx, len(pending)) - for i, batch := range pending { - txs[i] = batch.Tx - } - sm.txSyncCh <- &txSyncMsg{peerID, txs} -} - func (sm *SyncManager) txBroadcastLoop() { for { select { case newTx := <-sm.newTxCh: + sm.txNotifyCh <- newTx if err := sm.peers.broadcastTx(newTx); err != nil { log.Errorf("Broadcast new tx error. %v", err) return diff --git a/node/node.go b/node/node.go index ce972d2d..a4545b0d 100644 --- a/node/node.go +++ b/node/node.go @@ -25,8 +25,6 @@ import ( "github.com/bytom/consensus" "github.com/bytom/database/leveldb" "github.com/bytom/env" - "github.com/bytom/mining/cpuminer" - "github.com/bytom/mining/miningpool" "github.com/bytom/mining/tensority" "github.com/bytom/netsync" "github.com/bytom/protocol" @@ -47,14 +45,11 @@ type Node struct { syncManager *netsync.SyncManager - //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore api *api.API chain *protocol.Chain txfeed *txfeed.Tracker - cpuMiner *cpuminer.CPUMiner - miningPool *miningpool.MiningPool miningEnable bool } @@ -72,8 +67,7 @@ func NewNode(config *cfg.Config) *Node { tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - txPool := protocol.NewTxPool(store) - chain, err := protocol.NewChain(store, txPool) + chain, err := protocol.NewChain(store) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) } @@ -96,26 +90,22 @@ func NewNode(config *cfg.Config) *Node { cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err)) } - if !config.Wallet.Disable { - walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) - accounts = account.NewManager(walletDB, chain) - assets = asset.NewRegistry(walletDB, chain) - wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain) - if err != nil { - log.WithField("error", err).Error("init NewWallet") - } + walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) + accounts = account.NewManager(walletDB, chain) + assets = asset.NewRegistry(walletDB, chain) + wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain) + if err != nil { + log.WithField("error", err).Error("init NewWallet") + } - // trigger rescan wallet - if config.Wallet.Rescan { - wallet.RescanBlocks() - } + // trigger rescan wallet + if config.Wallet.Rescan { + wallet.RescanBlocks() } - newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh, wallet) + newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - // get transaction from txPool and send it to syncManager and wallet - go newPoolTxListener(txPool, syncManager, wallet) + syncManager, _ := netsync.NewSyncManager(config, chain, newBlockCh, wallet) // run the profile server profileHost := config.ProfListenAddress @@ -137,9 +127,6 @@ func NewNode(config *cfg.Config) *Node { miningEnable: config.Mining, } - node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh) - node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh) - node.BaseService = *cmn.NewBaseService(nil, "Node", node) if config.Simd.Enable { @@ -149,29 +136,6 @@ func NewNode(config *cfg.Config) *Node { return node } -// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet -func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) { - txMsgCh := txPool.GetMsgCh() - syncManagerTxCh := syncManager.GetNewTxCh() - - for { - msg := <-txMsgCh - switch msg.MsgType { - case protocol.MsgNewTx: - syncManagerTxCh <- msg.Tx - if wallet != nil { - wallet.AddUnconfirmedTx(msg.TxDesc) - } - case protocol.MsgRemoveTx: - if wallet != nil { - wallet.RemoveUnconfirmedTx(msg.TxDesc) - } - default: - log.Warn("got unknow message type from the txPool channel") - } - } -} - // Lock data directory after daemonization func lockDataDirectory(config *cfg.Config) error { _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK")) @@ -214,7 +178,7 @@ func launchWebBrowser(port string) { } func (n *Node) initAndstartApiServer() { - n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens) + n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.chain, n.config, n.accessTokens) listenAddr := env.String("LISTEN", n.config.ApiAddress) env.Parse() @@ -222,14 +186,6 @@ func (n *Node) initAndstartApiServer() { } func (n *Node) OnStart() error { - if n.miningEnable { - if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil { - n.miningEnable = false - log.Error(err) - } else { - n.cpuMiner.Start() - } - } if !n.config.VaultMode { n.syncManager.Start() } @@ -246,9 +202,6 @@ func (n *Node) OnStart() error { func (n *Node) OnStop() { n.BaseService.OnStop() - if n.miningEnable { - n.cpuMiner.Stop() - } if !n.config.VaultMode { n.syncManager.Stop() } @@ -264,7 +217,3 @@ func (n *Node) RunForever() { func (n *Node) SyncManager() *netsync.SyncManager { return n.syncManager } - -func (n *Node) MiningPool() *miningpool.MiningPool { - return n.miningPool -} diff --git a/protocol/protocol.go b/protocol/protocol.go index 015cb94f..d2cf7b5d 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -22,7 +22,6 @@ var ErrTheDistantFuture = errors.New("block height too far in future") type Chain struct { index *state.BlockIndex orphanManage *OrphanManage - txPool *TxPool store Store processBlockCh chan *processBlockMsg @@ -31,10 +30,9 @@ type Chain struct { } // NewChain returns a new Chain using store as the underlying storage. -func NewChain(store Store, txPool *TxPool) (*Chain, error) { +func NewChain(store Store) (*Chain, error) { c := &Chain{ orphanManage: NewOrphanManage(), - txPool: txPool, store: store, processBlockCh: make(chan *processBlockMsg, maxProcessBlockChSize), } @@ -158,7 +156,3 @@ func (c *Chain) BlockWaiter(height uint64) <-chan struct{} { return ch } -// GetTxPool return chain txpool. -func (c *Chain) GetTxPool() *TxPool { - return c.txPool -} diff --git a/protocol/tx.go b/protocol/tx.go index 8198e181..ef07276c 100644 --- a/protocol/tx.go +++ b/protocol/tx.go @@ -3,9 +3,6 @@ package protocol import ( "github.com/bytom/errors" "github.com/bytom/protocol/bc" - "github.com/bytom/protocol/bc/types" - "github.com/bytom/protocol/state" - "github.com/bytom/protocol/validation" ) // ErrBadTx is returned for transactions failing validation @@ -15,27 +12,3 @@ var ErrBadTx = errors.New("invalid transaction") func (c *Chain) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, error) { return c.store.GetTransactionStatus(hash) } - -// GetTransactionsUtxo return all the utxos that related to the txs' inputs -func (c *Chain) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error { - return c.store.GetTransactionsUtxo(view, txs) -} - -// ValidateTx validates the given transaction. A cache holds -// per-transaction validation results and is consulted before -// performing full validation. -func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) { - if ok := c.txPool.HaveTransaction(&tx.ID); ok { - return false, c.txPool.GetErrCache(&tx.ID) - } - - bh := c.BestBlockHeader() - block := types.MapBlock(&types.Block{BlockHeader: *bh}) - gasStatus, err := validation.ValidateTx(tx.Tx, block) - if gasStatus.GasValid == false { - c.txPool.AddErrCache(&tx.ID, err) - return false, err - } - - return c.txPool.ProcessTransaction(tx, err != nil, block.BlockHeader.Height, gasStatus.BTMValue) -} diff --git a/protocol/txpool.go b/protocol/txpool.go deleted file mode 100644 index 6924e5ba..00000000 --- a/protocol/txpool.go +++ /dev/null @@ -1,336 +0,0 @@ -package protocol - -import ( - "errors" - "sync" - "sync/atomic" - "time" - - "github.com/golang/groupcache/lru" - log "github.com/sirupsen/logrus" - - "github.com/bytom/consensus" - "github.com/bytom/protocol/bc" - "github.com/bytom/protocol/bc/types" - "github.com/bytom/protocol/state" -) - -// msg type -const ( - MsgNewTx = iota - MsgRemoveTx -) - -var ( - maxCachedErrTxs = 1000 - maxMsgChSize = 1000 - maxNewTxNum = 10000 - maxOrphanNum = 2000 - - orphanTTL = 10 * time.Minute - orphanExpireScanInterval = 3 * time.Minute - - // ErrTransactionNotExist is the pre-defined error message - ErrTransactionNotExist = errors.New("transaction are not existed in the mempool") - // ErrPoolIsFull indicates the pool is full - ErrPoolIsFull = errors.New("transaction pool reach the max number") -) - -// TxDesc store tx and related info for mining strategy -type TxDesc struct { - Tx *types.Tx - Added time.Time - StatusFail bool - Height uint64 - Weight uint64 - Fee uint64 -} - -// TxPoolMsg is use for notify pool changes -type TxPoolMsg struct { - *TxDesc - MsgType int -} - -type orphanTx struct { - *TxDesc - expiration time.Time -} - -// TxPool is use for store the unconfirmed transaction -type TxPool struct { - lastUpdated int64 - mtx sync.RWMutex - store Store - pool map[bc.Hash]*TxDesc - utxo map[bc.Hash]*types.Tx - orphans map[bc.Hash]*orphanTx - orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx - errCache *lru.Cache - msgCh chan *TxPoolMsg -} - -// NewTxPool init a new TxPool -func NewTxPool(store Store) *TxPool { - tp := &TxPool{ - lastUpdated: time.Now().Unix(), - store: store, - pool: make(map[bc.Hash]*TxDesc), - utxo: make(map[bc.Hash]*types.Tx), - orphans: make(map[bc.Hash]*orphanTx), - orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx), - errCache: lru.New(maxCachedErrTxs), - msgCh: make(chan *TxPoolMsg, maxMsgChSize), - } - go tp.orphanExpireWorker() - return tp -} - -// AddErrCache add a failed transaction record to lru cache -func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) { - tp.mtx.Lock() - defer tp.mtx.Unlock() - - tp.errCache.Add(txHash, err) -} - -// ExpireOrphan expire all the orphans that before the input time range -func (tp *TxPool) ExpireOrphan(now time.Time) { - tp.mtx.Lock() - defer tp.mtx.Unlock() - - for hash, orphan := range tp.orphans { - if orphan.expiration.Before(now) { - tp.removeOrphan(&hash) - } - } -} - -// GetErrCache return the error of the transaction -func (tp *TxPool) GetErrCache(txHash *bc.Hash) error { - tp.mtx.Lock() - defer tp.mtx.Unlock() - - v, ok := tp.errCache.Get(txHash) - if !ok { - return nil - } - return v.(error) -} - -// GetMsgCh return a unconfirmed transaction feed channel -func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg { - return tp.msgCh -} - -// RemoveTransaction remove a transaction from the pool -func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) { - tp.mtx.Lock() - defer tp.mtx.Unlock() - - txD, ok := tp.pool[*txHash] - if !ok { - return - } - - for _, output := range txD.Tx.ResultIds { - delete(tp.utxo, *output) - } - delete(tp.pool, *txHash) - - atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx} - log.WithField("tx_id", txHash).Debug("remove tx from mempool") -} - -// GetTransaction return the TxDesc by hash -func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) { - tp.mtx.RLock() - defer tp.mtx.RUnlock() - - if txD, ok := tp.pool[*txHash]; ok { - return txD, nil - } - return nil, ErrTransactionNotExist -} - -// GetTransactions return all the transactions in the pool -func (tp *TxPool) GetTransactions() []*TxDesc { - tp.mtx.RLock() - defer tp.mtx.RUnlock() - - txDs := make([]*TxDesc, len(tp.pool)) - i := 0 - for _, desc := range tp.pool { - txDs[i] = desc - i++ - } - return txDs -} - -// IsTransactionInPool check wheather a transaction in pool or not -func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool { - tp.mtx.RLock() - defer tp.mtx.RUnlock() - - _, ok := tp.pool[*txHash] - return ok -} - -// IsTransactionInErrCache check wheather a transaction in errCache or not -func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool { - tp.mtx.RLock() - defer tp.mtx.RUnlock() - - _, ok := tp.errCache.Get(txHash) - return ok -} - -// HaveTransaction IsTransactionInErrCache check is transaction in errCache or pool -func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool { - return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash) -} - -// ProcessTransaction is the main entry for txpool handle new tx -func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { - tp.mtx.Lock() - defer tp.mtx.Unlock() - - txD := &TxDesc{ - Tx: tx, - StatusFail: statusFail, - Weight: tx.SerializedSize, - Height: height, - Fee: fee, - } - requireParents, err := tp.checkOrphanUtxos(tx) - if err != nil { - return false, err - } - - if len(requireParents) > 0 { - return true, tp.addOrphan(txD, requireParents) - } - - if err := tp.addTransaction(txD); err != nil { - return false, err - } - - tp.processOrphans(txD) - return false, nil -} - -func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error { - if len(tp.orphans) >= maxOrphanNum { - return ErrPoolIsFull - } - - orphan := &orphanTx{txD, time.Now().Add(orphanTTL)} - tp.orphans[txD.Tx.ID] = orphan - for _, hash := range requireParents { - if _, ok := tp.orphansByPrev[*hash]; !ok { - tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx) - } - tp.orphansByPrev[*hash][txD.Tx.ID] = orphan - } - return nil -} - -func (tp *TxPool) addTransaction(txD *TxDesc) error { - if len(tp.pool) >= maxNewTxNum { - return ErrPoolIsFull - } - - tx := txD.Tx - txD.Added = time.Now() - tp.pool[tx.ID] = txD - for _, id := range tx.ResultIds { - output, err := tx.Output(*id) - if err != nil { - // error due to it's a retirement, utxo doesn't care this output type so skip it - continue - } - if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID { - tp.utxo[*id] = tx - } - } - - atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx} - log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool") - return nil -} - -func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) { - view := state.NewUtxoViewpoint() - if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil { - return nil, err - } - - hashes := []*bc.Hash{} - for _, hash := range tx.SpentOutputIDs { - if !view.CanSpend(&hash) && tp.utxo[hash] == nil { - hashes = append(hashes, &hash) - } - } - return hashes, nil -} - -func (tp *TxPool) orphanExpireWorker() { - ticker := time.NewTicker(orphanExpireScanInterval) - for now := range ticker.C { - tp.ExpireOrphan(now) - } -} - -func (tp *TxPool) processOrphans(txD *TxDesc) { - processOrphans := []*orphanTx{} - addRely := func(tx *types.Tx) { - for _, outHash := range tx.ResultIds { - orphans, ok := tp.orphansByPrev[*outHash] - if !ok { - continue - } - - for _, orphan := range orphans { - processOrphans = append(processOrphans, orphan) - } - delete(tp.orphansByPrev, *outHash) - } - } - - addRely(txD.Tx) - for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] { - processOrphan := processOrphans[0] - requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx) - if err != nil { - log.WithField("err", err).Error("processOrphans got unexpect error") - continue - } - - if len(requireParents) == 0 { - addRely(processOrphan.Tx) - tp.removeOrphan(&processOrphan.Tx.ID) - tp.addTransaction(processOrphan.TxDesc) - } - } -} - -func (tp *TxPool) removeOrphan(hash *bc.Hash) { - orphan, ok := tp.orphans[*hash] - if !ok { - return - } - - for _, spend := range orphan.Tx.SpentOutputIDs { - orphans, ok := tp.orphansByPrev[spend] - if !ok { - continue - } - - if delete(orphans, *hash); len(orphans) == 0 { - delete(tp.orphansByPrev, spend) - } - } - delete(tp.orphans, *hash) -} diff --git a/wallet/unconfirmed.go b/wallet/unconfirmed.go index 771d7dd6..68fe5cb5 100644 --- a/wallet/unconfirmed.go +++ b/wallet/unconfirmed.go @@ -11,7 +11,6 @@ import ( "github.com/bytom/account" "github.com/bytom/blockchain/query" "github.com/bytom/crypto/sha3pool" - "github.com/bytom/protocol" "github.com/bytom/protocol/bc/types" ) @@ -34,14 +33,10 @@ func (a SortByTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a SortByTimestamp) Less(i, j int) bool { return a[i].Timestamp > a[j].Timestamp } // AddUnconfirmedTx handle wallet status update when tx add into txpool -func (w *Wallet) AddUnconfirmedTx(txD *protocol.TxDesc) { - if err := w.saveUnconfirmedTx(txD.Tx); err != nil { +func (w *Wallet) AddUnconfirmedTx(tx *types.Tx) { + if err := w.saveUnconfirmedTx(tx); err != nil { log.WithField("err", err).Error("wallet fail on saveUnconfirmedTx") } - - utxos := txOutToUtxos(txD.Tx, txD.StatusFail, 0) - utxos = w.filterAccountUtxo(utxos) - w.AccountMgr.AddUnconfirmedUtxo(utxos) } // GetUnconfirmedTxs get account unconfirmed transactions, filter transactions by accountID when accountID is not empty @@ -83,8 +78,8 @@ func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) } // RemoveUnconfirmedTx handle wallet status update when tx removed from txpool -func (w *Wallet) RemoveUnconfirmedTx(txD *protocol.TxDesc) { - w.AccountMgr.RemoveUnconfirmedUtxo(txD.Tx.ResultIds) +func (w *Wallet) RemoveUnconfirmedTx(tx *types.Tx) { + w.AccountMgr.RemoveUnconfirmedUtxo(tx.ResultIds) } func (w *Wallet) buildAnnotatedUnconfirmedTx(tx *types.Tx) *query.AnnotatedTx { @@ -176,3 +171,11 @@ func (w *Wallet) delUnconfirmedTx() { } } } + +// newTxListener listener transaction from txPool, and send it to syncManager and wallet +func (w *Wallet) newTxListener() { + for { + tx := <-w.newTxCh + w.AddUnconfirmedTx(tx) + } +} diff --git a/wallet/wallet.go b/wallet/wallet.go index c00651a3..d64a075f 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -18,6 +18,8 @@ import ( const ( //SINGLE single sign SINGLE = 1 + //channel size for notifying tx msg + NewTxChSize = 1024 ) var walletKey = []byte("walletInfo") @@ -40,6 +42,7 @@ type Wallet struct { Hsm *pseudohsm.HSM chain *protocol.Chain rescanCh chan struct{} + newTxCh chan *types.Tx } //NewWallet return a new wallet instance @@ -51,6 +54,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, chain: chain, Hsm: hsm, rescanCh: make(chan struct{}, 1), + newTxCh: make(chan *types.Tx, NewTxChSize), } if err := w.loadWalletInfo(); err != nil { @@ -58,6 +62,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, } go w.walletUpdater() + go w.newTxListener() go w.delUnconfirmedTx() return w, nil } @@ -212,3 +217,7 @@ func (w *Wallet) GetWalletStatusInfo() StatusInfo { return w.status } + +func (w *Wallet) GetTxCh() chan *types.Tx { + return w.newTxCh +} -- 2.11.0