OSDN Git Service

Added memstore to store block. Used to test block sync.
authorgguoss <1536310027@qq.com>
Fri, 21 Jul 2017 07:25:59 +0000 (15:25 +0800)
committergguoss <1536310027@qq.com>
Fri, 21 Jul 2017 07:25:59 +0000 (15:25 +0800)
blockchain/memstore.go [new file with mode: 0644]
blockchain/pool.go
blockchain/reactor.go
node/node.go

diff --git a/blockchain/memstore.go b/blockchain/memstore.go
new file mode 100644 (file)
index 0000000..d7df167
--- /dev/null
@@ -0,0 +1,80 @@
+// MemStore is a Store implementation that
+// keeps all blockchain state in memory.
+//
+// It is used in tests to avoid needing a database.
+package blockchain
+
+import (
+       "fmt"
+       "sync"
+
+       "github.com/blockchain/protocol/bc/legacy"
+//     "github.com/blockchain/protocol/state"
+)
+
+// MemStore satisfies the Store interface.
+type MemStore struct {
+       mu          sync.Mutex
+       Blocks      map[uint64]*legacy.Block
+//     State       *state.Snapshot
+//     StateHeight uint64
+}
+
+// New returns a new MemStore
+func New() *MemStore {
+       return &MemStore{Blocks: make(map[uint64]*legacy.Block)}
+}
+
+func (m *MemStore) Height() uint64 {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       return uint64(len(m.Blocks))
+}
+
+func (m *MemStore) SaveBlock(b *legacy.Block) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       existing, ok := m.Blocks[b.Height]
+       if ok && existing.Hash() != b.Hash() {
+               return fmt.Errorf("already have a block at height %d", b.Height)
+       }
+       m.Blocks[b.Height] = b
+       return nil
+}
+
+/*
+func (m *MemStore) SaveSnapshot(ctx context.Context, height uint64, snapshot *state.Snapshot) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       m.State = state.Copy(snapshot)
+       m.StateHeight = height
+       return nil
+}
+*/
+
+func (m *MemStore) LoadBlock(height uint64) *legacy.Block {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       b, ok := m.Blocks[height]
+       if !ok {
+               return nil
+       }
+       return b
+}
+
+/*
+func (m *MemStore) LatestSnapshot(context.Context) (*state.Snapshot, uint64, error) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       if m.State == nil {
+               m.State = state.Empty()
+       }
+       return state.Copy(m.State), m.StateHeight, nil
+}
+*/
+
+func (m *MemStore) FinalizeBlock(uint64) error { return nil }
index 6b51b68..6c3d870 100644 (file)
@@ -5,7 +5,8 @@ import (
        "sync"
        "time"
 
-       "github.com/blockchain/types"
+//     "github.com/blockchain/types"
+    "github.com/blockchain/protocol/bc/legacy"
        . "github.com/tendermint/tmlibs/common"
        flow "github.com/tendermint/tmlibs/flowrate"
        "github.com/tendermint/tmlibs/log"
@@ -38,8 +39,8 @@ type BlockPool struct {
 
        mtx sync.Mutex
        // block requests
-       requesters map[int]*bpRequester
-       height     int   // the lowest key in requesters.
+       requesters map[uint64]*bpRequester
+       height     uint64   // the lowest key in requesters.
        numPending int32 // number of requests pending assignment or block response
        // peers
        peers map[string]*bpPeer
@@ -48,11 +49,11 @@ type BlockPool struct {
        timeoutsCh chan<- string
 }
 
-func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
+func NewBlockPool(start uint64, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
        bp := &BlockPool{
                peers: make(map[string]*bpPeer),
 
-               requesters: make(map[int]*bpRequester),
+               requesters: make(map[uint64]*bpRequester),
                height:     start,
                numPending: 0,
 
@@ -117,7 +118,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
        }
 }
 
-func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) {
+func (pool *BlockPool) GetStatus() (height uint64, numPending int32, lenRequesters int) {
        pool.mtx.Lock()
        defer pool.mtx.Unlock()
 
@@ -137,9 +138,9 @@ func (pool *BlockPool) IsCaughtUp() bool {
                return false
        }
 
-       maxPeerHeight := 0
+       var maxPeerHeight uint64 = 0
        for _, peer := range pool.peers {
-               maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
+               maxPeerHeight = MaxUint64(maxPeerHeight, peer.height)
        }
 
        isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
@@ -150,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
 // We need to see the second block's Commit to validate the first block.
 // So we peek two blocks at a time.
 // The caller will verify the commit.
-func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
+func (pool *BlockPool) PeekTwoBlocks() (first *legacy.Block, second *legacy.Block) {
        pool.mtx.Lock()
        defer pool.mtx.Unlock()
 
@@ -185,7 +186,7 @@ func (pool *BlockPool) PopRequest() {
 
 // Invalidates the block at pool.height,
 // Remove the peer and redo request from others.
-func (pool *BlockPool) RedoRequest(height int) {
+func (pool *BlockPool) RedoRequest(height uint64) {
        pool.mtx.Lock()
        request := pool.requesters[height]
        pool.mtx.Unlock()
@@ -199,7 +200,7 @@ func (pool *BlockPool) RedoRequest(height int) {
 }
 
 // TODO: ensure that blocks come in order for each peer.
-func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
+func (pool *BlockPool) AddBlock(peerID string, block *legacy.Block, blockSize int) {
        pool.mtx.Lock()
        defer pool.mtx.Unlock()
 
@@ -218,7 +219,7 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
 }
 
 // Sets the peer's alleged blockchain height.
-func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
+func (pool *BlockPool) SetPeerHeight(peerID string, height uint64) {
        pool.mtx.Lock()
        defer pool.mtx.Unlock()
 
@@ -253,7 +254,7 @@ func (pool *BlockPool) removePeer(peerID string) {
 
 // Pick an available peer with at least the given minHeight.
 // If no peers are available, returns nil.
-func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
+func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint64) *bpPeer {
        pool.mtx.Lock()
        defer pool.mtx.Unlock()
 
@@ -279,7 +280,7 @@ func (pool *BlockPool) makeNextRequester() {
        pool.mtx.Lock()
        defer pool.mtx.Unlock()
 
-       nextHeight := pool.height + len(pool.requesters)
+       nextHeight := pool.height + uint64(len(pool.requesters))
        request := newBPRequester(pool, nextHeight)
        request.SetLogger(pool.Logger.With("height", nextHeight))
 
@@ -289,7 +290,7 @@ func (pool *BlockPool) makeNextRequester() {
        request.Start()
 }
 
-func (pool *BlockPool) sendRequest(height int, peerID string) {
+func (pool *BlockPool) sendRequest(height uint64, peerID string) {
        if !pool.IsRunning() {
                return
        }
@@ -308,7 +309,8 @@ func (pool *BlockPool) debug() string {
        defer pool.mtx.Unlock()
 
        str := ""
-       for h := pool.height; h < pool.height+len(pool.requesters); h++ {
+    var h uint64
+       for h = pool.height; h < pool.height+ uint64(len(pool.requesters)); h++ {
                if pool.requesters[h] == nil {
                        str += Fmt("H(%v):X ", h)
                } else {
@@ -327,7 +329,7 @@ type bpPeer struct {
        recvMonitor *flow.Monitor
 
        mtx        sync.Mutex
-       height     int
+       height     uint64
        numPending int32
        timeout    *time.Timer
        didTimeout bool
@@ -335,7 +337,7 @@ type bpPeer struct {
        logger log.Logger
 }
 
-func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
+func newBPPeer(pool *BlockPool, peerID string, height uint64) *bpPeer {
        peer := &bpPeer{
                pool:       pool,
                id:         peerID,
@@ -396,16 +398,16 @@ func (peer *bpPeer) onTimeout() {
 type bpRequester struct {
        BaseService
        pool       *BlockPool
-       height     int
+       height     uint64
        gotBlockCh chan struct{}
        redoCh     chan struct{}
 
        mtx    sync.Mutex
        peerID string
-       block  *types.Block
+       block  *legacy.Block
 }
 
-func newBPRequester(pool *BlockPool, height int) *bpRequester {
+func newBPRequester(pool *BlockPool, height uint64) *bpRequester {
        bpr := &bpRequester{
                pool:       pool,
                height:     height,
@@ -425,7 +427,7 @@ func (bpr *bpRequester) OnStart() error {
 }
 
 // Returns true if the peer matches
-func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
+func (bpr *bpRequester) setBlock(block *legacy.Block, peerID string) bool {
        bpr.mtx.Lock()
        if bpr.block != nil || bpr.peerID != peerID {
                bpr.mtx.Unlock()
@@ -438,7 +440,7 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
        return true
 }
 
-func (bpr *bpRequester) getBlock() *types.Block {
+func (bpr *bpRequester) getBlock() *legacy.Block {
        bpr.mtx.Lock()
        defer bpr.mtx.Unlock()
        return bpr.block
@@ -517,6 +519,6 @@ OUTER_LOOP:
 //-------------------------------------
 
 type BlockRequest struct {
-       Height int
+       Height uint64
        PeerID string
 }
index 2711036..cdd335e 100644 (file)
@@ -9,8 +9,8 @@ import (
        wire "github.com/tendermint/go-wire"
        "github.com/blockchain/p2p"
        "github.com/blockchain/types"
+    "github.com/blockchain/protocol/bc/legacy"
        cmn "github.com/tendermint/tmlibs/common"
- //   "github.com/blockchain/protocol"
 )
 
 const (
@@ -45,7 +45,7 @@ type BlockchainReactor struct {
 
 //     state        *sm.State
 //     proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
-       store        *BlockStore
+       store        *MemStore
        pool         *BlockPool
        fastSync     bool
        requestsCh   chan BlockRequest
@@ -55,7 +55,7 @@ type BlockchainReactor struct {
        evsw types.EventSwitch
 }
 
-func NewBlockchainReactor(store *BlockStore, fastSync bool) *BlockchainReactor {
+func NewBlockchainReactor(store *MemStore, fastSync bool) *BlockchainReactor {
     requestsCh    := make(chan BlockRequest, defaultChannelCapacity)
     timeoutsCh    := make(chan string, defaultChannelCapacity)
     pool := NewBlockPool(
@@ -66,6 +66,7 @@ func NewBlockchainReactor(store *BlockStore, fastSync bool) *BlockchainReactor {
     bcR := &BlockchainReactor {
         fastSync:      fastSync,
         pool:          pool,
+        store:         store,
         requestsCh:    requestsCh,
         timeoutsCh:   timeoutsCh,
     }
@@ -213,10 +214,8 @@ FOR_LOOP:
                                        // We need both to sync the first block.
                                        break SYNC_LOOP
                                }
-                               firstParts := first.MakePartSet(types.DefaultBlockPartSize)
-                               //firstPartsHeader := firstParts.Header()
                            bcR.pool.PopRequest()
-                bcR.store.SaveBlock(first, firstParts, second.LastCommit)
+                bcR.store.SaveBlock(first)
                        }
                        continue FOR_LOOP
                case <-bcR.Quit:
@@ -274,7 +273,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
 //-------------------------------------
 
 type bcBlockRequestMessage struct {
-       Height int
+       Height uint64
 }
 
 func (m *bcBlockRequestMessage) String() string {
@@ -285,7 +284,7 @@ func (m *bcBlockRequestMessage) String() string {
 
 // NOTE: keep up-to-date with maxBlockchainResponseSize
 type bcBlockResponseMessage struct {
-       Block *types.Block
+       Block *legacy.Block
 }
 
 func (m *bcBlockResponseMessage) String() string {
@@ -295,7 +294,7 @@ func (m *bcBlockResponseMessage) String() string {
 //-------------------------------------
 
 type bcStatusRequestMessage struct {
-       Height int
+       Height uint64
 }
 
 func (m *bcStatusRequestMessage) String() string {
@@ -305,7 +304,7 @@ func (m *bcStatusRequestMessage) String() string {
 //-------------------------------------
 
 type bcStatusResponseMessage struct {
-       Height int
+       Height uint64
 }
 
 func (m *bcStatusResponseMessage) String() string {
index e5cfcf6..9ca9c0c 100644 (file)
@@ -13,7 +13,7 @@ import (
        cmn "github.com/tendermint/tmlibs/common"
        "github.com/tendermint/tmlibs/log"
     bc "github.com/blockchain/blockchain"
-    dbm "github.com/tendermint/tmlibs/db"
+    //dbm "github.com/tendermint/tmlibs/db"
 
        _ "net/http/pprof"
 )
@@ -32,7 +32,7 @@ type Node struct {
 
        // services
        evsw             types.EventSwitch           // pub/sub for services
-    blockStore       *bc.BlockStore
+    blockStore       *bc.MemStore
     bcReactor        *bc.BlockchainReactor
 }
 
@@ -44,8 +44,9 @@ func NewNodeDefault(config *cfg.Config, logger log.Logger) *Node {
 
 func NewNode(config *cfg.Config, privValidator *types.PrivValidator, logger log.Logger) *Node {
        // Get BlockStore
-    blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
-    blockStore := bc.NewBlockStore(blockStoreDB)
+    //blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
+    //blockStore := bc.NewBlockStore(blockStoreDB)
+    blockStore := bc.New()
 
        // Generate node PrivKey
        privKey := crypto.GenPrivKeyEd25519()