From 2d27cad41ef79da3a46142401b5efcf92133ea1a Mon Sep 17 00:00:00 2001 From: muscle_boy Date: Tue, 28 May 2019 13:52:35 +0800 Subject: [PATCH] modify miner block to propose block (#92) * modify miner block to propose block * modify miner block to propose block --- api/api.go | 8 +- api/miner.go | 6 +- api/nodeinfo.go | 6 +- event/event.go | 2 +- mining/cpuminer/cpuminer.go | 269 --------------------- node/node.go | 6 +- proposal/blockproposer/blockproposer.go | 258 ++++++++++++++++++++ mining/mining.go => proposal/proposal.go | 4 +- mining/mining_test.go => proposal/proposal_test.go | 2 +- {mining => proposal}/sort.go | 2 +- protocol/bbft.go | 5 +- protocol/bbft_test.go | 11 +- protocol/block.go | 2 +- protocol/consensus_node_manager.go | 45 ++-- protocol/protocol.go | 5 + test/bench_blockchain_test.go | 4 +- test/performance/mining_test.go | 4 +- 17 files changed, 316 insertions(+), 323 deletions(-) delete mode 100644 mining/cpuminer/cpuminer.go create mode 100644 proposal/blockproposer/blockproposer.go rename mining/mining.go => proposal/proposal.go (98%) rename mining/mining_test.go => proposal/proposal_test.go (98%) rename {mining => proposal}/sort.go (93%) diff --git a/api/api.go b/api/api.go index 15741513..30e71522 100644 --- a/api/api.go +++ b/api/api.go @@ -18,7 +18,6 @@ import ( "github.com/vapor/dashboard/equity" "github.com/vapor/errors" "github.com/vapor/event" - "github.com/vapor/mining/cpuminer" "github.com/vapor/net/http/authn" "github.com/vapor/net/http/gzip" "github.com/vapor/net/http/httpjson" @@ -28,6 +27,7 @@ import ( "github.com/vapor/p2p" "github.com/vapor/protocol" "github.com/vapor/wallet" + "github.com/vapor/proposal/blockproposer" ) var ( @@ -113,7 +113,7 @@ type API struct { server *http.Server handler http.Handler txFeedTracker *txfeed.Tracker - cpuMiner *cpuminer.CPUMiner + blockProposer *blockproposer.BlockProposer notificationMgr *websocket.WSNotificationManager eventDispatcher *event.Dispatcher } @@ -180,14 +180,14 @@ type NetSync interface { } // NewAPI create and initialize the API -func NewAPI(sync NetSync, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, dispatcher *event.Dispatcher, notificationMgr *websocket.WSNotificationManager) *API { +func NewAPI(sync NetSync, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, blockProposer *blockproposer.BlockProposer, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, dispatcher *event.Dispatcher, notificationMgr *websocket.WSNotificationManager) *API { api := &API{ sync: sync, wallet: wallet, chain: chain, accessTokens: token, txFeedTracker: txfeeds, - cpuMiner: cpuMiner, + blockProposer: blockProposer, eventDispatcher: dispatcher, notificationMgr: notificationMgr, } diff --git a/api/miner.go b/api/miner.go index fcea43f3..a7029e97 100644 --- a/api/miner.go +++ b/api/miner.go @@ -54,7 +54,7 @@ func (a *API) submitBlock(ctx context.Context, req *SubmitBlockReq) Response { return NewErrorResponse(errors.New("block submitted is orphan")) } - if err = a.eventDispatcher.Post(event.NewMinedBlockEvent{Block: *req.Block}); err != nil { + if err = a.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *req.Block}); err != nil { return NewErrorResponse(err) } @@ -74,7 +74,7 @@ func (a *API) setMining(in struct { } func (a *API) startMining() Response { - a.cpuMiner.Start() + a.blockProposer.Start() if !a.IsMining() { return NewErrorResponse(errors.New("Failed to start mining")) } @@ -82,7 +82,7 @@ func (a *API) startMining() Response { } func (a *API) stopMining() Response { - a.cpuMiner.Stop() + a.blockProposer.Stop() if a.IsMining() { return NewErrorResponse(errors.New("Failed to stop mining")) } diff --git a/api/nodeinfo.go b/api/nodeinfo.go index 7ddde83e..b00c8fd4 100644 --- a/api/nodeinfo.go +++ b/api/nodeinfo.go @@ -33,7 +33,7 @@ func (a *API) GetNodeInfo() *NetInfo { info := &NetInfo{ Listening: a.sync.IsListening(), Syncing: !a.sync.IsCaughtUp(), - Mining: a.cpuMiner.IsMining(), + Mining: a.blockProposer.IsProposing(), PeerCount: a.sync.PeerCount(), CurrentBlock: a.chain.BestBlockHeight(), NetWorkID: a.sync.GetNetwork(), @@ -98,9 +98,9 @@ func (a *API) isMining() Response { return NewSuccessResponse(IsMining) } -// IsMining return mining status +// IsProposing return mining status func (a *API) IsMining() bool { - return a.cpuMiner.IsMining() + return a.blockProposer.IsProposing() } // return the peers of current node diff --git a/event/event.go b/event/event.go index 77bbb9f6..4e76fcb8 100644 --- a/event/event.go +++ b/event/event.go @@ -25,7 +25,7 @@ var ( ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type") ) -type NewMinedBlockEvent struct{ Block types.Block } +type NewProposedBlockEvent struct{ Block types.Block } type BlockSignatureEvent struct { BlockHash bc.Hash diff --git a/mining/cpuminer/cpuminer.go b/mining/cpuminer/cpuminer.go deleted file mode 100644 index 0a823790..00000000 --- a/mining/cpuminer/cpuminer.go +++ /dev/null @@ -1,269 +0,0 @@ -package cpuminer - -import ( - "sync" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/vapor/account" - "github.com/vapor/event" - "github.com/vapor/mining" - "github.com/vapor/protocol" - "github.com/vapor/protocol/bc/types" -) - -const ( - maxNonce = ^uint64(0) // 2^64 - 1 - defaultNumWorkers = 1 - hashUpdateSecs = 1 - logModule = "cpuminer" -) - -// CPUMiner provides facilities for solving blocks (mining) using the CPU in -// a concurrency-safe manner. -type CPUMiner struct { - sync.Mutex - chain *protocol.Chain - accountManager *account.Manager - txPool *protocol.TxPool - numWorkers uint64 - started bool - discreteMining bool - workerWg sync.WaitGroup - updateNumWorkers chan struct{} - quit chan struct{} - eventDispatcher *event.Dispatcher -} - -// solveBlock attempts to find some combination of a nonce, extra nonce, and -// current timestamp which makes the passed block hash to a value less than the -// target difficulty. -func (m *CPUMiner) solveBlock(block *types.Block, ticker *time.Ticker, quit chan struct{}) bool { - header := &block.BlockHeader - - for i := uint64(0); i <= maxNonce; i++ { - select { - case <-quit: - return false - case <-ticker.C: - if m.chain.BestBlockHeight() >= header.Height { - return false - } - default: - } - - //Mining logic here - } - return false -} - -// generateBlocks is a worker that is controlled by the miningWorkerController. -// It is self contained in that it creates block templates and attempts to solve -// them while detecting when it is performing stale work and reacting -// accordingly by generating a new block template. When a block is solved, it -// is submitted. -// -// It must be run as a goroutine. -func (m *CPUMiner) generateBlocks(quit chan struct{}) { - ticker := time.NewTicker(time.Second * hashUpdateSecs) - defer ticker.Stop() - -out: - for { - select { - case <-quit: - break out - default: - } - - block, err := mining.NewBlockTemplate(m.chain, m.txPool, m.accountManager) - if err != nil { - log.Errorf("Mining: failed on create NewBlockTemplate: %v", err) - continue - } - - if m.solveBlock(block, ticker, quit) { - if isOrphan, err := m.chain.ProcessBlock(block); err == nil { - log.WithFields(log.Fields{ - "module": logModule, - "height": block.BlockHeader.Height, - "isOrphan": isOrphan, - "tx": len(block.Transactions), - }).Info("Miner processed block") - - // Broadcast the block and announce chain insertion event - if err = m.eventDispatcher.Post(event.NewMinedBlockEvent{Block: *block}); err != nil { - log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Miner fail on post block") - } - } else { - log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Miner fail on ProcessBlock") - } - } - } - - m.workerWg.Done() -} - -// miningWorkerController launches the worker goroutines that are used to -// generate block templates and solve them. It also provides the ability to -// dynamically adjust the number of running worker goroutines. -// -// It must be run as a goroutine. -func (m *CPUMiner) miningWorkerController() { - // launchWorkers groups common code to launch a specified number of - // workers for generating blocks. - var runningWorkers []chan struct{} - launchWorkers := func(numWorkers uint64) { - for i := uint64(0); i < numWorkers; i++ { - quit := make(chan struct{}) - runningWorkers = append(runningWorkers, quit) - - m.workerWg.Add(1) - go m.generateBlocks(quit) - } - } - - // Launch the current number of workers by default. - runningWorkers = make([]chan struct{}, 0, m.numWorkers) - launchWorkers(m.numWorkers) - -out: - for { - select { - // Update the number of running workers. - case <-m.updateNumWorkers: - // No change. - numRunning := uint64(len(runningWorkers)) - if m.numWorkers == numRunning { - continue - } - - // Add new workers. - if m.numWorkers > numRunning { - launchWorkers(m.numWorkers - numRunning) - continue - } - - // Signal the most recently created goroutines to exit. - for i := numRunning - 1; i >= m.numWorkers; i-- { - close(runningWorkers[i]) - runningWorkers[i] = nil - runningWorkers = runningWorkers[:i] - } - - case <-m.quit: - for _, quit := range runningWorkers { - close(quit) - } - break out - } - } - - m.workerWg.Wait() -} - -// Start begins the CPU mining process as well as the speed monitor used to -// track hashing metrics. Calling this function when the CPU miner has -// already been started will have no effect. -// -// This function is safe for concurrent access. -func (m *CPUMiner) Start() { - m.Lock() - defer m.Unlock() - - // Nothing to do if the miner is already running - if m.started { - return - } - - m.quit = make(chan struct{}) - go m.miningWorkerController() - - m.started = true - log.Infof("CPU miner started") -} - -// Stop gracefully stops the mining process by signalling all workers, and the -// speed monitor to quit. Calling this function when the CPU miner has not -// already been started will have no effect. -// -// This function is safe for concurrent access. -func (m *CPUMiner) Stop() { - m.Lock() - defer m.Unlock() - - // Nothing to do if the miner is not currently running - if !m.started { - return - } - - close(m.quit) - m.started = false - log.Info("CPU miner stopped") -} - -// IsMining returns whether or not the CPU miner has been started and is -// therefore currenting mining. -// -// This function is safe for concurrent access. -func (m *CPUMiner) IsMining() bool { - m.Lock() - defer m.Unlock() - - return m.started -} - -// SetNumWorkers sets the number of workers to create which solve blocks. Any -// negative values will cause a default number of workers to be used which is -// based on the number of processor cores in the system. A value of 0 will -// cause all CPU mining to be stopped. -// -// This function is safe for concurrent access. -func (m *CPUMiner) SetNumWorkers(numWorkers int32) { - if numWorkers == 0 { - m.Stop() - } - - // Don't lock until after the first check since Stop does its own - // locking. - m.Lock() - defer m.Unlock() - - // Use default if provided value is negative. - if numWorkers < 0 { - m.numWorkers = defaultNumWorkers - } else { - m.numWorkers = uint64(numWorkers) - } - - // When the miner is already running, notify the controller about the - // the change. - if m.started { - m.updateNumWorkers <- struct{}{} - } -} - -// NumWorkers returns the number of workers which are running to solve blocks. -// -// This function is safe for concurrent access. -func (m *CPUMiner) NumWorkers() int32 { - m.Lock() - defer m.Unlock() - - return int32(m.numWorkers) -} - -// NewCPUMiner returns a new instance of a CPU miner for the provided configuration. -// Use Start to begin the mining process. See the documentation for CPUMiner -// type for more details. -func NewCPUMiner(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *CPUMiner { - return &CPUMiner{ - chain: c, - accountManager: accountManager, - txPool: txPool, - numWorkers: defaultNumWorkers, - updateNumWorkers: make(chan struct{}), - eventDispatcher: dispatcher, - } -} diff --git a/node/node.go b/node/node.go index 628fab24..4d32a592 100644 --- a/node/node.go +++ b/node/node.go @@ -26,7 +26,7 @@ import ( dbm "github.com/vapor/database/leveldb" "github.com/vapor/env" "github.com/vapor/event" - "github.com/vapor/mining/cpuminer" + "github.com/vapor/proposal/blockproposer" "github.com/vapor/net/websocket" "github.com/vapor/netsync" "github.com/vapor/protocol" @@ -52,7 +52,7 @@ type Node struct { api *api.API chain *protocol.Chain txfeed *txfeed.Tracker - cpuMiner *cpuminer.CPUMiner + cpuMiner *blockproposer.BlockProposer miningEnable bool } @@ -151,7 +151,7 @@ func NewNode(config *cfg.Config) *Node { notificationMgr: notificationMgr, } - node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, dispatcher) + node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher) node.BaseService = *cmn.NewBaseService(nil, "Node", node) return node } diff --git a/proposal/blockproposer/blockproposer.go b/proposal/blockproposer/blockproposer.go new file mode 100644 index 00000000..e6ec2b9a --- /dev/null +++ b/proposal/blockproposer/blockproposer.go @@ -0,0 +1,258 @@ +package blockproposer + +import ( + "sync" + "time" + "encoding/hex" + + log "github.com/sirupsen/logrus" + + "github.com/vapor/account" + "github.com/vapor/event" + "github.com/vapor/proposal" + "github.com/vapor/protocol" +) + +const ( + defaultNumWorkers = 1 + logModule = "blockproposer" +) + +// BlockProposer propose several block in specified time range +type BlockProposer struct { + sync.Mutex + chain *protocol.Chain + accountManager *account.Manager + txPool *protocol.TxPool + numWorkers uint64 + started bool + discreteMining bool + workerWg sync.WaitGroup + updateNumWorkers chan struct{} + quit chan struct{} + eventDispatcher *event.Dispatcher +} + +// generateBlocks is a worker that is controlled by the proposeWorkerController. +// It is self contained in that it creates block templates and attempts to solve +// them while detecting when it is performing stale work and reacting +// accordingly by generating a new block template. When a block is verified, it +// is submitted. +// +// It must be run as a goroutine. +func (b *BlockProposer) generateBlocks(quit chan struct{}) { + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() +out: + for { + select { + case <-quit: + break out + case <-ticker.C: + } + + bestBlockHeader := b.chain.BestBlockHeader() + var pubKey []byte + timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey, bestBlockHeader.Timestamp, bestBlockHeader.Height) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey)}).Debug("fail on get next leader time range") + continue + } + + now := uint64(time.Now().UnixNano() / 1e6) + if timeStart < now { + timeStart = now + } + + time.Sleep(time.Millisecond * time.Duration(timeStart - now)) + + count := 0 + for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) { + block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager) + if err != nil { + log.Errorf("failed on create NewBlockTemplate: %v", err) + } else { + if isOrphan, err := b.chain.ProcessBlock(block); err == nil { + log.WithFields(log.Fields{ + "module": logModule, + "height": block.BlockHeader.Height, + "isOrphan": isOrphan, + "tx": len(block.Transactions), + }).Info("Proposer processed block") + + // Broadcast the block and announce chain insertion event + if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil { + log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block") + } + count++ + } else { + log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock") + } + } + } + } + + b.workerWg.Done() +} + +// proposeWorkerController launches the worker goroutines that are used to +// generate block templates. It also provides the ability to +// dynamically adjust the number of running worker goroutines. +// +// It must be run as a goroutine. +func (b *BlockProposer) proposeWorkerController() { + // launchWorkers groups common code to launch a specified number of + // workers for generating blocks. + var runningWorkers []chan struct{} + launchWorkers := func(numWorkers uint64) { + for i := uint64(0); i < numWorkers; i++ { + quit := make(chan struct{}) + runningWorkers = append(runningWorkers, quit) + + b.workerWg.Add(1) + go b.generateBlocks(quit) + } + } + + // Launch the current number of workers by default. + runningWorkers = make([]chan struct{}, 0, b.numWorkers) + launchWorkers(b.numWorkers) + +out: + for { + select { + // Update the number of running workers. + case <-b.updateNumWorkers: + // No change. + numRunning := uint64(len(runningWorkers)) + if b.numWorkers == numRunning { + continue + } + + // Add new workers. + if b.numWorkers > numRunning { + launchWorkers(b.numWorkers - numRunning) + continue + } + + // Signal the most recently created goroutines to exit. + for i := numRunning - 1; i >= b.numWorkers; i-- { + close(runningWorkers[i]) + runningWorkers[i] = nil + runningWorkers = runningWorkers[:i] + } + + case <-b.quit: + for _, quit := range runningWorkers { + close(quit) + } + break out + } + } + + b.workerWg.Wait() +} + +// Start begins the block propose process as well as the speed monitor used to +// track hashing metrics. Calling this function when the block proposer has +// already been started will have no effect. +// +// This function is safe for concurrent access. +func (b *BlockProposer) Start() { + b.Lock() + defer b.Unlock() + + // Nothing to do if the miner is already running + if b.started { + return + } + + b.quit = make(chan struct{}) + go b.proposeWorkerController() + + b.started = true + log.Infof("block proposer started") +} + +// Stop gracefully stops the proposal process by signalling all workers, and the +// speed monitor to quit. Calling this function when the block proposer has not +// already been started will have no effect. +// +// This function is safe for concurrent access. +func (b *BlockProposer) Stop() { + b.Lock() + defer b.Unlock() + + // Nothing to do if the miner is not currently running + if !b.started { + return + } + + close(b.quit) + b.started = false + log.Info("block proposer stopped") +} + +// IsProposing returns whether the block proposer has been started. +// +// This function is safe for concurrent access. +func (b *BlockProposer) IsProposing() bool { + b.Lock() + defer b.Unlock() + + return b.started +} + +// SetNumWorkers sets the number of workers to create which solve blocks. Any +// negative values will cause a default number of workers to be used which is +// based on the number of processor cores in the system. A value of 0 will +// cause all block proposer to be stopped. +// +// This function is safe for concurrent access. +func (b *BlockProposer) SetNumWorkers(numWorkers int32) { + if numWorkers == 0 { + b.Stop() + } + + // Don't lock until after the first check since Stop does its own + // locking. + b.Lock() + defer b.Unlock() + + // Use default if provided value is negative. + if numWorkers < 0 { + b.numWorkers = defaultNumWorkers + } else { + b.numWorkers = uint64(numWorkers) + } + + // When the proposer is already running, notify the controller about the + // the change. + if b.started { + b.updateNumWorkers <- struct{}{} + } +} + +// NumWorkers returns the number of workers which are running to solve blocks. +// +// This function is safe for concurrent access. +func (b *BlockProposer) NumWorkers() int32 { + b.Lock() + defer b.Unlock() + + return int32(b.numWorkers) +} + +// NewBlockProposer returns a new instance of a block proposer for the provided configuration. +// Use Start to begin the proposal process. See the documentation for BlockProposer +// type for more details. +func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer { + return &BlockProposer{ + chain: c, + accountManager: accountManager, + txPool: txPool, + numWorkers: defaultNumWorkers, + updateNumWorkers: make(chan struct{}), + eventDispatcher: dispatcher, + } +} diff --git a/mining/mining.go b/proposal/proposal.go similarity index 98% rename from mining/mining.go rename to proposal/proposal.go index 82149b16..364d9ffc 100644 --- a/mining/mining.go +++ b/proposal/proposal.go @@ -1,4 +1,4 @@ -package mining +package proposal import ( "sort" @@ -152,6 +152,8 @@ func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager } b.BlockHeader.BlockCommitment.TransactionStatusHash, err = types.TxStatusMerkleRoot(txStatus.VerifyStatus) + + _, err = c.GetBBFT().SignBlock(b) return b, err } diff --git a/mining/mining_test.go b/proposal/proposal_test.go similarity index 98% rename from mining/mining_test.go rename to proposal/proposal_test.go index dcfdfc98..fde4624e 100644 --- a/mining/mining_test.go +++ b/proposal/proposal_test.go @@ -1,4 +1,4 @@ -package mining +package proposal import "testing" diff --git a/mining/sort.go b/proposal/sort.go similarity index 93% rename from mining/sort.go rename to proposal/sort.go index 77954077..45e80e23 100644 --- a/mining/sort.go +++ b/proposal/sort.go @@ -1,4 +1,4 @@ -package mining +package proposal import "github.com/vapor/protocol" diff --git a/protocol/bbft.go b/protocol/bbft.go index aed4438f..ac6b4345 100644 --- a/protocol/bbft.go +++ b/protocol/bbft.go @@ -3,7 +3,6 @@ package protocol import ( "encoding/hex" "fmt" - "time" "github.com/golang/groupcache/lru" log "github.com/sirupsen/logrus" @@ -63,8 +62,8 @@ func (b *bbft) isIrreversible(block *types.Block) bool { } // NextLeaderTime returns the start time of the specified public key as the next leader node -func (b *bbft) NextLeaderTime(pubkey []byte, bestBlockTimestamp, bestBlockHeight uint64) (*time.Time, error) { - return b.consensusNodeManager.nextLeaderTime(pubkey, bestBlockTimestamp, bestBlockHeight) +func (b *bbft) NextLeaderTimeRange(pubkey []byte, bestBlockTimestamp, bestBlockHeight uint64) (uint64, uint64, error) { + return b.consensusNodeManager.nextLeaderTimeRange(pubkey, bestBlockTimestamp, bestBlockHeight) } func (b *bbft) ApplyBlock(voteResultMap map[uint64]*state.VoteResult, block *types.Block) (err error) { diff --git a/protocol/bbft_test.go b/protocol/bbft_test.go index 7af6f5b9..2a1c98c7 100644 --- a/protocol/bbft_test.go +++ b/protocol/bbft_test.go @@ -12,7 +12,7 @@ func TestNextLeaderTime(t *testing.T) { now uint64 nodeOrder uint64 wantError error - wantNextLeaderTime int64 + wantNextLeaderTime uint64 }{ { desc: "normal case", @@ -39,7 +39,7 @@ func TestNextLeaderTime(t *testing.T) { now: 1557906284061, nodeOrder: 1, wantError: nil, - wantNextLeaderTime: 1557906284061 + blockNumEachNode*blockTimeInterval, + wantNextLeaderTime: 1557906284061 + BlockNumEachNode*BlockTimeInterval, }, { desc: "has no chance product block in this round of voting", @@ -66,7 +66,7 @@ func TestNextLeaderTime(t *testing.T) { now: 1557906317561, nodeOrder: 1, wantError: nil, - wantNextLeaderTime: 1557906284061 + 66*blockTimeInterval, + wantNextLeaderTime: 1557906284061 + 66*BlockTimeInterval, }, { desc: "first round, must exclude genesis block", @@ -75,12 +75,12 @@ func TestNextLeaderTime(t *testing.T) { now: 1557906286561, nodeOrder: 3, wantError: nil, - wantNextLeaderTime: 1557906284061 + 9*blockTimeInterval, + wantNextLeaderTime: 1557906284061 + 9*BlockTimeInterval, }, } for i, c := range cases { - nextLeaderTime, err := nextLeaderTimeHelper(c.startTime, c.endTime, c.now, c.nodeOrder) + nextLeaderTimestamp, err := nextLeaderTimeHelper(c.startTime, c.endTime, c.now, c.nodeOrder) if err != c.wantError { t.Fatalf("case #%d (%s) want error:%v, got error:%v", i, c.desc, c.wantError, err) } @@ -88,7 +88,6 @@ func TestNextLeaderTime(t *testing.T) { if err != nil { continue } - nextLeaderTimestamp := nextLeaderTime.UnixNano() / 1e6 if nextLeaderTimestamp != c.wantNextLeaderTime { t.Errorf("case #%d (%s) want next leader time:%d, got next leader time:%d", i, c.desc, c.wantNextLeaderTime, nextLeaderTimestamp) } diff --git a/protocol/block.go b/protocol/block.go index a89047cf..8ae535a0 100644 --- a/protocol/block.go +++ b/protocol/block.go @@ -193,7 +193,7 @@ func (c *Chain) saveBlock(block *types.Block) error { } if len(signature) != 0 { - if err := c.txPool.eventDispatcher.Post(event.BlockSignatureEvent{BlockHash: block.Hash(), Signature: signature}); err != nil { + if err := c.bbft.eventDispatcher.Post(event.BlockSignatureEvent{BlockHash: block.Hash(), Signature: signature}); err != nil { return err } } diff --git a/protocol/consensus_node_manager.go b/protocol/consensus_node_manager.go index d116eb0e..be5e6bb9 100644 --- a/protocol/consensus_node_manager.go +++ b/protocol/consensus_node_manager.go @@ -2,7 +2,6 @@ package protocol import ( "encoding/hex" - "fmt" "sort" "sync" "time" @@ -15,15 +14,16 @@ const ( numOfConsensusNode = 21 roundVoteBlockNums = 1000 - // product one block per 500 milliseconds - blockTimeInterval = 500 - blockNumEachNode = 3 + // BlockTimeInterval indicate product one block per 500 milliseconds + BlockTimeInterval = 500 + BlockNumEachNode = 3 ) var ( - errHasNoChanceProductBlock = errors.New("the node has no chance to product a block in this round of voting") - errNotFoundConsensusNode = errors.New("can not found consensus node") - errVoteResultIsNotfinalized = errors.New("vote result is not finalized") + errHasNoChanceProductBlock = errors.New("the node has no chance to product a block in this round of voting") + errNotFoundConsensusNode = errors.New("can not found consensus node") + errVoteResultIsNotfinalized = errors.New("vote result is not finalized") + errPublicKeyIsNotConsensusNode = errors.New("public key is not consensus node") ) type consensusNode struct { @@ -81,7 +81,7 @@ func (c *consensusNodeManager) getConsensusNode(height uint64, pubkey string) (* func (c *consensusNodeManager) isBlocker(height uint64, blockTimestamp uint64, pubkey string) (bool, error) { prevVoteRoundLastBlock := c.blockIndex.NodeByHeight(height - 1) - startTimestamp := prevVoteRoundLastBlock.Timestamp + blockTimeInterval + startTimestamp := prevVoteRoundLastBlock.Timestamp + BlockTimeInterval consensusNodeMap, err := c.getConsensusNodesByVoteResult(height) if err != nil { @@ -94,45 +94,44 @@ func (c *consensusNodeManager) isBlocker(height uint64, blockTimestamp uint64, p } begin := getLastBlockTimeInTimeRange(startTimestamp, blockTimestamp, blockerNode.order) - end := begin + blockNumEachNode*blockTimeInterval + end := begin + BlockNumEachNode*BlockTimeInterval return blockTimestamp >= begin && blockTimestamp < end, nil } -func (c *consensusNodeManager) nextLeaderTime(pubkey []byte, bestBlockTimestamp, bestBlockHeight uint64) (*time.Time, error) { +func (c *consensusNodeManager) nextLeaderTimeRange(pubkey []byte, bestBlockTimestamp, bestBlockHeight uint64) (uint64, uint64, error) { defer c.RUnlock() c.RLock() startHeight := c.effectiveStartHeight prevRoundLastBlock := c.blockIndex.NodeByHeight(startHeight - 1) - startTime := prevRoundLastBlock.Timestamp + blockTimeInterval - endTime := bestBlockTimestamp + (roundVoteBlockNums-bestBlockHeight%roundVoteBlockNums)*blockTimeInterval + startTime := prevRoundLastBlock.Timestamp + BlockTimeInterval + endTime := bestBlockTimestamp + (roundVoteBlockNums-bestBlockHeight%roundVoteBlockNums)*BlockTimeInterval consensusNode, exist := c.consensusNodeMap[hex.EncodeToString(pubkey)] if !exist { - return nil, fmt.Errorf("pubkey:%s is not consensus node", hex.EncodeToString(pubkey)) + return 0, 0, errPublicKeyIsNotConsensusNode } nextLeaderTime, err := nextLeaderTimeHelper(startTime, endTime, uint64(time.Now().UnixNano()/1e6), consensusNode.order) if err != nil { - return nil, err + return 0, 0, err } - return nextLeaderTime, nil + return nextLeaderTime, nextLeaderTime + BlockNumEachNode*BlockTimeInterval, nil } -func nextLeaderTimeHelper(startTime, endTime, now, nodeOrder uint64) (*time.Time, error) { +func nextLeaderTimeHelper(startTime, endTime, now, nodeOrder uint64) (uint64, error) { nextLeaderTimestamp := getLastBlockTimeInTimeRange(startTime, now, nodeOrder) - roundBlockTime := uint64(blockNumEachNode * numOfConsensusNode * blockTimeInterval) + roundBlockTime := uint64(BlockNumEachNode * numOfConsensusNode * BlockTimeInterval) - if int64(now-nextLeaderTimestamp) >= blockNumEachNode*blockTimeInterval { + if int64(now-nextLeaderTimestamp) >= BlockNumEachNode*BlockTimeInterval { nextLeaderTimestamp += roundBlockTime if nextLeaderTimestamp >= endTime { - return nil, errHasNoChanceProductBlock + return 0, errHasNoChanceProductBlock } } - nextLeaderTime := time.Unix(int64(nextLeaderTimestamp)/1000, (int64(nextLeaderTimestamp)%1000)*1e6) - return &nextLeaderTime, nil + return nextLeaderTimestamp, nil } // updateConsensusNodes used to update consensus node after each round of voting @@ -156,11 +155,11 @@ func (c *consensusNodeManager) updateConsensusNodes(bestBlockHeight uint64) erro func getLastBlockTimeInTimeRange(startTimestamp, endTimestamp, order uint64) uint64 { // One round of product block time for all consensus nodes - roundBlockTime := uint64(blockNumEachNode * numOfConsensusNode * blockTimeInterval) + roundBlockTime := uint64(BlockNumEachNode * numOfConsensusNode * BlockTimeInterval) // The start time of the last round of product block lastRoundStartTime := startTimestamp + (endTimestamp-startTimestamp)/roundBlockTime*roundBlockTime // The time of product block of the consensus in last round - return lastRoundStartTime + order*(blockNumEachNode*blockTimeInterval) + return lastRoundStartTime + order*(BlockNumEachNode*BlockTimeInterval) } func (c *consensusNodeManager) getConsensusNodesByVoteResult(blockHeight uint64) (map[string]*consensusNode, error) { diff --git a/protocol/protocol.go b/protocol/protocol.go index f3f4fc26..57547d7b 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -150,3 +150,8 @@ func (c *Chain) BlockWaiter(height uint64) <-chan struct{} { func (c *Chain) GetTxPool() *TxPool { return c.txPool } + +// GetBBFT return chain bbft +func (c *Chain) GetBBFT() *bbft { + return c.bbft +} diff --git a/test/bench_blockchain_test.go b/test/bench_blockchain_test.go index e691987e..f5f7783a 100644 --- a/test/bench_blockchain_test.go +++ b/test/bench_blockchain_test.go @@ -18,7 +18,7 @@ import ( dbm "github.com/vapor/database/leveldb" "github.com/vapor/database/storage" "github.com/vapor/event" - "github.com/vapor/mining" + "github.com/vapor/proposal" "github.com/vapor/protocol" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" @@ -159,7 +159,7 @@ func InsertChain(chain *protocol.Chain, txPool *protocol.TxPool, txs []*types.Tx } } - block, err := mining.NewBlockTemplate(chain, txPool, nil) + block, err := proposal.NewBlockTemplate(chain, txPool, nil) if err != nil { return err } diff --git a/test/performance/mining_test.go b/test/performance/mining_test.go index bd26fcd4..c7a606e0 100644 --- a/test/performance/mining_test.go +++ b/test/performance/mining_test.go @@ -6,7 +6,7 @@ import ( "github.com/vapor/account" dbm "github.com/vapor/database/leveldb" - "github.com/vapor/mining" + "github.com/vapor/proposal" "github.com/vapor/test" ) @@ -23,6 +23,6 @@ func BenchmarkNewBlockTpl(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - mining.NewBlockTemplate(chain, txPool, accountManager) + proposal.NewBlockTemplate(chain, txPool, accountManager) } } -- 2.11.0