From bac6f0e92975d97dd31952b1aec38543052c88c9 Mon Sep 17 00:00:00 2001 From: gguoss <1536310027@qq.com> Date: Fri, 21 Jul 2017 15:25:59 +0800 Subject: [PATCH] Added memstore to store block. Used to test block sync. --- blockchain/memstore.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++ blockchain/pool.go | 50 ++++++++++++++++--------------- blockchain/reactor.go | 19 ++++++------ node/node.go | 9 +++--- 4 files changed, 120 insertions(+), 38 deletions(-) create mode 100644 blockchain/memstore.go diff --git a/blockchain/memstore.go b/blockchain/memstore.go new file mode 100644 index 00000000..d7df1677 --- /dev/null +++ b/blockchain/memstore.go @@ -0,0 +1,80 @@ +// MemStore is a Store implementation that +// keeps all blockchain state in memory. +// +// It is used in tests to avoid needing a database. +package blockchain + +import ( + "fmt" + "sync" + + "github.com/blockchain/protocol/bc/legacy" +// "github.com/blockchain/protocol/state" +) + +// MemStore satisfies the Store interface. +type MemStore struct { + mu sync.Mutex + Blocks map[uint64]*legacy.Block +// State *state.Snapshot +// StateHeight uint64 +} + +// New returns a new MemStore +func New() *MemStore { + return &MemStore{Blocks: make(map[uint64]*legacy.Block)} +} + +func (m *MemStore) Height() uint64 { + m.mu.Lock() + defer m.mu.Unlock() + + return uint64(len(m.Blocks)) +} + +func (m *MemStore) SaveBlock(b *legacy.Block) error { + m.mu.Lock() + defer m.mu.Unlock() + + existing, ok := m.Blocks[b.Height] + if ok && existing.Hash() != b.Hash() { + return fmt.Errorf("already have a block at height %d", b.Height) + } + m.Blocks[b.Height] = b + return nil +} + +/* +func (m *MemStore) SaveSnapshot(ctx context.Context, height uint64, snapshot *state.Snapshot) error { + m.mu.Lock() + defer m.mu.Unlock() + + m.State = state.Copy(snapshot) + m.StateHeight = height + return nil +} +*/ + +func (m *MemStore) LoadBlock(height uint64) *legacy.Block { + m.mu.Lock() + defer m.mu.Unlock() + b, ok := m.Blocks[height] + if !ok { + return nil + } + return b +} + +/* +func (m *MemStore) LatestSnapshot(context.Context) (*state.Snapshot, uint64, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.State == nil { + m.State = state.Empty() + } + return state.Copy(m.State), m.StateHeight, nil +} +*/ + +func (m *MemStore) FinalizeBlock(uint64) error { return nil } diff --git a/blockchain/pool.go b/blockchain/pool.go index 6b51b680..6c3d8700 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -5,7 +5,8 @@ import ( "sync" "time" - "github.com/blockchain/types" +// "github.com/blockchain/types" + "github.com/blockchain/protocol/bc/legacy" . "github.com/tendermint/tmlibs/common" flow "github.com/tendermint/tmlibs/flowrate" "github.com/tendermint/tmlibs/log" @@ -38,8 +39,8 @@ type BlockPool struct { mtx sync.Mutex // block requests - requesters map[int]*bpRequester - height int // the lowest key in requesters. + requesters map[uint64]*bpRequester + height uint64 // the lowest key in requesters. numPending int32 // number of requests pending assignment or block response // peers peers map[string]*bpPeer @@ -48,11 +49,11 @@ type BlockPool struct { timeoutsCh chan<- string } -func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { +func NewBlockPool(start uint64, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { bp := &BlockPool{ peers: make(map[string]*bpPeer), - requesters: make(map[int]*bpRequester), + requesters: make(map[uint64]*bpRequester), height: start, numPending: 0, @@ -117,7 +118,7 @@ func (pool *BlockPool) removeTimedoutPeers() { } } -func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) { +func (pool *BlockPool) GetStatus() (height uint64, numPending int32, lenRequesters int) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -137,9 +138,9 @@ func (pool *BlockPool) IsCaughtUp() bool { return false } - maxPeerHeight := 0 + var maxPeerHeight uint64 = 0 for _, peer := range pool.peers { - maxPeerHeight = MaxInt(maxPeerHeight, peer.height) + maxPeerHeight = MaxUint64(maxPeerHeight, peer.height) } isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight) @@ -150,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool { // We need to see the second block's Commit to validate the first block. // So we peek two blocks at a time. // The caller will verify the commit. -func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { +func (pool *BlockPool) PeekTwoBlocks() (first *legacy.Block, second *legacy.Block) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -185,7 +186,7 @@ func (pool *BlockPool) PopRequest() { // Invalidates the block at pool.height, // Remove the peer and redo request from others. -func (pool *BlockPool) RedoRequest(height int) { +func (pool *BlockPool) RedoRequest(height uint64) { pool.mtx.Lock() request := pool.requesters[height] pool.mtx.Unlock() @@ -199,7 +200,7 @@ func (pool *BlockPool) RedoRequest(height int) { } // TODO: ensure that blocks come in order for each peer. -func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) { +func (pool *BlockPool) AddBlock(peerID string, block *legacy.Block, blockSize int) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -218,7 +219,7 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int } // Sets the peer's alleged blockchain height. -func (pool *BlockPool) SetPeerHeight(peerID string, height int) { +func (pool *BlockPool) SetPeerHeight(peerID string, height uint64) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -253,7 +254,7 @@ func (pool *BlockPool) removePeer(peerID string) { // Pick an available peer with at least the given minHeight. // If no peers are available, returns nil. -func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { +func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint64) *bpPeer { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -279,7 +280,7 @@ func (pool *BlockPool) makeNextRequester() { pool.mtx.Lock() defer pool.mtx.Unlock() - nextHeight := pool.height + len(pool.requesters) + nextHeight := pool.height + uint64(len(pool.requesters)) request := newBPRequester(pool, nextHeight) request.SetLogger(pool.Logger.With("height", nextHeight)) @@ -289,7 +290,7 @@ func (pool *BlockPool) makeNextRequester() { request.Start() } -func (pool *BlockPool) sendRequest(height int, peerID string) { +func (pool *BlockPool) sendRequest(height uint64, peerID string) { if !pool.IsRunning() { return } @@ -308,7 +309,8 @@ func (pool *BlockPool) debug() string { defer pool.mtx.Unlock() str := "" - for h := pool.height; h < pool.height+len(pool.requesters); h++ { + var h uint64 + for h = pool.height; h < pool.height+ uint64(len(pool.requesters)); h++ { if pool.requesters[h] == nil { str += Fmt("H(%v):X ", h) } else { @@ -327,7 +329,7 @@ type bpPeer struct { recvMonitor *flow.Monitor mtx sync.Mutex - height int + height uint64 numPending int32 timeout *time.Timer didTimeout bool @@ -335,7 +337,7 @@ type bpPeer struct { logger log.Logger } -func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer { +func newBPPeer(pool *BlockPool, peerID string, height uint64) *bpPeer { peer := &bpPeer{ pool: pool, id: peerID, @@ -396,16 +398,16 @@ func (peer *bpPeer) onTimeout() { type bpRequester struct { BaseService pool *BlockPool - height int + height uint64 gotBlockCh chan struct{} redoCh chan struct{} mtx sync.Mutex peerID string - block *types.Block + block *legacy.Block } -func newBPRequester(pool *BlockPool, height int) *bpRequester { +func newBPRequester(pool *BlockPool, height uint64) *bpRequester { bpr := &bpRequester{ pool: pool, height: height, @@ -425,7 +427,7 @@ func (bpr *bpRequester) OnStart() error { } // Returns true if the peer matches -func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool { +func (bpr *bpRequester) setBlock(block *legacy.Block, peerID string) bool { bpr.mtx.Lock() if bpr.block != nil || bpr.peerID != peerID { bpr.mtx.Unlock() @@ -438,7 +440,7 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool { return true } -func (bpr *bpRequester) getBlock() *types.Block { +func (bpr *bpRequester) getBlock() *legacy.Block { bpr.mtx.Lock() defer bpr.mtx.Unlock() return bpr.block @@ -517,6 +519,6 @@ OUTER_LOOP: //------------------------------------- type BlockRequest struct { - Height int + Height uint64 PeerID string } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 2711036d..cdd335ee 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -9,8 +9,8 @@ import ( wire "github.com/tendermint/go-wire" "github.com/blockchain/p2p" "github.com/blockchain/types" + "github.com/blockchain/protocol/bc/legacy" cmn "github.com/tendermint/tmlibs/common" - // "github.com/blockchain/protocol" ) const ( @@ -45,7 +45,7 @@ type BlockchainReactor struct { // state *sm.State // proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn - store *BlockStore + store *MemStore pool *BlockPool fastSync bool requestsCh chan BlockRequest @@ -55,7 +55,7 @@ type BlockchainReactor struct { evsw types.EventSwitch } -func NewBlockchainReactor(store *BlockStore, fastSync bool) *BlockchainReactor { +func NewBlockchainReactor(store *MemStore, fastSync bool) *BlockchainReactor { requestsCh := make(chan BlockRequest, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity) pool := NewBlockPool( @@ -66,6 +66,7 @@ func NewBlockchainReactor(store *BlockStore, fastSync bool) *BlockchainReactor { bcR := &BlockchainReactor { fastSync: fastSync, pool: pool, + store: store, requestsCh: requestsCh, timeoutsCh: timeoutsCh, } @@ -213,10 +214,8 @@ FOR_LOOP: // We need both to sync the first block. break SYNC_LOOP } - firstParts := first.MakePartSet(types.DefaultBlockPartSize) - //firstPartsHeader := firstParts.Header() bcR.pool.PopRequest() - bcR.store.SaveBlock(first, firstParts, second.LastCommit) + bcR.store.SaveBlock(first) } continue FOR_LOOP case <-bcR.Quit: @@ -274,7 +273,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { //------------------------------------- type bcBlockRequestMessage struct { - Height int + Height uint64 } func (m *bcBlockRequestMessage) String() string { @@ -285,7 +284,7 @@ func (m *bcBlockRequestMessage) String() string { // NOTE: keep up-to-date with maxBlockchainResponseSize type bcBlockResponseMessage struct { - Block *types.Block + Block *legacy.Block } func (m *bcBlockResponseMessage) String() string { @@ -295,7 +294,7 @@ func (m *bcBlockResponseMessage) String() string { //------------------------------------- type bcStatusRequestMessage struct { - Height int + Height uint64 } func (m *bcStatusRequestMessage) String() string { @@ -305,7 +304,7 @@ func (m *bcStatusRequestMessage) String() string { //------------------------------------- type bcStatusResponseMessage struct { - Height int + Height uint64 } func (m *bcStatusResponseMessage) String() string { diff --git a/node/node.go b/node/node.go index e5cfcf69..9ca9c0c1 100644 --- a/node/node.go +++ b/node/node.go @@ -13,7 +13,7 @@ import ( cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" bc "github.com/blockchain/blockchain" - dbm "github.com/tendermint/tmlibs/db" + //dbm "github.com/tendermint/tmlibs/db" _ "net/http/pprof" ) @@ -32,7 +32,7 @@ type Node struct { // services evsw types.EventSwitch // pub/sub for services - blockStore *bc.BlockStore + blockStore *bc.MemStore bcReactor *bc.BlockchainReactor } @@ -44,8 +44,9 @@ func NewNodeDefault(config *cfg.Config, logger log.Logger) *Node { func NewNode(config *cfg.Config, privValidator *types.PrivValidator, logger log.Logger) *Node { // Get BlockStore - blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir()) - blockStore := bc.NewBlockStore(blockStoreDB) + //blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir()) + //blockStore := bc.NewBlockStore(blockStoreDB) + blockStore := bc.New() // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() -- 2.11.0