--- /dev/null
+// MemStore is a Store implementation that
+// keeps all blockchain state in memory.
+//
+// It is used in tests to avoid needing a database.
+package blockchain
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/blockchain/protocol/bc/legacy"
+// "github.com/blockchain/protocol/state"
+)
+
+// MemStore satisfies the Store interface.
+type MemStore struct {
+ mu sync.Mutex
+ Blocks map[uint64]*legacy.Block
+// State *state.Snapshot
+// StateHeight uint64
+}
+
+// New returns a new MemStore
+func New() *MemStore {
+ return &MemStore{Blocks: make(map[uint64]*legacy.Block)}
+}
+
+func (m *MemStore) Height() uint64 {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ return uint64(len(m.Blocks))
+}
+
+func (m *MemStore) SaveBlock(b *legacy.Block) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ existing, ok := m.Blocks[b.Height]
+ if ok && existing.Hash() != b.Hash() {
+ return fmt.Errorf("already have a block at height %d", b.Height)
+ }
+ m.Blocks[b.Height] = b
+ return nil
+}
+
+/*
+func (m *MemStore) SaveSnapshot(ctx context.Context, height uint64, snapshot *state.Snapshot) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ m.State = state.Copy(snapshot)
+ m.StateHeight = height
+ return nil
+}
+*/
+
+func (m *MemStore) LoadBlock(height uint64) *legacy.Block {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ b, ok := m.Blocks[height]
+ if !ok {
+ return nil
+ }
+ return b
+}
+
+/*
+func (m *MemStore) LatestSnapshot(context.Context) (*state.Snapshot, uint64, error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.State == nil {
+ m.State = state.Empty()
+ }
+ return state.Copy(m.State), m.StateHeight, nil
+}
+*/
+
+func (m *MemStore) FinalizeBlock(uint64) error { return nil }
"sync"
"time"
- "github.com/blockchain/types"
+// "github.com/blockchain/types"
+ "github.com/blockchain/protocol/bc/legacy"
. "github.com/tendermint/tmlibs/common"
flow "github.com/tendermint/tmlibs/flowrate"
"github.com/tendermint/tmlibs/log"
mtx sync.Mutex
// block requests
- requesters map[int]*bpRequester
- height int // the lowest key in requesters.
+ requesters map[uint64]*bpRequester
+ height uint64 // the lowest key in requesters.
numPending int32 // number of requests pending assignment or block response
// peers
peers map[string]*bpPeer
timeoutsCh chan<- string
}
-func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
+func NewBlockPool(start uint64, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
bp := &BlockPool{
peers: make(map[string]*bpPeer),
- requesters: make(map[int]*bpRequester),
+ requesters: make(map[uint64]*bpRequester),
height: start,
numPending: 0,
}
}
-func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) {
+func (pool *BlockPool) GetStatus() (height uint64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return false
}
- maxPeerHeight := 0
+ var maxPeerHeight uint64 = 0
for _, peer := range pool.peers {
- maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
+ maxPeerHeight = MaxUint64(maxPeerHeight, peer.height)
}
isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
// We need to see the second block's Commit to validate the first block.
// So we peek two blocks at a time.
// The caller will verify the commit.
-func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
+func (pool *BlockPool) PeekTwoBlocks() (first *legacy.Block, second *legacy.Block) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
// Invalidates the block at pool.height,
// Remove the peer and redo request from others.
-func (pool *BlockPool) RedoRequest(height int) {
+func (pool *BlockPool) RedoRequest(height uint64) {
pool.mtx.Lock()
request := pool.requesters[height]
pool.mtx.Unlock()
}
// TODO: ensure that blocks come in order for each peer.
-func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
+func (pool *BlockPool) AddBlock(peerID string, block *legacy.Block, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
}
// Sets the peer's alleged blockchain height.
-func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
+func (pool *BlockPool) SetPeerHeight(peerID string, height uint64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
// Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil.
-func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
+func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.mtx.Lock()
defer pool.mtx.Unlock()
- nextHeight := pool.height + len(pool.requesters)
+ nextHeight := pool.height + uint64(len(pool.requesters))
request := newBPRequester(pool, nextHeight)
request.SetLogger(pool.Logger.With("height", nextHeight))
request.Start()
}
-func (pool *BlockPool) sendRequest(height int, peerID string) {
+func (pool *BlockPool) sendRequest(height uint64, peerID string) {
if !pool.IsRunning() {
return
}
defer pool.mtx.Unlock()
str := ""
- for h := pool.height; h < pool.height+len(pool.requesters); h++ {
+ var h uint64
+ for h = pool.height; h < pool.height+ uint64(len(pool.requesters)); h++ {
if pool.requesters[h] == nil {
str += Fmt("H(%v):X ", h)
} else {
recvMonitor *flow.Monitor
mtx sync.Mutex
- height int
+ height uint64
numPending int32
timeout *time.Timer
didTimeout bool
logger log.Logger
}
-func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
+func newBPPeer(pool *BlockPool, peerID string, height uint64) *bpPeer {
peer := &bpPeer{
pool: pool,
id: peerID,
type bpRequester struct {
BaseService
pool *BlockPool
- height int
+ height uint64
gotBlockCh chan struct{}
redoCh chan struct{}
mtx sync.Mutex
peerID string
- block *types.Block
+ block *legacy.Block
}
-func newBPRequester(pool *BlockPool, height int) *bpRequester {
+func newBPRequester(pool *BlockPool, height uint64) *bpRequester {
bpr := &bpRequester{
pool: pool,
height: height,
}
// Returns true if the peer matches
-func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
+func (bpr *bpRequester) setBlock(block *legacy.Block, peerID string) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return true
}
-func (bpr *bpRequester) getBlock() *types.Block {
+func (bpr *bpRequester) getBlock() *legacy.Block {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
//-------------------------------------
type BlockRequest struct {
- Height int
+ Height uint64
PeerID string
}
wire "github.com/tendermint/go-wire"
"github.com/blockchain/p2p"
"github.com/blockchain/types"
+ "github.com/blockchain/protocol/bc/legacy"
cmn "github.com/tendermint/tmlibs/common"
- // "github.com/blockchain/protocol"
)
const (
// state *sm.State
// proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
- store *BlockStore
+ store *MemStore
pool *BlockPool
fastSync bool
requestsCh chan BlockRequest
evsw types.EventSwitch
}
-func NewBlockchainReactor(store *BlockStore, fastSync bool) *BlockchainReactor {
+func NewBlockchainReactor(store *MemStore, fastSync bool) *BlockchainReactor {
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity)
pool := NewBlockPool(
bcR := &BlockchainReactor {
fastSync: fastSync,
pool: pool,
+ store: store,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
}
// We need both to sync the first block.
break SYNC_LOOP
}
- firstParts := first.MakePartSet(types.DefaultBlockPartSize)
- //firstPartsHeader := firstParts.Header()
bcR.pool.PopRequest()
- bcR.store.SaveBlock(first, firstParts, second.LastCommit)
+ bcR.store.SaveBlock(first)
}
continue FOR_LOOP
case <-bcR.Quit:
//-------------------------------------
type bcBlockRequestMessage struct {
- Height int
+ Height uint64
}
func (m *bcBlockRequestMessage) String() string {
// NOTE: keep up-to-date with maxBlockchainResponseSize
type bcBlockResponseMessage struct {
- Block *types.Block
+ Block *legacy.Block
}
func (m *bcBlockResponseMessage) String() string {
//-------------------------------------
type bcStatusRequestMessage struct {
- Height int
+ Height uint64
}
func (m *bcStatusRequestMessage) String() string {
//-------------------------------------
type bcStatusResponseMessage struct {
- Height int
+ Height uint64
}
func (m *bcStatusResponseMessage) String() string {
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
bc "github.com/blockchain/blockchain"
- dbm "github.com/tendermint/tmlibs/db"
+ //dbm "github.com/tendermint/tmlibs/db"
_ "net/http/pprof"
)
// services
evsw types.EventSwitch // pub/sub for services
- blockStore *bc.BlockStore
+ blockStore *bc.MemStore
bcReactor *bc.BlockchainReactor
}
func NewNode(config *cfg.Config, privValidator *types.PrivValidator, logger log.Logger) *Node {
// Get BlockStore
- blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
- blockStore := bc.NewBlockStore(blockStoreDB)
+ //blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
+ //blockStore := bc.NewBlockStore(blockStoreDB)
+ blockStore := bc.New()
// Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519()