--- /dev/null
+package blockchain
+
+import (
+ "errors"
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/bytom/p2p"
+ "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/legacy"
+)
+
+type blockKeeperPeer struct {
+ mtx sync.RWMutex
+ height uint64
+ hash *bc.Hash
+}
+
+func newBlockKeeperPeer(height uint64, hash *bc.Hash) *blockKeeperPeer {
+ return &blockKeeperPeer{
+ height: height,
+ hash: hash,
+ }
+}
+
+func (p *blockKeeperPeer) GetStatus() (height uint64, hash *bc.Hash) {
+ p.mtx.RLock()
+ defer p.mtx.RUnlock()
+ return p.height, p.hash
+}
+
+func (p *blockKeeperPeer) SetStatus(height uint64, hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ p.height = height
+ p.hash = hash
+}
+
+type pendingResponse struct {
+ block *legacy.Block
+ peerID string
+}
+
+//TODO: add retry mechanism
+type blockKeeper struct {
+ mtx sync.RWMutex
+ chainHeight uint64
+ maxPeerHeight uint64
+ chainUpdateCh <-chan struct{}
+ peerUpdateCh chan struct{}
+
+ chain *protocol.Chain
+ sw *p2p.Switch
+ peers map[string]*blockKeeperPeer
+ pendingProcessCh chan *pendingResponse
+}
+
+func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch) *blockKeeper {
+ chainHeight := chain.Height()
+ bk := &blockKeeper{
+ chainHeight: chainHeight,
+ maxPeerHeight: uint64(0),
+ chainUpdateCh: chain.BlockWaiter(chainHeight + 1),
+ peerUpdateCh: make(chan struct{}, 1000),
+
+ chain: chain,
+ sw: sw,
+ peers: make(map[string]*blockKeeperPeer),
+ pendingProcessCh: make(chan *pendingResponse),
+ }
+ go bk.blockProcessWorker()
+ go bk.blockRequestWorker()
+ return bk
+}
+
+func (bk *blockKeeper) AddBlock(block *legacy.Block, peerID string) {
+ bk.pendingProcessCh <- &pendingResponse{block: block, peerID: peerID}
+}
+
+func (bk *blockKeeper) IsCaughtUp() bool {
+ bk.mtx.RLock()
+ defer bk.mtx.RUnlock()
+ return bk.chainHeight >= bk.maxPeerHeight
+}
+
+func (bk *blockKeeper) RemovePeer(peerID string) {
+ bk.mtx.Lock()
+ delete(bk.peers, peerID)
+ bk.mtx.Unlock()
+ log.WithField("ID", peerID).Info("Delete peer from blockKeeper")
+}
+
+func (bk *blockKeeper) requestBlockByHash(peerID string, hash *bc.Hash) error {
+ peer := bk.sw.Peers().Get(peerID)
+ if peer == nil {
+ return errors.New("can't find peer in peer pool")
+ }
+ msg := &BlockRequestMessage{RawHash: hash.Byte32()}
+ peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return nil
+}
+
+func (bk *blockKeeper) requestBlockByHeight(peerID string, height uint64) error {
+ peer := bk.sw.Peers().Get(peerID)
+ if peer == nil {
+ return errors.New("can't find peer in peer pool")
+ }
+ msg := &BlockRequestMessage{Height: height}
+ peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return nil
+}
+
+func (bk *blockKeeper) SetPeerHeight(peerID string, height uint64, hash *bc.Hash) {
+ bk.mtx.Lock()
+ defer bk.mtx.Unlock()
+
+ if height > bk.maxPeerHeight {
+ bk.maxPeerHeight = height
+ bk.peerUpdateCh <- struct{}{}
+ }
+
+ if peer, ok := bk.peers[peerID]; ok {
+ peer.SetStatus(height, hash)
+ return
+ }
+ peer := newBlockKeeperPeer(height, hash)
+ bk.peers[peerID] = peer
+ log.WithFields(log.Fields{"ID": peerID, "Height": height}).Info("Add new peer to blockKeeper")
+}
+
+func (bk *blockKeeper) RequestBlockByHeight(height uint64) {
+ bk.mtx.RLock()
+ defer bk.mtx.RUnlock()
+
+ for peerID, peer := range bk.peers {
+ if peerHeight, _ := peer.GetStatus(); peerHeight > bk.chainHeight {
+ bk.requestBlockByHeight(peerID, height)
+ }
+ }
+}
+
+func (bk *blockKeeper) blockRequestWorker() {
+ for {
+ select {
+ case <-bk.chainUpdateCh:
+ chainHeight := bk.chain.Height()
+ bk.mtx.Lock()
+ if bk.chainHeight < chainHeight {
+ bk.chainHeight = chainHeight
+ }
+ bk.chainUpdateCh = bk.chain.BlockWaiter(bk.chainHeight + 1)
+ bk.mtx.Unlock()
+
+ case <-bk.peerUpdateCh:
+ bk.mtx.RLock()
+ chainHeight := bk.chainHeight
+ maxPeerHeight := bk.maxPeerHeight
+ bk.mtx.RUnlock()
+
+ for i := chainHeight + 1; i <= maxPeerHeight; i++ {
+ bk.RequestBlockByHeight(i)
+ waiter := bk.chain.BlockWaiter(i)
+ <-waiter
+ }
+ }
+ }
+}
+
+func (bk *blockKeeper) blockProcessWorker() {
+ for pendingResponse := range bk.pendingProcessCh {
+ block := pendingResponse.block
+ blockHash := block.Hash()
+ isOrphan, err := bk.chain.ProcessBlock(block)
+ if err != nil {
+ log.WithField("hash", blockHash.String()).Errorf("blockKeeper fail process block %v", err)
+ continue
+ }
+ log.WithFields(log.Fields{
+ "height": block.Height,
+ "hash": blockHash.String(),
+ "isOrphan": isOrphan,
+ }).Info("blockKeeper processed block")
+
+ if isOrphan {
+ bk.requestBlockByHash(pendingResponse.peerID, &block.PreviousBlockHash)
+ }
+ }
+}
+++ /dev/null
-// MemStore is a Store implementation that
-// keeps all blockchain state in memory.
-//
-// It is used in tests to avoid needing a database.
-package blockchain
-
-import (
- "fmt"
- "sync"
-
- "github.com/bytom/protocol/bc/legacy"
- // "github.com/blockchain/protocol/state"
-)
-
-// MemStore satisfies the Store interface.
-type MemStore struct {
- mu sync.Mutex
- Blocks map[uint64]*legacy.Block
- // State *state.Snapshot
- // StateHeight uint64
-}
-
-// New returns a new MemStore
-func NewMemStore() *MemStore {
- return &MemStore{Blocks: make(map[uint64]*legacy.Block)}
-}
-
-func (m *MemStore) Height() uint64 {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- return uint64(len(m.Blocks))
-}
-
-func (m *MemStore) SaveBlock(b *legacy.Block) error {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- existing, ok := m.Blocks[b.Height]
- if ok && existing.Hash() != b.Hash() {
- return fmt.Errorf("already have a block at height %d", b.Height)
- }
- m.Blocks[b.Height] = b
- return nil
-}
-
-/*
-func (m *MemStore) SaveSnapshot(ctx context.Context, height uint64, snapshot *state.Snapshot) error {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- m.State = state.Copy(snapshot)
- m.StateHeight = height
- return nil
-}
-*/
-
-func (m *MemStore) LoadBlock(height uint64) *legacy.Block {
- m.mu.Lock()
- defer m.mu.Unlock()
- b, ok := m.Blocks[height]
- if !ok {
- return nil
- }
- return b
-}
-
-/*
-func (m *MemStore) LatestSnapshot(context.Context) (*state.Snapshot, uint64, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- if m.State == nil {
- m.State = state.Empty()
- }
- return state.Copy(m.State), m.StateHeight, nil
-}
-*/
-
-func (m *MemStore) FinalizeBlock(uint64) error { return nil }
package blockchain
import (
- "github.com/bytom/blockchain/rpc"
+ "bytes"
+ "errors"
+ "fmt"
- ctypes "github.com/bytom/blockchain/rpc/types"
+ wire "github.com/tendermint/go-wire"
+
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/legacy"
+)
+
+const (
+ BlockRequestByte = byte(0x10)
+ BlockResponseByte = byte(0x11)
+ StatusRequestByte = byte(0x20)
+ StatusResponseByte = byte(0x21)
+ NewTransactionByte = byte(0x30)
+)
+
+// BlockchainMessage is a generic message for this reactor.
+type BlockchainMessage interface{}
+
+var _ = wire.RegisterInterface(
+ struct{ BlockchainMessage }{},
+ wire.ConcreteType{&BlockRequestMessage{}, BlockRequestByte},
+ wire.ConcreteType{&BlockResponseMessage{}, BlockResponseByte},
+ wire.ConcreteType{&StatusRequestMessage{}, StatusRequestByte},
+ wire.ConcreteType{&StatusResponseMessage{}, StatusResponseByte},
+ wire.ConcreteType{&TransactionNotifyMessage{}, NewTransactionByte},
)
-func (a *BlockchainReactor) getNetInfo() (*ctypes.ResultNetInfo, error) {
- return rpc.NetInfo(a.sw)
+func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
+ msgType = bz[0]
+ n := int(0)
+ r := bytes.NewReader(bz)
+ msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
+ if err != nil && n != len(bz) {
+ err = errors.New("DecodeMessage() had bytes left over")
+ }
+ return
+}
+
+type BlockRequestMessage struct {
+ Height uint64
+ RawHash [32]byte
+}
+
+func (m *BlockRequestMessage) GetHash() *bc.Hash {
+ hash := bc.NewHash(m.RawHash)
+ return &hash
+}
+
+func (m *BlockRequestMessage) String() string {
+ if m.Height > 0 {
+ return fmt.Sprintf("BlockRequestMessage{Height: %d}", m.Height)
+ }
+ hash := m.GetHash()
+ return fmt.Sprintf("BlockRequestMessage{Hash: %s}", hash.String())
+}
+
+type BlockResponseMessage struct {
+ RawBlock []byte
+}
+
+func NewBlockResponseMessage(block *legacy.Block) (*BlockResponseMessage, error) {
+ rawBlock, err := block.MarshalText()
+ if err != nil {
+ return nil, err
+ }
+ return &BlockResponseMessage{RawBlock: rawBlock}, nil
+}
+
+func (m *BlockResponseMessage) GetBlock() *legacy.Block {
+ block := &legacy.Block{
+ BlockHeader: legacy.BlockHeader{},
+ Transactions: []*legacy.Tx{},
+ }
+ block.UnmarshalText(m.RawBlock)
+ return block
+}
+
+func (m *BlockResponseMessage) String() string {
+ return fmt.Sprintf("BlockResponseMessage{Size: %d}", len(m.RawBlock))
+}
+
+type TransactionNotifyMessage struct {
+ RawTx []byte
+}
+
+func NewTransactionNotifyMessage(tx *legacy.Tx) (*TransactionNotifyMessage, error) {
+ rawTx, err := tx.TxData.MarshalText()
+ if err != nil {
+ return nil, err
+ }
+ return &TransactionNotifyMessage{RawTx: rawTx}, nil
+}
+
+func (m *TransactionNotifyMessage) GetTransaction() *legacy.Tx {
+ tx := &legacy.Tx{}
+ tx.UnmarshalText(m.RawTx)
+ return tx
+}
+
+func (m *TransactionNotifyMessage) String() string {
+ return fmt.Sprintf("TransactionNotifyMessage{Size: %d}", len(m.RawTx))
+}
+
+type StatusRequestMessage struct{}
+
+func (m *StatusRequestMessage) String() string {
+ return "StatusRequestMessage"
+}
+
+type StatusResponseMessage struct {
+ Height uint64
+ RawHash [32]byte
+}
+
+func NewStatusResponseMessage(block *legacy.Block) *StatusResponseMessage {
+ return &StatusResponseMessage{
+ Height: block.Height,
+ RawHash: block.Hash().Byte32(),
+ }
+}
+
+func (m *StatusResponseMessage) GetHash() *bc.Hash {
+ hash := bc.NewHash(m.RawHash)
+ return &hash
+}
+
+func (m *StatusResponseMessage) String() string {
+ hash := m.GetHash()
+ return fmt.Sprintf("StatusResponseMessage{Height: %d, Hash: %s}", m.Height, hash.String())
}
func (p *pin) processBlock(ctx context.Context, c *protocol.Chain, height uint64, cb func(context.Context, *legacy.Block) error) {
defer func() { <-p.sem }()
for {
- block, err := c.GetBlock(height)
+ block, err := c.GetBlockByHeight(height)
if err != nil {
log.WithField("error", err).Error("Process block")
continue
+++ /dev/null
-package blockchain
-
-import (
- "math"
- "sync"
- "time"
-
- // "github.com/blockchain/types"
- "github.com/bytom/protocol/bc/legacy"
- log "github.com/sirupsen/logrus"
- . "github.com/tendermint/tmlibs/common"
- flow "github.com/tendermint/tmlibs/flowrate"
-)
-
-const (
- requestIntervalMS = 250
- maxTotalRequesters = 300
- maxPendingRequests = maxTotalRequesters
- maxPendingRequestsPerPeer = 75
- minRecvRate = 10240 // 10Kb/s
-)
-
-var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests
-
-type BlockPool struct {
- BaseService
- startTime time.Time
-
- mtx sync.Mutex
- // block requests
- requesters map[uint64]*bpRequester
- height uint64 // the lowest key in requesters.
- numPending int32 // number of requests pending assignment or block response
- // peers
- peers map[string]*bpPeer
-
- requestsCh chan<- BlockRequest
- timeoutsCh chan<- string
-}
-
-func NewBlockPool(start uint64, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
- bp := &BlockPool{
- peers: make(map[string]*bpPeer),
-
- requesters: make(map[uint64]*bpRequester),
- height: start,
- numPending: 0,
-
- requestsCh: requestsCh,
- timeoutsCh: timeoutsCh,
- }
- bp.BaseService = *NewBaseService(nil, "BlockPool", bp)
- return bp
-}
-
-func (pool *BlockPool) OnStart() error {
- go pool.makeRequestersRoutine()
- pool.startTime = time.Now()
- return nil
-}
-
-func (pool *BlockPool) OnStop() {
- pool.BaseService.OnStop()
-}
-
-// Run spawns requesters as needed.
-func (pool *BlockPool) makeRequestersRoutine() {
- for {
- if !pool.IsRunning() {
- break
- }
- _, numPending, lenRequesters := pool.GetStatus()
- if numPending >= maxPendingRequests {
- // sleep for a bit.
- time.Sleep(requestIntervalMS * time.Millisecond)
- // check for timed out peers
- pool.removeTimedoutPeers()
- } else if lenRequesters >= maxTotalRequesters {
- // sleep for a bit.
- time.Sleep(requestIntervalMS * time.Millisecond)
- // check for timed out peers
- pool.removeTimedoutPeers()
- } else {
- // request for more blocks.
- pool.makeNextRequester()
- }
- }
-}
-
-func (pool *BlockPool) removeTimedoutPeers() {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- for _, peer := range pool.peers {
- if !peer.didTimeout && peer.numPending > 0 {
- curRate := peer.recvMonitor.Status().CurRate
- // XXX remove curRate != 0
- if curRate != 0 && curRate < minRecvRate {
- pool.sendTimeout(peer.id)
- log.WithFields(log.Fields{
- "peer": peer.id,
- "reason": "curRate too low",
- }).Error("SendTimeout")
- peer.didTimeout = true
- }
- }
- if peer.didTimeout {
- pool.removePeer(peer.id)
- }
- }
-}
-
-func (pool *BlockPool) GetStatus() (height uint64, numPending int32, lenRequesters int) {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- return pool.height, pool.numPending, len(pool.requesters)
-}
-
-// TODO: relax conditions, prevent abuse.
-func (pool *BlockPool) IsCaughtUp() bool {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- height := pool.height
-
- // Need at least 1 peer to be considered caught up.
- if len(pool.peers) == 0 && time.Now().Sub(pool.startTime) > 60*time.Second {
- log.Debug("Blockpool has no peers")
- return false
- }
-
- var maxPeerHeight uint64 = 0
- for _, peer := range pool.peers {
- maxPeerHeight = MaxUint64(maxPeerHeight, peer.height)
- }
-
- isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
- log.WithFields(log.Fields{
- "height": height,
- "maxPeerHeight": maxPeerHeight,
- }).Infof("IsCaughtUp: %v", isCaughtUp)
- return isCaughtUp
-}
-
-// We need to see the second block's Commit to validate the first block.
-// So we peek two blocks at a time.
-// The caller will verify the commit.
-func (pool *BlockPool) PeekTwoBlocks() (first *legacy.Block, second *legacy.Block) {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- if r := pool.requesters[pool.height]; r != nil {
- first = r.getBlock()
- }
- if r := pool.requesters[pool.height+1]; r != nil {
- second = r.getBlock()
- }
- return
-}
-
-// Pop the first block at pool.height
-// It must have been validated by 'second'.Commit from PeekTwoBlocks().
-func (pool *BlockPool) PopRequest() {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- if r := pool.requesters[pool.height]; r != nil {
- /* The block can disappear at any time, due to removePeer().
- if r := pool.requesters[pool.height]; r == nil || r.block == nil {
- PanicSanity("PopRequest() requires a valid block")
- }
- */
- r.Stop()
- delete(pool.requesters, pool.height)
- pool.height++
- } else {
- PanicSanity(Fmt("Expected requester to pop, got nothing at height %v", pool.height))
- }
-}
-
-// Invalidates the block at pool.height,
-// Remove the peer and redo request from others.
-func (pool *BlockPool) RedoRequest(height uint64) {
- pool.mtx.Lock()
- request := pool.requesters[height]
- pool.mtx.Unlock()
-
- if request.block == nil {
- PanicSanity("Expected block to be non-nil")
- }
- // RemovePeer will redo all requesters associated with this peer.
- // TODO: record this malfeasance
- pool.RemovePeer(request.peerID)
-}
-
-// TODO: ensure that blocks come in order for each peer.
-func (pool *BlockPool) AddBlock(peerID string, block *legacy.Block, blockSize int) {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- requester := pool.requesters[block.Height]
- if requester == nil {
- return
- }
-
- if requester.setBlock(block, peerID) {
- pool.numPending--
- peer := pool.peers[peerID]
- peer.decrPending(blockSize)
- } else {
- // Bad peer?
- }
-}
-
-// Sets the peer's alleged blockchain height.
-func (pool *BlockPool) SetPeerHeight(peerID string, height uint64) {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- peer := pool.peers[peerID]
- if peer != nil {
- peer.height = height
- } else {
- peer = newBPPeer(pool, peerID, height)
- pool.peers[peerID] = peer
- }
-}
-
-func (pool *BlockPool) RemovePeer(peerID string) {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- pool.removePeer(peerID)
-}
-
-func (pool *BlockPool) removePeer(peerID string) {
- for _, requester := range pool.requesters {
- if requester.getPeerID() == peerID {
- if requester.getBlock() != nil {
- pool.numPending++
- }
- go requester.redo() // pick another peer and ...
- }
- }
- delete(pool.peers, peerID)
-}
-
-// Pick an available peer with at least the given minHeight.
-// If no peers are available, returns nil.
-func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint64) *bpPeer {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- for _, peer := range pool.peers {
- if peer.didTimeout {
- pool.removePeer(peer.id)
- continue
- } else {
- }
- if peer.numPending >= maxPendingRequestsPerPeer {
- continue
- }
- if peer.height < minHeight {
- continue
- }
- peer.incrPending()
- return peer
- }
- return nil
-}
-
-func (pool *BlockPool) makeNextRequester() {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
-
- nextHeight := pool.height + uint64(len(pool.requesters))
- request := newBPRequester(pool, nextHeight)
- request.SetLogger(pool.Logger.With("height", nextHeight))
-
- pool.requesters[nextHeight] = request
- pool.numPending++
-
- request.Start()
-}
-
-func (pool *BlockPool) sendRequest(height uint64, peerID string) {
- if !pool.IsRunning() {
- return
- }
- pool.requestsCh <- BlockRequest{height, peerID}
-}
-
-func (pool *BlockPool) sendTimeout(peerID string) {
- if !pool.IsRunning() {
- return
- }
- pool.timeoutsCh <- peerID
-}
-
-func (pool *BlockPool) debug() string {
- pool.mtx.Lock() // Lock
- defer pool.mtx.Unlock()
-
- str := ""
- var h uint64
- for h = pool.height; h < pool.height+uint64(len(pool.requesters)); h++ {
- if pool.requesters[h] == nil {
- str += Fmt("H(%v):X ", h)
- } else {
- str += Fmt("H(%v):", h)
- str += Fmt("B?(%v) ", pool.requesters[h].block != nil)
- }
- }
- return str
-}
-
-//-------------------------------------
-
-type bpPeer struct {
- pool *BlockPool
- id string
- recvMonitor *flow.Monitor
-
- mtx sync.Mutex
- height uint64
- numPending int32
- timeout *time.Timer
- didTimeout bool
-}
-
-func newBPPeer(pool *BlockPool, peerID string, height uint64) *bpPeer {
- peer := &bpPeer{
- pool: pool,
- id: peerID,
- height: height,
- numPending: 0,
- }
- return peer
-}
-
-func (peer *bpPeer) resetMonitor() {
- peer.recvMonitor = flow.New(time.Second, time.Second*40)
- var initialValue = float64(minRecvRate) * math.E
- peer.recvMonitor.SetREMA(initialValue)
-}
-
-func (peer *bpPeer) resetTimeout() {
- if peer.timeout == nil {
- peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout)
- } else {
- peer.timeout.Reset(time.Second * peerTimeoutSeconds)
- }
-}
-
-func (peer *bpPeer) incrPending() {
- if peer.numPending == 0 {
- peer.resetMonitor()
- peer.resetTimeout()
- }
- peer.numPending++
-}
-
-func (peer *bpPeer) decrPending(recvSize int) {
- peer.numPending--
- if peer.numPending == 0 {
- peer.timeout.Stop()
- } else {
- peer.recvMonitor.Update(recvSize)
- peer.resetTimeout()
- }
-}
-
-func (peer *bpPeer) onTimeout() {
- peer.pool.mtx.Lock()
- defer peer.pool.mtx.Unlock()
-
- peer.pool.sendTimeout(peer.id)
- log.WithField("error", "onTimeout").Error("SendTimeout")
- peer.didTimeout = true
-}
-
-//-------------------------------------
-
-type bpRequester struct {
- BaseService
- pool *BlockPool
- height uint64
- gotBlockCh chan struct{}
- redoCh chan struct{}
-
- mtx sync.Mutex
- peerID string
- block *legacy.Block
-}
-
-func newBPRequester(pool *BlockPool, height uint64) *bpRequester {
- bpr := &bpRequester{
- pool: pool,
- height: height,
- gotBlockCh: make(chan struct{}),
- redoCh: make(chan struct{}),
-
- peerID: "",
- block: nil,
- }
- bpr.BaseService = *NewBaseService(nil, "bpRequester", bpr)
- return bpr
-}
-
-func (bpr *bpRequester) OnStart() error {
- go bpr.requestRoutine()
- return nil
-}
-
-// Returns true if the peer matches
-func (bpr *bpRequester) setBlock(block *legacy.Block, peerID string) bool {
- bpr.mtx.Lock()
- if bpr.block != nil || bpr.peerID != peerID {
- bpr.mtx.Unlock()
- return false
- }
- bpr.block = block
- bpr.mtx.Unlock()
-
- bpr.gotBlockCh <- struct{}{}
- return true
-}
-
-func (bpr *bpRequester) getBlock() *legacy.Block {
- bpr.mtx.Lock()
- defer bpr.mtx.Unlock()
- return bpr.block
-}
-
-func (bpr *bpRequester) getPeerID() string {
- bpr.mtx.Lock()
- defer bpr.mtx.Unlock()
- return bpr.peerID
-}
-
-func (bpr *bpRequester) reset() {
- bpr.mtx.Lock()
- bpr.peerID = ""
- bpr.block = nil
- bpr.mtx.Unlock()
-}
-
-// Tells bpRequester to pick another peer and try again.
-// NOTE: blocking
-func (bpr *bpRequester) redo() {
- bpr.redoCh <- struct{}{}
-}
-
-// Responsible for making more requests as necessary
-// Returns only when a block is found (e.g. AddBlock() is called)
-func (bpr *bpRequester) requestRoutine() {
-OUTER_LOOP:
- for {
- // Pick a peer to send request to.
- var peer *bpPeer = nil
- PICK_PEER_LOOP:
- for {
- if !bpr.IsRunning() || !bpr.pool.IsRunning() {
- return
- }
- peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
- if peer == nil {
- time.Sleep(requestIntervalMS * time.Millisecond)
- continue PICK_PEER_LOOP
- }
- break PICK_PEER_LOOP
- }
- bpr.mtx.Lock()
- bpr.peerID = peer.id
- bpr.mtx.Unlock()
-
- // Send request and wait.
- bpr.pool.sendRequest(bpr.height, peer.id)
- select {
- case <-bpr.pool.Quit:
- bpr.Stop()
- return
- case <-bpr.Quit:
- return
- case <-bpr.redoCh:
- bpr.reset()
- continue OUTER_LOOP // When peer is removed
- case <-bpr.gotBlockCh:
- // We got the block, now see if it's good.
- select {
- case <-bpr.pool.Quit:
- bpr.Stop()
- return
- case <-bpr.Quit:
- return
- case <-bpr.redoCh:
- bpr.reset()
- continue OUTER_LOOP
- }
- }
- }
-}
-
-//-------------------------------------
-
-type BlockRequest struct {
- Height uint64
- PeerID string
-}
+++ /dev/null
-package blockchain
-
-/*
-import (
- "math/rand"
- "testing"
- "time"
-
- "github.com/bytom/types"
- . "github.com/tendermint/tmlibs/common"
- "github.com/tendermint/tmlibs/log"
-)
-
-func init() {
- peerTimeoutSeconds = time.Duration(2)
-}
-
-type testPeer struct {
- id string
- height int
-}
-
-func makePeers(numPeers int, minHeight, maxHeight int) map[string]testPeer {
- peers := make(map[string]testPeer, numPeers)
- for i := 0; i < numPeers; i++ {
- peerID := RandStr(12)
- height := minHeight + rand.Intn(maxHeight-minHeight)
- peers[peerID] = testPeer{peerID, height}
- }
- return peers
-}
-
-func TestBasic(t *testing.T) {
- start := 42
- peers := makePeers(10, start+1, 1000)
- timeoutsCh := make(chan string, 100)
- requestsCh := make(chan BlockRequest, 100)
- pool := NewBlockPool(start, requestsCh, timeoutsCh)
- pool.SetLogger(log.TestingLogger())
- pool.Start()
- defer pool.Stop()
-
- // Introduce each peer.
- go func() {
- for _, peer := range peers {
- pool.SetPeerHeight(peer.id, peer.height)
- }
- }()
-
- // Start a goroutine to pull blocks
- go func() {
- for {
- if !pool.IsRunning() {
- return
- }
- first, second := pool.PeekTwoBlocks()
- if first != nil && second != nil {
- pool.PopRequest()
- } else {
- time.Sleep(1 * time.Second)
- }
- }
- }()
-
- // Pull from channels
- for {
- select {
- case peerID := <-timeoutsCh:
- t.Errorf("timeout: %v", peerID)
- case request := <-requestsCh:
- t.Logf("Pulled new BlockRequest %v", request)
- if request.Height == 300 {
- return // Done!
- }
- // Request desired, pretend like we got the block immediately.
- go func() {
- block := &types.Block{Header: &types.Header{Height: request.Height}}
- pool.AddBlock(request.PeerID, block, 123)
- t.Logf("Added block from peer %v (height: %v)", request.PeerID, request.Height)
- }()
- }
- }
-}
-
-func TestTimeout(t *testing.T) {
- start := 42
- peers := makePeers(10, start+1, 1000)
- timeoutsCh := make(chan string, 100)
- requestsCh := make(chan BlockRequest, 100)
- pool := NewBlockPool(start, requestsCh, timeoutsCh)
- pool.SetLogger(log.TestingLogger())
- pool.Start()
- defer pool.Stop()
-
- for _, peer := range peers {
- t.Logf("Peer %v", peer.id)
- }
-
- // Introduce each peer.
- go func() {
- for _, peer := range peers {
- pool.SetPeerHeight(peer.id, peer.height)
- }
- }()
-
- // Start a goroutine to pull blocks
- go func() {
- for {
- if !pool.IsRunning() {
- return
- }
- first, second := pool.PeekTwoBlocks()
- if first != nil && second != nil {
- pool.PopRequest()
- } else {
- time.Sleep(1 * time.Second)
- }
- }
- }()
-
- // Pull from channels
- counter := 0
- timedOut := map[string]struct{}{}
- for {
- select {
- case peerID := <-timeoutsCh:
- t.Logf("Peer %v timeouted", peerID)
- if _, ok := timedOut[peerID]; !ok {
- counter++
- if counter == len(peers) {
- return // Done!
- }
- }
- case request := <-requestsCh:
- t.Logf("Pulled new BlockRequest %+v", request)
- }
- }
-}
-*/
package blockchain
import (
- "bytes"
+ "blockchain/blockchain/rpc"
"context"
"fmt"
"net/http"
"reflect"
"time"
+ log "github.com/sirupsen/logrus"
+ cmn "github.com/tendermint/tmlibs/common"
+
"github.com/bytom/blockchain/accesstoken"
"github.com/bytom/blockchain/account"
"github.com/bytom/blockchain/asset"
+ "github.com/bytom/blockchain/pin"
"github.com/bytom/blockchain/pseudohsm"
- "github.com/bytom/blockchain/txdb"
+ ctypes "github.com/bytom/blockchain/rpc/types"
"github.com/bytom/blockchain/txfeed"
"github.com/bytom/encoding/json"
+ "github.com/bytom/errors"
"github.com/bytom/mining/cpuminer"
"github.com/bytom/net/http/httpjson"
"github.com/bytom/p2p"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc/legacy"
"github.com/bytom/types"
- wire "github.com/tendermint/go-wire"
- cmn "github.com/tendermint/tmlibs/common"
-
- "github.com/bytom/blockchain/pin"
- "github.com/bytom/errors"
- log "github.com/sirupsen/logrus"
)
const (
- // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
+ // BlockchainChannel is a channel for blocks and status updates
BlockchainChannel = byte(0x40)
- defaultChannelCapacity = 100
- defaultSleepIntervalMS = 500
- trySyncIntervalMS = 100
- // stop syncing when last block's time is
- // within this much of the system time.
- // stopSyncingDurationMinutes = 10
-
- // ask for best height every 10s
+ defaultChannelCapacity = 100
+ trySyncIntervalMS = 100
statusUpdateIntervalSeconds = 10
- // check if we should switch to consensus reactor
- switchToConsensusIntervalSeconds = 1
- maxBlockchainResponseSize = 22020096 + 2
- crosscoreRPCPrefix = "/rpc/"
+ maxBlockchainResponseSize = 22020096 + 2
+ crosscoreRPCPrefix = "/rpc/"
)
// BlockchainReactor handles long-term catchup syncing.
p2p.BaseReactor
chain *protocol.Chain
- store *txdb.Store
pinStore *pin.Store
accounts *account.Manager
assets *asset.Registry
accesstoken *accesstoken.Token
txFeeds *txfeed.TxFeed
- pool *BlockPool
+ blockKeeper *blockKeeper
txPool *protocol.TxPool
hsm *pseudohsm.HSM
mining *cpuminer.CPUMiner
mux *http.ServeMux
sw *p2p.Switch
handler http.Handler
- fastSync bool
- requestsCh chan BlockRequest
- timeoutsCh chan string
evsw types.EventSwitch
}
}
func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
- //if a.config == nil {
- // never configured
return map[string]interface{}{
"is_configured": false,
"version": "0.001",
"build_date": "------",
"build_config": "---------",
}, nil
- //}
-}
-
-func (bcr *BlockchainReactor) createblockkey(ctx context.Context) {
}
func maxBytes(h http.Handler) http.Handler {
m.Handle("/list-unspent-outputs", jsonHandler(bcr.listUnspentOutputs))
m.Handle("/", alwaysError(errors.New("not Found")))
m.Handle("/info", jsonHandler(bcr.info))
- m.Handle("/create-block-key", jsonHandler(bcr.createblockkey))
m.Handle("/submit-transaction", jsonHandler(bcr.submit))
m.Handle("/create-access-token", jsonHandler(bcr.createAccessToken))
m.Handle("/list-access-tokens", jsonHandler(bcr.listAccessTokens))
LastPage bool `json:"last_page"`
}
-func NewBlockchainReactor(store *txdb.Store,
- chain *protocol.Chain,
- txPool *protocol.TxPool,
- accounts *account.Manager,
- assets *asset.Registry,
- sw *p2p.Switch,
- hsm *pseudohsm.HSM,
- fastSync bool,
- pinStore *pin.Store) *BlockchainReactor {
- requestsCh := make(chan BlockRequest, defaultChannelCapacity)
- timeoutsCh := make(chan string, defaultChannelCapacity)
- pool := NewBlockPool(
- store.Height()+1,
- requestsCh,
- timeoutsCh,
- )
+func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, pinStore *pin.Store) *BlockchainReactor {
mining := cpuminer.NewCPUMiner(chain, txPool)
bcR := &BlockchainReactor{
- chain: chain,
- store: store,
- pinStore: pinStore,
- accounts: accounts,
- assets: assets,
- pool: pool,
- txPool: txPool,
- mining: mining,
- mux: http.NewServeMux(),
- sw: sw,
- hsm: hsm,
- fastSync: fastSync,
- requestsCh: requestsCh,
- timeoutsCh: timeoutsCh,
+ chain: chain,
+ pinStore: pinStore,
+ accounts: accounts,
+ assets: assets,
+ blockKeeper: newBlockKeeper(chain, sw),
+ txPool: txPool,
+ mining: mining,
+ mux: http.NewServeMux(),
+ sw: sw,
+ hsm: hsm,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
func (bcR *BlockchainReactor) OnStart() error {
bcR.BaseReactor.OnStart()
bcR.BuildHander()
- if bcR.fastSync {
- _, err := bcR.pool.Start()
- if err != nil {
- return err
- }
- go bcR.poolRoutine()
- }
+ bcR.mining.Start()
+ go bcR.syncRoutine()
return nil
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
- if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.chain.Height()}}) {
- // doing nothing, will try later in `poolRoutine`
- }
+ peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- bcR.pool.RemovePeer(peer.Key)
+ bcR.blockKeeper.RemovePeer(peer.Key)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
- bcR.Logger.Error("Error decoding message", "error", err)
+ log.Errorf("Error decoding messagek %v", err)
return
}
-
- bcR.Logger.Info("Receive", "src", src, "chID", chID, "msg", msg)
+ log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
switch msg := msg.(type) {
- case *bcBlockRequestMessage:
- rawBlock, err := bcR.store.GetRawBlock(msg.Height)
- if err == nil {
- msg := &bcBlockResponseMessage{RawBlock: rawBlock}
- queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- if !queued {
- // queue is full, just ignore.
- }
+ case *BlockRequestMessage:
+ var block *legacy.Block
+ var err error
+ if msg.Height != 0 {
+ block, err = bcR.chain.GetBlockByHeight(msg.Height)
} else {
- bcR.Logger.Info("skip sent the block response due to block is nil")
- // TODO peer is asking for things we don't have.
+ block, err = bcR.chain.GetBlockByHash(msg.GetHash())
}
- case *bcBlockResponseMessage:
- // Got a block.
- bcR.pool.AddBlock(src.Key, msg.GetBlock(), len(msgBytes))
- case *bcStatusRequestMessage:
- // Send peer our state.
- queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.chain.Height()}})
- if !queued {
- // sorry
+ if err != nil {
+ log.Errorf("Fail on BlockRequestMessage get block: %v", err)
+ return
}
- case *bcStatusResponseMessage:
- // Got a peer status. Unverified.
- bcR.pool.SetPeerHeight(src.Key, msg.Height)
- case *bcTransactionMessage:
- tx := msg.GetTransaction()
+ response, err := NewBlockResponseMessage(block)
+ if err != nil {
+ log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
+ return
+ }
+ src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
+
+ case *BlockResponseMessage:
+ bcR.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
+
+ case *StatusRequestMessage:
+ block, _ := bcR.chain.State()
+ src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
+
+ case *StatusResponseMessage:
+ bcR.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
+
+ case *TransactionNotifyMessage:
+ tx := msg.GetTransaction()
if err := bcR.chain.ValidateTx(tx); err != nil {
- bcR.Logger.Error("fail to sync transaction to txPool", "err", err)
+ log.Errorf("TransactionNotifyMessage: %v", err)
}
+
default:
- bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
+ log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
-func (bcR *BlockchainReactor) poolRoutine() {
-
- trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
+func (bcR *BlockchainReactor) syncRoutine() {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
newTxCh := bcR.txPool.GetNewTxCh()
- //switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
-FOR_LOOP:
for {
-
select {
- case request := <-bcR.requestsCh: // chan BlockRequest
- peer := bcR.Switch.Peers().Get(request.PeerID)
- if peer == nil {
- continue FOR_LOOP // Peer has since been disconnected.
- }
- msg := &bcBlockRequestMessage{request.Height}
- queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- if !queued {
- // We couldn't make the request, send-queue full.
- // The pool handles timeouts, just let it go.
- continue FOR_LOOP
- }
- case peerID := <-bcR.timeoutsCh: // chan string
- // Peer timed out.
- peer := bcR.Switch.Peers().Get(peerID)
- if peer != nil {
- bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
- }
case newTx := <-newTxCh:
go bcR.BroadcastTransaction(newTx)
case _ = <-statusUpdateTicker.C:
- // ask for status updates
- go bcR.BroadcastStatusRequest()
- case _ = <-trySyncTicker.C: // chan time
- SYNC_LOOP:
- for i := 0; i < 10; i++ {
- // See if there are any blocks to sync.
- block, _ := bcR.pool.PeekTwoBlocks()
- if block == nil {
- break SYNC_LOOP
- }
- bcR.pool.PopRequest()
-
- if err := bcR.chain.AddBlock(nil, block); err == nil {
- bcR.Logger.Info("finish to sync commit block", "blockHeigh", block.BlockHeader.Height)
- } else {
- bcR.Logger.Info("fail to sync commit block", "blockHeigh", block.BlockHeader.Height, "error", err)
- }
- }
- continue FOR_LOOP
+ go bcR.BroadcastStatusResponse()
case <-bcR.Quit:
- break FOR_LOOP
- }
- if bcR.pool.IsCaughtUp() && !bcR.mining.IsMining() {
- bcR.Logger.Info("start to mining")
- bcR.mining.Start()
+ return
}
}
}
+func (bcR *BlockchainReactor) getNetInfo() (*ctypes.ResultNetInfo, error) {
+ return rpc.NetInfo(bcR.sw)
+}
+
// BroadcastStatusRequest broadcasts `BlockStore` height.
-func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
- bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.chain.Height()}})
- return nil
+func (bcR *BlockchainReactor) BroadcastStatusResponse() {
+ block, _ := bcR.chain.State()
+ bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
}
func (bcR *BlockchainReactor) BroadcastTransaction(tx *legacy.Tx) error {
- rawTx, err := tx.TxData.MarshalText()
+ msg, err := NewTransactionNotifyMessage(tx)
if err != nil {
return err
}
- bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcTransactionMessage{rawTx}})
+ bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})
return nil
}
-
-//-----------------------------------------------------------------------------
-// Messages
-
-const (
- msgTypeBlockRequest = byte(0x10)
- msgTypeBlockResponse = byte(0x11)
- msgTypeStatusResponse = byte(0x20)
- msgTypeStatusRequest = byte(0x21)
- msgTypeTransactionRequest = byte(0x30)
-)
-
-// BlockchainMessage is a generic message for this reactor.
-type BlockchainMessage interface{}
-
-var _ = wire.RegisterInterface(
- struct{ BlockchainMessage }{},
- wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
- wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
- wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
- wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
- wire.ConcreteType{&bcTransactionMessage{}, msgTypeTransactionRequest},
-)
-
-// DecodeMessage decodes BlockchainMessage.
-// TODO: ensure that bz is completely read.
-func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
- msgType = bz[0]
- n := int(0)
- r := bytes.NewReader(bz)
- msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
- if err != nil && n != len(bz) {
- err = errors.New("DecodeMessage() had bytes left over")
- }
- return
-}
-
-//-----------------------------------
-
-type bcBlockRequestMessage struct {
- Height uint64
-}
-
-func (m *bcBlockRequestMessage) String() string {
- return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
-}
-
-//-------------------------------------
-
-type bcTransactionMessage struct {
- RawTx []byte
-}
-
-func (m *bcTransactionMessage) GetTransaction() *legacy.Tx {
- tx := &legacy.Tx{}
- tx.UnmarshalText(m.RawTx)
- return tx
-}
-
-//-------------------------------------
-
-// NOTE: keep up-to-date with maxBlockchainResponseSize
-type bcBlockResponseMessage struct {
- RawBlock []byte
-}
-
-func (m *bcBlockResponseMessage) GetBlock() *legacy.Block {
- block := &legacy.Block{
- BlockHeader: legacy.BlockHeader{},
- Transactions: []*legacy.Tx{},
- }
- block.UnmarshalText(m.RawBlock)
- return block
-}
-
-func (m *bcBlockResponseMessage) String() string {
- block := m.GetBlock()
- return cmn.Fmt("[bcBlockResponseMessage %v]", block.BlockHeader.Height)
-}
-
-//-------------------------------------
-
-type bcStatusRequestMessage struct {
- Height uint64
-}
-
-func (m *bcStatusRequestMessage) String() string {
- return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
-}
-
-//-------------------------------------
-
-type bcStatusResponseMessage struct {
- Height uint64
-}
-
-func (m *bcStatusResponseMessage) String() string {
- return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
-}
return nil, errors.Wrapf(err, "waiting for block at height %d", height)
}
- rawBlock, err := a.store.GetRawBlock(height)
+ block, err := a.chain.GetBlockByHeight(height)
if err != nil {
return nil, err
}
+ rawBlock, err := block.MarshalText()
+ if err != nil {
+
+ return nil, err
+ }
return rawBlock, nil
}
package rpc
-import (
+/*import (
"github.com/bytom/blockchain/txdb"
ctypes "github.com/bytom/blockchain/rpc/types"
func BlockHeight(blockStore *txdb.Store) (*ctypes.ResultBlockchainInfo, error) {
return &ctypes.ResultBlockchainInfo{LastHeight: blockStore.Height()}, nil
-}
+}*/
return 0, ctx.Err()
case <-a.chain.BlockWaiter(height):
- b, err := a.chain.GetBlock(height)
+ b, err := a.chain.GetBlockByHeight(height)
if err != nil {
return 0, errors.Wrap(err, "getting block that just landed")
}
import (
"errors"
"fmt"
- "strconv"
"sync"
- "github.com/bytom/protocol/bc/legacy"
-
"github.com/golang/groupcache/lru"
"github.com/golang/groupcache/singleflight"
+
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/legacy"
)
const maxCachedBlocks = 30
-func newBlockCache(fillFn func(height uint64) *legacy.Block) blockCache {
+func newBlockCache(fillFn func(hash *bc.Hash) *legacy.Block) blockCache {
return blockCache{
lru: lru.New(maxCachedBlocks),
fillFn: fillFn,
}
type blockCache struct {
- mu sync.Mutex
- lru *lru.Cache
-
- fillFn func(height uint64) *legacy.Block
-
- single singleflight.Group // for cache misses
+ mu sync.Mutex
+ lru *lru.Cache
+ fillFn func(hash *bc.Hash) *legacy.Block
+ single singleflight.Group
}
-func (c *blockCache) lookup(height uint64) (*legacy.Block, error) {
- b, ok := c.get(height)
- if ok {
+func (c *blockCache) lookup(hash *bc.Hash) (*legacy.Block, error) {
+ if b, ok := c.get(hash); ok {
return b, nil
}
- // Cache miss; fill the block
- heightStr := strconv.FormatUint(height, 16)
- block, err := c.single.Do(heightStr, func() (interface{}, error) {
- b := c.fillFn(height)
+ block, err := c.single.Do(hash.String(), func() (interface{}, error) {
+ b := c.fillFn(hash)
if b == nil {
- return nil, errors.New(fmt.Sprintf("There are no block with block height is %v", height))
+ return nil, errors.New(fmt.Sprintf("There are no block with given hash %s", hash.String()))
}
c.add(b)
return block.(*legacy.Block), nil
}
-func (c *blockCache) get(height uint64) (*legacy.Block, bool) {
+func (c *blockCache) get(hash *bc.Hash) (*legacy.Block, bool) {
c.mu.Lock()
- block, ok := c.lru.Get(height)
+ block, ok := c.lru.Get(hash)
c.mu.Unlock()
if block == nil {
return nil, ok
func (c *blockCache) add(block *legacy.Block) {
c.mu.Lock()
- c.lru.Add(block.Height, block)
+ c.lru.Add(block.Hash(), block)
c.mu.Unlock()
}
+++ /dev/null
-If you edit `snapshot.proto` (which specifies the serialization format for `state.Snapshot` objects) you will have to regenerate `snapshot.pb.go` using [protoc](https://github.com/google/protobuf#protocol-compiler-installation):
-
-`protoc --go_out=. snapshot.proto`
-
-You will also need [the `protoc` plugin for generating Go code](https://github.com/golang/protobuf/tree/master/protoc-gen-go).
+++ /dev/null
-// Code generated by protoc-gen-go.
-// source: snapshot.proto
-// DO NOT EDIT!
-
-/*
-Package storage is a generated protocol buffer package.
-
-It is generated from these files:
- snapshot.proto
-
-It has these top-level messages:
- Snapshot
-*/
-package storage
-
-import proto "github.com/golang/protobuf/proto"
-import fmt "fmt"
-import math "math"
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto.Marshal
-var _ = fmt.Errorf
-var _ = math.Inf
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the proto package it is being compiled against.
-// A compilation error at this line likely means your copy of the
-// proto package needs to be updated.
-const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
-
-// Snapshot represents a snapshot of the blockchain, including the state
-// tree and issuance memory.
-type Snapshot struct {
- // Nodes contains every node within the state tree, including interior nodes.
- // The nodes are ordered according to a pre-order traversal.
- Nodes []*Snapshot_StateTreeNode `protobuf:"bytes,1,rep,name=nodes" json:"nodes,omitempty"`
- // Nonces contains the record of recent nonces for ensuring
- // uniqueness of issuances.
- Nonces []*Snapshot_Nonce `protobuf:"bytes,2,rep,name=nonces" json:"nonces,omitempty"`
-}
-
-func (m *Snapshot) Reset() { *m = Snapshot{} }
-func (m *Snapshot) String() string { return proto.CompactTextString(m) }
-func (*Snapshot) ProtoMessage() {}
-func (*Snapshot) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
-
-func (m *Snapshot) GetNodes() []*Snapshot_StateTreeNode {
- if m != nil {
- return m.Nodes
- }
- return nil
-}
-
-func (m *Snapshot) GetNonces() []*Snapshot_Nonce {
- if m != nil {
- return m.Nonces
- }
- return nil
-}
-
-type Snapshot_Nonce struct {
- Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
- ExpiryMs uint64 `protobuf:"varint,2,opt,name=expiry_ms,json=expiryMs" json:"expiry_ms,omitempty"`
-}
-
-func (m *Snapshot_Nonce) Reset() { *m = Snapshot_Nonce{} }
-func (m *Snapshot_Nonce) String() string { return proto.CompactTextString(m) }
-func (*Snapshot_Nonce) ProtoMessage() {}
-func (*Snapshot_Nonce) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }
-
-type Snapshot_StateTreeNode struct {
- Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
-}
-
-func (m *Snapshot_StateTreeNode) Reset() { *m = Snapshot_StateTreeNode{} }
-func (m *Snapshot_StateTreeNode) String() string { return proto.CompactTextString(m) }
-func (*Snapshot_StateTreeNode) ProtoMessage() {}
-func (*Snapshot_StateTreeNode) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 1} }
-
-func init() {
- proto.RegisterType((*Snapshot)(nil), "chain.core.txdb.internal.storage.Snapshot")
- proto.RegisterType((*Snapshot_Nonce)(nil), "chain.core.txdb.internal.storage.Snapshot.Nonce")
- proto.RegisterType((*Snapshot_StateTreeNode)(nil), "chain.core.txdb.internal.storage.Snapshot.StateTreeNode")
-}
-
-func init() { proto.RegisterFile("snapshot.proto", fileDescriptor0) }
-
-var fileDescriptor0 = []byte{
- // 218 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x90, 0x3b, 0x4f, 0xc4, 0x30,
- 0x10, 0x84, 0xe5, 0x7b, 0x71, 0xb7, 0x3c, 0x84, 0x5c, 0x45, 0x47, 0x13, 0xa8, 0x52, 0xad, 0x10,
- 0x34, 0xa9, 0xa9, 0x68, 0x48, 0xe1, 0x50, 0xd1, 0x20, 0x27, 0x59, 0xe1, 0x08, 0xf0, 0x46, 0xb6,
- 0x8b, 0xa4, 0xe4, 0x9f, 0xa3, 0x38, 0xa6, 0xa0, 0x42, 0x74, 0xa3, 0x91, 0xbe, 0x6f, 0x56, 0x0b,
- 0x17, 0xde, 0xea, 0xc1, 0x1b, 0x0e, 0x38, 0x38, 0x0e, 0x2c, 0xf3, 0xd6, 0xe8, 0xde, 0x62, 0xcb,
- 0x8e, 0x30, 0x8c, 0x5d, 0x83, 0xbd, 0x0d, 0xe4, 0xac, 0xfe, 0x40, 0x1f, 0xd8, 0xe9, 0x37, 0xba,
- 0xf9, 0x5a, 0xc1, 0xbe, 0x4e, 0x90, 0xac, 0x60, 0x6b, 0xb9, 0x23, 0x9f, 0x89, 0x7c, 0x5d, 0x9c,
- 0xde, 0x95, 0xf8, 0x17, 0x8e, 0x3f, 0x28, 0xd6, 0x41, 0x07, 0x7a, 0x76, 0x44, 0x15, 0x77, 0xa4,
- 0x16, 0x8d, 0x7c, 0x84, 0x9d, 0x65, 0xdb, 0x92, 0xcf, 0x56, 0x51, 0x78, 0xfb, 0x0f, 0x61, 0x35,
- 0x83, 0x2a, 0xf1, 0xc7, 0x12, 0xb6, 0xb1, 0x90, 0x12, 0x36, 0x46, 0x7b, 0x93, 0x89, 0x5c, 0x14,
- 0x67, 0x2a, 0x66, 0x79, 0x05, 0x07, 0x1a, 0x87, 0xde, 0x4d, 0xaf, 0x9f, 0xf3, 0x92, 0x28, 0x36,
- 0x6a, 0xbf, 0x14, 0x4f, 0xfe, 0x78, 0x0d, 0xe7, 0xbf, 0x6e, 0x93, 0x97, 0xb0, 0x7e, 0xa7, 0x29,
- 0x09, 0xe6, 0xf8, 0x70, 0x78, 0x39, 0x49, 0xf3, 0xcd, 0x2e, 0xfe, 0xed, 0xfe, 0x3b, 0x00, 0x00,
- 0xff, 0xff, 0x24, 0x3f, 0x72, 0xb5, 0x49, 0x01, 0x00, 0x00,
-}
--- /dev/null
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: storage.proto
+
+/*
+Package storage is a generated protocol buffer package.
+
+It is generated from these files:
+ storage.proto
+
+It has these top-level messages:
+ Snapshot
+ Mainchain
+*/
+package storage
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+// Snapshot represents a snapshot of the blockchain, including the state
+// tree and issuance memory.
+type Snapshot struct {
+ // Nodes contains every node within the state tree, including interior nodes.
+ // The nodes are ordered according to a pre-order traversal.
+ Nodes []*Snapshot_StateTreeNode `protobuf:"bytes,1,rep,name=nodes" json:"nodes,omitempty"`
+ // Nonces contains the record of recent nonces for ensuring
+ // uniqueness of issuances.
+ Nonces []*Snapshot_Nonce `protobuf:"bytes,2,rep,name=nonces" json:"nonces,omitempty"`
+}
+
+func (m *Snapshot) Reset() { *m = Snapshot{} }
+func (m *Snapshot) String() string { return proto.CompactTextString(m) }
+func (*Snapshot) ProtoMessage() {}
+func (*Snapshot) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Snapshot) GetNodes() []*Snapshot_StateTreeNode {
+ if m != nil {
+ return m.Nodes
+ }
+ return nil
+}
+
+func (m *Snapshot) GetNonces() []*Snapshot_Nonce {
+ if m != nil {
+ return m.Nonces
+ }
+ return nil
+}
+
+type Snapshot_Nonce struct {
+ Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
+ ExpiryMs uint64 `protobuf:"varint,2,opt,name=expiry_ms,json=expiryMs" json:"expiry_ms,omitempty"`
+}
+
+func (m *Snapshot_Nonce) Reset() { *m = Snapshot_Nonce{} }
+func (m *Snapshot_Nonce) String() string { return proto.CompactTextString(m) }
+func (*Snapshot_Nonce) ProtoMessage() {}
+func (*Snapshot_Nonce) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }
+
+func (m *Snapshot_Nonce) GetHash() []byte {
+ if m != nil {
+ return m.Hash
+ }
+ return nil
+}
+
+func (m *Snapshot_Nonce) GetExpiryMs() uint64 {
+ if m != nil {
+ return m.ExpiryMs
+ }
+ return 0
+}
+
+type Snapshot_StateTreeNode struct {
+ Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (m *Snapshot_StateTreeNode) Reset() { *m = Snapshot_StateTreeNode{} }
+func (m *Snapshot_StateTreeNode) String() string { return proto.CompactTextString(m) }
+func (*Snapshot_StateTreeNode) ProtoMessage() {}
+func (*Snapshot_StateTreeNode) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 1} }
+
+func (m *Snapshot_StateTreeNode) GetKey() []byte {
+ if m != nil {
+ return m.Key
+ }
+ return nil
+}
+
+// Mainchain represents a mainchain of the blockchain
+type Mainchain struct {
+ Hashs []*Mainchain_Hash `protobuf:"bytes,1,rep,name=hashs" json:"hashs,omitempty"`
+}
+
+func (m *Mainchain) Reset() { *m = Mainchain{} }
+func (m *Mainchain) String() string { return proto.CompactTextString(m) }
+func (*Mainchain) ProtoMessage() {}
+func (*Mainchain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *Mainchain) GetHashs() []*Mainchain_Hash {
+ if m != nil {
+ return m.Hashs
+ }
+ return nil
+}
+
+type Mainchain_Hash struct {
+ Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (m *Mainchain_Hash) Reset() { *m = Mainchain_Hash{} }
+func (m *Mainchain_Hash) String() string { return proto.CompactTextString(m) }
+func (*Mainchain_Hash) ProtoMessage() {}
+func (*Mainchain_Hash) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 0} }
+
+func (m *Mainchain_Hash) GetKey() []byte {
+ if m != nil {
+ return m.Key
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*Snapshot)(nil), "chain.core.txdb.internal.storage.Snapshot")
+ proto.RegisterType((*Snapshot_Nonce)(nil), "chain.core.txdb.internal.storage.Snapshot.Nonce")
+ proto.RegisterType((*Snapshot_StateTreeNode)(nil), "chain.core.txdb.internal.storage.Snapshot.StateTreeNode")
+ proto.RegisterType((*Mainchain)(nil), "chain.core.txdb.internal.storage.Mainchain")
+ proto.RegisterType((*Mainchain_Hash)(nil), "chain.core.txdb.internal.storage.Mainchain.Hash")
+}
+
+func init() { proto.RegisterFile("storage.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+ // 252 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0x31, 0x4f, 0xc3, 0x30,
+ 0x10, 0x85, 0x95, 0x36, 0x29, 0xcd, 0x41, 0x25, 0xe4, 0x29, 0x0a, 0x4b, 0xe8, 0x94, 0xe9, 0x84,
+ 0x60, 0xe9, 0xcc, 0x80, 0xba, 0x34, 0x83, 0xcb, 0xc4, 0x82, 0xdc, 0xe4, 0x84, 0x23, 0xa8, 0x1d,
+ 0xd9, 0x1e, 0xda, 0x91, 0x7f, 0x8e, 0xec, 0xb8, 0x48, 0x88, 0x01, 0xba, 0x9d, 0x4f, 0xfe, 0xbe,
+ 0xf7, 0x64, 0xc3, 0xc2, 0x3a, 0x6d, 0xc4, 0x1b, 0xe1, 0x60, 0xb4, 0xd3, 0xac, 0x6a, 0xa5, 0xe8,
+ 0x15, 0xb6, 0xda, 0x10, 0xba, 0x43, 0xb7, 0xc3, 0x5e, 0x39, 0x32, 0x4a, 0x7c, 0x60, 0xbc, 0xb7,
+ 0xfc, 0x9c, 0xc0, 0x7c, 0xab, 0xc4, 0x60, 0xa5, 0x76, 0xac, 0x81, 0x4c, 0xe9, 0x8e, 0x6c, 0x91,
+ 0x54, 0xd3, 0xfa, 0xf2, 0x7e, 0x85, 0x7f, 0xe1, 0x78, 0x42, 0x71, 0xeb, 0x84, 0xa3, 0x67, 0x43,
+ 0xd4, 0xe8, 0x8e, 0xf8, 0xa8, 0x61, 0x6b, 0x98, 0x29, 0xad, 0x5a, 0xb2, 0xc5, 0x24, 0x08, 0xef,
+ 0xce, 0x10, 0x36, 0x1e, 0xe4, 0x91, 0x2f, 0x57, 0x90, 0x85, 0x05, 0x63, 0x90, 0x4a, 0x61, 0x65,
+ 0x91, 0x54, 0x49, 0x7d, 0xc5, 0xc3, 0xcc, 0x6e, 0x20, 0xa7, 0xc3, 0xd0, 0x9b, 0xe3, 0xeb, 0xde,
+ 0x27, 0x25, 0x75, 0xca, 0xe7, 0xe3, 0x62, 0x63, 0xcb, 0x5b, 0x58, 0xfc, 0xe8, 0xc6, 0xae, 0x61,
+ 0xfa, 0x4e, 0xc7, 0x28, 0xf0, 0xe3, 0x72, 0x0f, 0xf9, 0x46, 0xf4, 0x2a, 0x74, 0x63, 0x4f, 0x90,
+ 0x79, 0xe9, 0xe9, 0x0d, 0xfe, 0x51, 0xf9, 0x9b, 0xc5, 0xb5, 0xb0, 0x92, 0x8f, 0x78, 0x59, 0x40,
+ 0xea, 0x8f, 0xbf, 0xe3, 0x1e, 0xf3, 0x97, 0x8b, 0x88, 0xee, 0x66, 0xe1, 0x9b, 0x1e, 0xbe, 0x02,
+ 0x00, 0x00, 0xff, 0xff, 0xf6, 0x91, 0x6b, 0xd7, 0xb7, 0x01, 0x00, 0x00,
+}
}
}
+// Mainchain represents a mainchain of the blockchain
+message Mainchain {
+
+ repeated Hash hashs = 1;
+
+ message Hash {
+ bytes key = 1;
+ }
+}
\ No newline at end of file
--- /dev/null
+package txdb
+
+import (
+ "fmt"
+
+ "github.com/golang/protobuf/proto"
+ dbm "github.com/tendermint/tmlibs/db"
+
+ "github.com/bytom/blockchain/txdb/internal/storage"
+ "github.com/bytom/errors"
+ "github.com/bytom/protocol/bc"
+)
+
+func calcMainchainKey(hash *bc.Hash) []byte {
+ return []byte(fmt.Sprintf("MC:%v", hash.String()))
+}
+
+// DecodeMainchain decodes a Mainchain from bytes
+func DecodeMainchain(data []byte) (map[uint64]*bc.Hash, error) {
+ var mainchainList storage.Mainchain
+ if err := proto.Unmarshal(data, &mainchainList); err != nil {
+ return nil, errors.Wrap(err, "unmarshaling Mainchain proto")
+ }
+
+ mainchain := make(map[uint64]*bc.Hash)
+ for i, rawHash := range mainchainList.Hashs {
+ var b32 [32]byte
+ copy(b32[:], rawHash.Key)
+ hash := bc.NewHash(b32)
+ mainchain[uint64(i+1)] = &hash
+ }
+
+ return mainchain, nil
+}
+
+func saveMainchain(db dbm.DB, mainchain map[uint64]*bc.Hash, hash *bc.Hash) error {
+ var mainchainList storage.Mainchain
+ for i := 1; i <= len(mainchain); i++ {
+ rawHash := &storage.Mainchain_Hash{Key: mainchain[uint64(i)].Bytes()}
+ mainchainList.Hashs = append(mainchainList.Hashs, rawHash)
+ }
+
+ b, err := proto.Marshal(&mainchainList)
+ if err != nil {
+ return errors.Wrap(err, "marshaling Mainchain")
+ }
+
+ db.Set(calcMainchainKey(hash), b)
+ db.SetSync(nil, nil)
+ return nil
+}
+
+func getMainchain(db dbm.DB, hash *bc.Hash) (map[uint64]*bc.Hash, error) {
+ data := db.Get(calcMainchainKey(hash))
+ if data == nil {
+ return nil, errors.New("no this Mainchain.")
+ }
+
+ mainchain, err := DecodeMainchain(data)
+ if err != nil {
+ return nil, errors.Wrap(err, "decoding Mainchain")
+ }
+ return mainchain, nil
+}
package txdb
import (
- "context"
- "encoding/json"
"fmt"
"github.com/golang/protobuf/proto"
+ dbm "github.com/tendermint/tmlibs/db"
"github.com/bytom/blockchain/txdb/internal/storage"
"github.com/bytom/errors"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/patricia"
"github.com/bytom/protocol/state"
- . "github.com/tendermint/tmlibs/common"
- dbm "github.com/tendermint/tmlibs/db"
)
-func calcSnapshotKey(height uint64) []byte {
- return []byte(fmt.Sprintf("S:%v", height))
-}
-
-func calcLatestSnapshotHeight() []byte {
- return []byte("LatestSnapshotHeight")
+func calcSnapshotKey(hash *bc.Hash) []byte {
+ return []byte(fmt.Sprintf("S:%v", hash.String()))
}
-// DecodeSnapshot decodes a snapshot from the Chain Core's binary,
-// protobuf representation of the snapshot.
+// DecodeSnapshot decodes a snapshot from bytes
func DecodeSnapshot(data []byte) (*state.Snapshot, error) {
var storedSnapshot storage.Snapshot
- err := proto.Unmarshal(data, &storedSnapshot)
- if err != nil {
+ if err := proto.Unmarshal(data, &storedSnapshot); err != nil {
return nil, errors.Wrap(err, "unmarshaling state snapshot proto")
}
tree := new(patricia.Tree)
for _, node := range storedSnapshot.Nodes {
- err = tree.Insert(node.Key)
- if err != nil {
+ if err := tree.Insert(node.Key); err != nil {
return nil, errors.Wrap(err, "reconstructing state tree")
}
}
}, nil
}
-var latestSnapshotHeight = []byte("latestSnapshotHeight")
-
-type SnapshotHeightJSON struct {
- Height uint64
-}
-
-func (bsj SnapshotHeightJSON) Save(db dbm.DB) {
- bytes, err := json.Marshal(bsj)
- if err != nil {
- PanicSanity(Fmt("Could not marshal state bytes: %v", err))
- }
- db.SetSync(latestSnapshotHeight, bytes)
-}
-
-func LoadSnapshotHeightJSON(db dbm.DB) SnapshotHeightJSON {
- bytes := db.Get(latestSnapshotHeight)
- if bytes == nil {
- return SnapshotHeightJSON{
- Height: 0,
- }
- }
- bsj := SnapshotHeightJSON{}
- err := json.Unmarshal(bytes, &bsj)
- if err != nil {
- PanicCrisis(Fmt("Could not unmarshal bytes: %X", bytes))
- }
- return bsj
-}
-
-func storeStateSnapshot(ctx context.Context, db dbm.DB, snapshot *state.Snapshot, blockHeight uint64) error {
+func saveSnapshot(db dbm.DB, snapshot *state.Snapshot, hash *bc.Hash) error {
var storedSnapshot storage.Snapshot
err := patricia.Walk(snapshot.Tree, func(key []byte) error {
n := &storage.Snapshot_StateTreeNode{Key: key}
storedSnapshot.Nonces = make([]*storage.Snapshot_Nonce, 0, len(snapshot.Nonces))
for k, v := range snapshot.Nonces {
- hash := k
storedSnapshot.Nonces = append(storedSnapshot.Nonces, &storage.Snapshot_Nonce{
- Hash: hash.Bytes(), // TODO(bobg): now that hash is a protobuf, use it directly in the snapshot protobuf?
+ Hash: k.Bytes(),
ExpiryMs: v,
})
}
return errors.Wrap(err, "marshaling state snapshot")
}
- // set new snapshot.
- db.Set(calcSnapshotKey(blockHeight), b)
- SnapshotHeightJSON{Height: blockHeight}.Save(db)
- //TO DO: delete old snapshot.
+ db.Set(calcSnapshotKey(hash), b)
db.SetSync(nil, nil)
- return errors.Wrap(err, "deleting old snapshots")
+ return nil
}
-func getStateSnapshot(ctx context.Context, db dbm.DB) (*state.Snapshot, uint64, error) {
- height := LoadSnapshotHeightJSON(db).Height
- data := db.Get(calcSnapshotKey(height))
+func getSnapshot(db dbm.DB, hash *bc.Hash) (*state.Snapshot, error) {
+ data := db.Get(calcSnapshotKey(hash))
if data == nil {
- return nil, height, errors.New("no this snapshot.")
+ return nil, errors.New("no this snapshot.")
}
snapshot, err := DecodeSnapshot(data)
if err != nil {
- return nil, height, errors.Wrap(err, "decoding snapshot")
- }
- return snapshot, height, nil
-}
-
-// getRawSnapshot returns the raw, protobuf-encoded snapshot data at the
-// provided height.
-func getRawSnapshot(ctx context.Context, db dbm.DB, height uint64) (data []byte, err error) {
- bytez := db.Get(calcSnapshotKey(height))
- if bytez == nil {
- return nil, errors.New("no this height snapshot.")
+ return nil, errors.Wrap(err, "decoding snapshot")
}
- return bytez, nil
+ return snapshot, nil
}
package txdb
import (
- "context"
"encoding/json"
"fmt"
+ . "github.com/tendermint/tmlibs/common"
+ dbm "github.com/tendermint/tmlibs/db"
+
"github.com/bytom/errors"
+ "github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/legacy"
"github.com/bytom/protocol/state"
- . "github.com/tendermint/tmlibs/common"
- dbm "github.com/tendermint/tmlibs/db"
)
-// A Store encapsulates storage for blockchain validation.
-// It satisfies the interface protocol.Store, and provides additional
-// methods for querying current data.
-type Store struct {
- db dbm.DB
-
- cache blockCache
-}
-
-//var _ protocol.Store = (*Store)(nil)
-
-func calcBlockHeadKey(height uint64) []byte {
- return []byte(fmt.Sprintf("H:%v", height))
-}
-
-func calcBlockKey(height uint64) []byte {
- return []byte(fmt.Sprintf("B:%v", height))
-}
-
-func LoadBlock(db dbm.DB, height uint64) *legacy.Block {
- var block *legacy.Block = &legacy.Block{}
- bytez := db.Get(calcBlockKey(height))
- if bytez == nil {
- return nil
- }
-
- block.UnmarshalText(bytez)
- return block
-}
-
var blockStoreKey = []byte("blockStore")
type BlockStoreStateJSON struct {
Height uint64
+ Hash *bc.Hash
}
func (bsj BlockStoreStateJSON) Save(db dbm.DB) {
db.SetSync(blockStoreKey, bytes)
}
-func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON {
+func loadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON {
bytes := db.Get(blockStoreKey)
if bytes == nil {
return BlockStoreStateJSON{
return bsj
}
+// A Store encapsulates storage for blockchain validation.
+// It satisfies the interface protocol.Store, and provides additional
+// methods for querying current data.
+type Store struct {
+ db dbm.DB
+ cache blockCache
+}
+
+func calcBlockKey(hash *bc.Hash) []byte {
+ return []byte(fmt.Sprintf("B:%v", hash.String()))
+}
+
+func GetBlock(db dbm.DB, hash *bc.Hash) *legacy.Block {
+ var block *legacy.Block = &legacy.Block{}
+ bytez := db.Get(calcBlockKey(hash))
+ if bytez == nil {
+ return nil
+ }
+
+ block.UnmarshalText(bytez)
+ return block
+}
+
// NewStore creates and returns a new Store object.
-//
-// For testing purposes, it is usually much faster
-// and more convenient to use package bytom/protocol/memstore
-// instead.
func NewStore(db dbm.DB) *Store {
- cache := newBlockCache(func(height uint64) *legacy.Block {
- return LoadBlock(db, height)
+ cache := newBlockCache(func(hash *bc.Hash) *legacy.Block {
+ return GetBlock(db, hash)
})
return &Store{
db: db,
}
}
-// Height returns the height of the blockchain.
-func (s *Store) Height() uint64 {
- heightJson := LoadBlockStoreStateJSON(s.db)
- return heightJson.Height
+func (s *Store) BlockExist(hash *bc.Hash) bool {
+ block, err := s.cache.lookup(hash)
+ return err == nil && block != nil
}
-// GetBlock looks up the block with the provided block height.
-// If no block is found at that height, it returns an error.
-
-func (s *Store) GetBlock(height uint64) (*legacy.Block, error) {
- return s.cache.lookup(height)
-}
-
-func (s *Store) GetRawBlock(height uint64) ([]byte, error) {
- bytez := s.db.Get(calcBlockKey(height))
- if bytez == nil {
- return nil, errors.New("querying blocks from the db null")
- }
- return bytez, nil
+func (s *Store) GetBlock(hash *bc.Hash) (*legacy.Block, error) {
+ return s.cache.lookup(hash)
}
-// LatestSnapshot returns the most recent state snapshot stored in
-// the database and its corresponding block height.
-func (s *Store) LatestSnapshot(ctx context.Context) (*state.Snapshot, uint64, error) {
- return getStateSnapshot(ctx, s.db)
+func (s *Store) GetStoreStatus() BlockStoreStateJSON {
+ return loadBlockStoreStateJSON(s.db)
}
-/*
-// LatestSnapshotInfo returns the height and size of the most recent
-// state snapshot stored in the database.
-func (s *Store) LatestSnapshotInfo(ctx context.Context) (height uint64, size uint64, err error) {
- const q = `
- SELECT height, octet_length(data) FROM snapshots ORDER BY height DESC LIMIT 1
- `
- err = s.db.QueryRowContext(ctx, q).Scan(&height, &size)
- return height, size, err
+func (s *Store) GetMainchain(hash *bc.Hash) (map[uint64]*bc.Hash, error) {
+ return getMainchain(s.db, hash)
}
-*/
-// GetSnapshot returns the state snapshot stored at the provided height,
-// in Chain Core's binary protobuf representation. If no snapshot exists
-// at the provided height, an error is returned.
-func (s *Store) GetSnapshot(ctx context.Context, height uint64) ([]byte, error) {
- return getRawSnapshot(ctx, s.db, height)
+func (s *Store) GetSnapshot(hash *bc.Hash) (*state.Snapshot, error) {
+ return getSnapshot(s.db, hash)
}
// SaveBlock persists a new block in the database.
func (s *Store) SaveBlock(block *legacy.Block) error {
- s.cache.add(block)
- height := block.Height
-
binaryBlock, err := block.MarshalText()
if err != nil {
PanicCrisis(Fmt("Error Marshal block meta: %v", err))
}
- s.db.Set(calcBlockKey(height), binaryBlock)
- // Save new BlockStoreStateJSON descriptor
- BlockStoreStateJSON{Height: height}.Save(s.db)
-
- // Flush
+ blockHash := block.Hash()
+ s.db.Set(calcBlockKey(&blockHash), binaryBlock)
s.db.SetSync(nil, nil)
-
return nil
}
+func (s *Store) SaveMainchain(mainchain map[uint64]*bc.Hash, hash *bc.Hash) error {
+ err := saveMainchain(s.db, mainchain, hash)
+ return errors.Wrap(err, "saving mainchain")
+}
+
// SaveSnapshot saves a state snapshot to the database.
-func (s *Store) SaveSnapshot(ctx context.Context, height uint64, snapshot *state.Snapshot) error {
- err := storeStateSnapshot(ctx, s.db, snapshot, height)
+func (s *Store) SaveSnapshot(snapshot *state.Snapshot, hash *bc.Hash) error {
+ err := saveSnapshot(s.db, snapshot, hash)
return errors.Wrap(err, "saving state tree")
}
-func (s *Store) FinalizeBlock(ctx context.Context, height uint64) error {
- // _, err := s.db.ExecContext(ctx, `SELECT pg_notify('newblock', $1)`, height)
- return nil
+func (s *Store) SaveStoreStatus(height uint64, hash *bc.Hash) {
+ BlockStoreStateJSON{Height: height, Hash: hash}.Save(s.db)
+ //TODO: clean the old snapshot && mainchain
}
+++ /dev/null
-// Package txdb provides storage for Chain Protocol blockchain
-// data structures.
-package txdb
-
-/*
-import (
- "context"
- "strconv"
-
- "chain/database/pg"
- "chain/errors"
- "chain/log"
-)
-
-func ListenBlocks(ctx context.Context, dbURL string) (<-chan uint64, error) {
- listener, err := pg.NewListener(ctx, dbURL, "newblock")
- if err != nil {
- return nil, err
- }
-
- c := make(chan uint64)
- go func() {
- defer func() {
- listener.Close()
- close(c)
- }()
-
- for {
- select {
- case <-ctx.Done():
- return
-
- case n := <-listener.Notify:
- height, err := strconv.ParseUint(n.Extra, 10, 64)
- if err != nil {
- log.Error(ctx, errors.Wrap(err, "parsing db notification payload"))
- return
- }
- c <- height
- }
- }
- }()
-
- return c, nil
-}
-
-// GetRawBlock queries the database for the block at the provided height.
-// The block is returned as raw bytes.
-func (s *Store) GetRawBlock(ctx context.Context, height uint64) ([]byte, error) {
- const q = `SELECT data FROM blocks WHERE height = $1`
- var block []byte
- err := s.db.QueryRowContext(ctx, q, height).Scan(&block)
- return block, errors.Wrap(err, "querying blocks from the db")
-}
-*/
}
func BlockSubsidy(height uint64) uint64 {
- if height == 0 {
+ if height == 1 {
return initialBlockSubsidy
}
return baseSubsidy >> uint(height/subsidyReductionInterval)
}
func InitBlock() []byte {
- return []byte("0301000000000000000000000000000000000000000000000000000000000000000000ece090e7eb2b4078a79ed5c640a026361c4af77a37342e503cc68493229996e11dd9be38b18f5b492159980684155da19e87de0d1b37b35c1a1123770ec1dcc710aabe77607cce00b1c5a181808080802e0107010700ece090e7eb2b000001012cffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff8080ccdee2a69fb314010151000000")
+ return []byte("0301010000000000000000000000000000000000000000000000000000000000000000cecccaebf42b406b03545ed2b38a578e5e6b0796d4ebdd8a6dd72210873fcc026c7319de578ffc492159980684155da19e87de0d1b37b35c1a1123770ec1dcc710aabe77607cced7bb1993fcb680808080801e0107010700cecccaebf42b000001012cffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff8080ccdee2a69fb314010151000000")
}
package cpuminer
import (
- log "github.com/sirupsen/logrus"
"sync"
"time"
+ log "github.com/sirupsen/logrus"
+
"github.com/bytom/consensus"
"github.com/bytom/mining"
"github.com/bytom/protocol"
payToAddr := []byte{}
block, err := mining.NewBlockTemplate(m.chain, m.txPool, payToAddr)
if err != nil {
- log.WithField("error", err).Error("Failed to create new block template")
+ log.Errorf("Mining: failed on create NewBlockTemplate: %v", err)
continue
}
if m.solveBlock(block, ticker, quit) {
- if err := m.chain.AddBlock(nil, block); err == nil {
+ if isOrphan, err := m.chain.ProcessBlock(block); err == nil {
log.WithFields(log.Fields{
- "height": block.BlockHeader.Height,
- "tx": len(block.Transactions),
- }).Info("Finish committing block height")
+ "height": block.BlockHeader.Height,
+ "isOrphan": isOrphan,
+ "tx": len(block.Transactions),
+ }).Info("Miner processed block")
} else {
- log.WithFields(log.Fields{
- "height": block.BlockHeader.Height,
- "error": err,
- }).Error("Failed to commit block height")
+ log.WithField("height", block.BlockHeader.Height).Errorf("Miner fail on ProcessBlock %v", err)
}
}
}
package mining
import (
- log "github.com/sirupsen/logrus"
"time"
+ log "github.com/sirupsen/logrus"
+
"github.com/bytom/blockchain/txbuilder"
"github.com/bytom/consensus"
"github.com/bytom/errors"
txFee := uint64(0)
var compareDiffBH *legacy.BlockHeader
- if compareDiffBlock, err := c.GetBlock(nextBlockHeight - consensus.BlocksPerRetarget); err == nil {
+ if compareDiffBlock, err := c.GetBlockByHeight(nextBlockHeight - consensus.BlocksPerRetarget); err == nil {
compareDiffBH = &compareDiffBlock.BlockHeader
}
"testing"
"time"
+ "github.com/bytom/consensus"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/legacy"
"github.com/bytom/protocol/state"
)
func TestNewInitBlock(t *testing.T) {
- coinbaseTx, err := createCoinbaseTx(0, 0, []byte{})
+ coinbaseTx, err := createCoinbaseTx(0, 1, []byte{})
if err != nil {
t.Error(err)
}
b := &legacy.Block{
BlockHeader: legacy.BlockHeader{
Version: 1,
- Height: 0,
+ Height: 1,
PreviousBlockHash: bc.Hash{},
TimestampMS: bc.Millis(time.Now()),
BlockCommitment: legacy.BlockCommitment{
TransactionsMerkleRoot: merkleRoot,
AssetsMerkleRoot: snap.Tree.RootHash(),
},
- Bits: uint64(3314649325747331761),
- Nonce: 0,
+ Bits: uint64(2161727821138738707),
},
Transactions: []*legacy.Tx{coinbaseTx},
}
+ for i := uint64(0); i <= 10000000000000; i++ {
+ b.Nonce = i
+ hash := b.Hash()
+
+ if consensus.CheckProofOfWork(&hash, b.Bits) {
+ break
+ }
+ }
+
rawBlock, err := b.MarshalText()
if err != nil {
t.Error(err)
package node
import (
+ "chain/errors"
"context"
"crypto/tls"
"net"
"net/http"
+ _ "net/http/pprof"
"os"
"strings"
"sync"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
- _ "net/http/pprof"
bc "github.com/bytom/blockchain"
"github.com/bytom/blockchain/account"
cfg "github.com/bytom/config"
"github.com/bytom/consensus"
"github.com/bytom/env"
- "github.com/bytom/errors"
- p2p "github.com/bytom/p2p"
+ "github.com/bytom/p2p"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc/legacy"
"github.com/bytom/types"
sw *p2p.Switch // p2p connections
addrBook *p2p.AddrBook // known peers
- // services
- evsw types.EventSwitch // pub/sub for services
- // blockStore *bc.MemStore
+ evsw types.EventSwitch // pub/sub for services
blockStore *txdb.Store
bcReactor *bc.BlockchainReactor
accounts *account.Manager
// it's blocking and we need to proceed to the rest of the core setup after
// we call it.
go func() {
- err := server.Serve(listener)
- log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
+ if err := server.Serve(listener); err != nil {
+ log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
+ }
}()
coreHandler.Set(h)
}
sw := p2p.NewSwitch(config.P2P)
- fastSync := config.FastSync
-
genesisBlock := &legacy.Block{
BlockHeader: legacy.BlockHeader{},
Transactions: []*legacy.Tx{},
genesisBlock.UnmarshalText(consensus.InitBlock())
txPool := protocol.NewTxPool()
- chain, err := protocol.NewChain(ctx, genesisBlock.Hash(), store, txPool, nil)
+ chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
+ if err != nil {
+ cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
+ }
- if store.Height() < 1 {
- if err := chain.AddBlock(nil, genesisBlock); err != nil {
- cmn.Exit(cmn.Fmt("Failed to add genesisBlock to Chain: %v", err))
+ if chain.Height() == 0 {
+ if err := chain.SaveBlock(genesisBlock); err != nil {
+ cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
+ }
+ if err := chain.ConnectBlock(genesisBlock); err != nil {
+ cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
}
}
accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
pinStore = pin.NewStore(accUTXODB)
- err = pinStore.LoadAll(ctx)
- if err != nil {
+ if err = pinStore.LoadAll(ctx); err != nil {
log.WithField("error", err).Error("load pin store")
return nil
}
- pinHeight := store.Height()
+ pinHeight := chain.Height()
if pinHeight > 0 {
pinHeight = pinHeight - 1
}
pins := []string{account.PinName, account.DeleteSpentsPinName}
for _, p := range pins {
- err = pinStore.CreatePin(ctx, p, pinHeight)
- if err != nil {
+ if err = pinStore.CreatePin(ctx, p, pinHeight); err != nil {
log.WithField("error", err).Error("Create pin")
}
}
if err != nil {
cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
}
- bcReactor := bc.NewBlockchainReactor(
- store,
- chain,
- txPool,
- accounts,
- assets,
- sw,
- hsm,
- fastSync,
- pinStore)
+ bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore)
sw.AddReactor("BLOCKCHAIN", bcReactor)
package protocol
import (
- "context"
- "time"
+ log "github.com/sirupsen/logrus"
"github.com/bytom/errors"
+ "github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/legacy"
"github.com/bytom/protocol/state"
"github.com/bytom/protocol/validation"
-
- log "github.com/sirupsen/logrus"
)
-// maxBlockTxs limits the number of transactions
-// included in each block.
-const maxBlockTxs = 10000
-
-// saveSnapshotFrequency stores how often to save a state
-// snapshot to the Store.
-const saveSnapshotFrequency = time.Hour
-
var (
// ErrBadBlock is returned when a block is invalid.
ErrBadBlock = errors.New("invalid block")
ErrBadStateRoot = errors.New("invalid state merkle root")
)
-// GetBlock returns the block at the given height, if there is one,
-// otherwise it returns an error.
-func (c *Chain) GetBlock(height uint64) (*legacy.Block, error) {
- return c.store.GetBlock(height)
+func (c *Chain) BlockExist(hash *bc.Hash) bool {
+ return c.orphanManage.BlockExist(hash) || c.store.BlockExist(hash)
+}
+
+func (c *Chain) GetBlockByHash(hash *bc.Hash) (*legacy.Block, error) {
+ return c.store.GetBlock(hash)
+}
+
+func (c *Chain) GetBlockByHeight(height uint64) (*legacy.Block, error) {
+ c.state.cond.L.Lock()
+ hash, ok := c.state.mainChain[height]
+ c.state.cond.L.Unlock()
+ if !ok {
+ return nil, errors.New("can't find block in given hight")
+ }
+ return c.GetBlockByHash(hash)
}
// ValidateBlock validates an incoming block in advance of applying it
func (c *Chain) ValidateBlock(block, prev *legacy.Block) error {
blockEnts := legacy.MapBlock(block)
prevEnts := legacy.MapBlock(prev)
- err := validation.ValidateBlock(blockEnts, prevEnts)
- if err != nil {
+ if err := validation.ValidateBlock(blockEnts, prevEnts); err != nil {
return errors.Sub(ErrBadBlock, err)
}
- return errors.Sub(ErrBadBlock, err)
+ return nil
}
-// ApplyValidBlock creates an updated snapshot without validating the
-// block.
-func (c *Chain) ApplyValidBlock(block *legacy.Block) (*state.Snapshot, error) {
- _, curSnapshot := c.State()
- newSnapshot := state.Copy(curSnapshot)
- err := newSnapshot.ApplyBlock(legacy.MapBlock(block))
- if err != nil {
- return nil, err
+func (c *Chain) ConnectBlock(block *legacy.Block) error {
+ c.state.cond.L.Lock()
+ defer c.state.cond.L.Unlock()
+ return c.connectBlock(block)
+}
+
+func (c *Chain) connectBlock(block *legacy.Block) error {
+ newSnapshot := state.Copy(c.state.snapshot)
+ if err := newSnapshot.ApplyBlock(legacy.MapBlock(block)); err != nil {
+ return err
+ }
+
+ blockHash := block.Hash()
+ if err := c.setState(block, newSnapshot, map[uint64]*bc.Hash{block.Height: &blockHash}); err != nil {
+ return err
}
- //fmt.Printf("want %v, ger %v \n", block.BlockHeader.AssetsMerkleRoot, newSnapshot.Tree.RootHash())
- if block.AssetsMerkleRoot != newSnapshot.Tree.RootHash() {
- return nil, ErrBadStateRoot
+
+ for _, tx := range block.Transactions {
+ c.txPool.RemoveTransaction(&tx.Tx.ID)
}
- return newSnapshot, nil
+ return nil
}
-// CommitBlock commits a block to the blockchain. The block
-// must already have been applied with ApplyValidBlock or
-// ApplyNewBlock, which will have produced the new snapshot that's
-// required here.
-//
-// This function saves the block to the store and sometimes (not more
-// often than saveSnapshotFrequency) saves the state tree to the
-// store. New-block callbacks (via asynchronous block-processor pins)
-// are triggered.
-//
-// TODO(bobg): rename to CommitAppliedBlock for clarity (deferred from https://github.com/chain/chain/pull/788)
-func (c *Chain) CommitAppliedBlock(ctx context.Context, block *legacy.Block, snapshot *state.Snapshot) error {
- // SaveBlock is the linearization point. Once the block is committed
- // to persistent storage, the block has been applied and everything
- // else can be derived from that block.
- err := c.store.SaveBlock(block)
- if err != nil {
- return errors.Wrap(err, "storing block")
- }
- if block.Time().After(c.lastQueuedSnapshot.Add(saveSnapshotFrequency)) {
- c.queueSnapshot(ctx, block.Height, block.Time(), snapshot)
- }
-
- err = c.store.FinalizeBlock(ctx, block.Height)
- if err != nil {
- return errors.Wrap(err, "finalizing block")
- }
-
- // c.setState will update the local blockchain state and height.
- // When c.store is a txdb.Store, and c has been initialized with a
- // channel from txdb.ListenBlocks, then the above call to
- // c.store.FinalizeBlock will have done a postgresql NOTIFY and
- // that will wake up the goroutine in NewChain, which also calls
- // setHeight. But duplicate calls with the same blockheight are
- // harmless; and the following call is required in the cases where
- // it's not redundant.
- c.setState(block, snapshot)
+func (c *Chain) getReorganizeBlocks(block *legacy.Block) ([]*legacy.Block, []*legacy.Block) {
+ attachBlocks := []*legacy.Block{}
+ detachBlocks := []*legacy.Block{}
+ ancestor := block
- return nil
+ for !c.inMainchain(ancestor) {
+ attachBlocks = append([]*legacy.Block{ancestor}, attachBlocks...)
+ ancestor, _ = c.GetBlockByHash(&ancestor.PreviousBlockHash)
+ }
+
+ for d := c.state.block; d.Hash() != ancestor.Hash(); d, _ = c.GetBlockByHash(&d.PreviousBlockHash) {
+ detachBlocks = append(detachBlocks, d)
+ }
+
+ return attachBlocks, detachBlocks
}
-func (c *Chain) AddBlock(ctx context.Context, block *legacy.Block) error {
- currentBlock, _ := c.State()
- if err := c.ValidateBlock(block, currentBlock); err != nil {
- return err
+func (c *Chain) reorganizeChain(block *legacy.Block) error {
+ attachBlocks, detachBlocks := c.getReorganizeBlocks(block)
+ newSnapshot := state.Copy(c.state.snapshot)
+ chainChanges := map[uint64]*bc.Hash{}
+
+ for _, d := range detachBlocks {
+ if err := newSnapshot.DetachBlock(legacy.MapBlock(d)); err != nil {
+ return err
+ }
}
- newSnap, err := c.ApplyValidBlock(block)
- if err != nil {
- return err
+ for _, a := range attachBlocks {
+ if err := newSnapshot.ApplyBlock(legacy.MapBlock(a)); err != nil {
+ return err
+ }
+ aHash := a.Hash()
+ chainChanges[a.Height] = &aHash
}
- if err := c.CommitAppliedBlock(ctx, block, newSnap); err != nil {
+ return c.setState(block, newSnapshot, chainChanges)
+}
+
+func (c *Chain) SaveBlock(block *legacy.Block) error {
+ preBlock, _ := c.GetBlockByHash(&block.PreviousBlockHash)
+ if err := c.ValidateBlock(block, preBlock); err != nil {
return err
}
-
- for _, tx := range block.Transactions {
- c.txPool.RemoveTransaction(&tx.Tx.ID)
+ if err := c.store.SaveBlock(block); err != nil {
+ return err
}
+ blockHash := block.Hash()
+ log.WithFields(log.Fields{"height": block.Height, "hash": blockHash.String()}).Info("Block saved on disk")
return nil
}
-func (c *Chain) queueSnapshot(ctx context.Context, height uint64, timestamp time.Time, s *state.Snapshot) {
- // Non-blockingly queue the snapshot for storage.
- ps := pendingSnapshot{height: height, snapshot: s}
- select {
- case c.pendingSnapshots <- ps:
- c.lastQueuedSnapshot = timestamp
- default:
- // Skip it; saving snapshots is taking longer than the snapshotting period.
- log.WithField("last queued", c.lastQueuedSnapshot).Info("snapshot storage is taking too long")
+func (c *Chain) findBestChainTail(block *legacy.Block) (bestBlock *legacy.Block) {
+ bestBlock = block
+ blockHash := block.Hash()
+ preorphans, ok := c.orphanManage.preOrphans[blockHash]
+ if !ok {
+ return
+ }
+
+ for _, preorphan := range preorphans {
+ orphanBlock, ok := c.orphanManage.Get(preorphan)
+ if !ok {
+ continue
+ }
+
+ if err := c.SaveBlock(orphanBlock); err != nil {
+ log.WithFields(log.Fields{
+ "height": block.Height,
+ "hash": blockHash.String(),
+ }).Errorf("findBestChainTail fail on save block %v", err)
+ continue
+ }
+
+ if subResult := c.findBestChainTail(orphanBlock); subResult.Height > bestBlock.Height {
+ bestBlock = subResult
+ }
}
+
+ c.orphanManage.Delete(&blockHash)
+ return
}
-func (c *Chain) setHeight(h uint64) {
- // We call setHeight from two places independently:
- // CommitBlock and the Postgres LISTEN goroutine.
- // This means we can get here twice for each block,
- // and any of them might be arbitrarily delayed,
- // which means h might be from the past.
- // Detect and discard these duplicate calls.
+func (c *Chain) ProcessBlock(block *legacy.Block) (bool, error) {
+ blockHash := block.Hash()
+ if c.BlockExist(&blockHash) {
+ log.WithField("hash", blockHash.String()).Info("Skip process due to block already been handled")
+ return false, nil
+ }
+ if !c.store.BlockExist(&block.PreviousBlockHash) {
+ c.orphanManage.Add(block)
+ return true, nil
+ }
+ if err := c.SaveBlock(block); err != nil {
+ return false, err
+ }
+ bestBlock := c.findBestChainTail(block)
c.state.cond.L.Lock()
- defer c.state.cond.L.Unlock()
+ if c.state.block.Hash() == bestBlock.PreviousBlockHash {
+ defer c.state.cond.L.Unlock()
+ return false, c.connectBlock(bestBlock)
+ }
- if h <= c.state.height {
- return
+ if bestBlock.Height > c.state.height && bestBlock.Bits >= c.state.block.Bits {
+ defer c.state.cond.L.Unlock()
+ return false, c.reorganizeChain(bestBlock)
}
- c.state.height = h
- c.state.cond.Broadcast()
+ c.state.cond.L.Unlock()
+ return false, nil
}
+++ /dev/null
-package protocol
-
-/*
-func TestGetBlock(t *testing.T) {
- ctx := context.Background()
-
- b1 := &legacy.Block{BlockHeader: legacy.BlockHeader{Height: 1}}
- noBlocks := memstore.New()
- oneBlock := memstore.New()
- oneBlock.SaveBlock(ctx, b1)
- oneBlock.SaveSnapshot(ctx, 1, state.Empty())
-
- cases := []struct {
- store Store
- want *legacy.Block
- wantErr bool
- }{
- {noBlocks, nil, true},
- {oneBlock, b1, false},
- }
-
- for _, test := range cases {
- c, err := NewChain(ctx, b1.Hash(), test.store, nil)
- if err != nil {
- testutil.FatalErr(t, err)
- }
- got, gotErr := c.GetBlock(ctx, c.Height())
- if !testutil.DeepEqual(got, test.want) {
- t.Errorf("got latest = %+v want %+v", got, test.want)
- }
- if (gotErr != nil) != test.wantErr {
- t.Errorf("got latest err = %q want err?: %t", gotErr, test.wantErr)
- }
- }
-}
-
-func TestNoTimeTravel(t *testing.T) {
- ctx := context.Background()
- c, err := NewChain(ctx, bc.Hash{}, memstore.New(), nil)
- if err != nil {
- t.Fatal(err)
- }
-
- c.setHeight(1)
- c.setHeight(2)
-
- c.setHeight(1) // don't go backward
- if c.state.height != 2 {
- t.Fatalf("c.state.height = %d want 2", c.state.height)
- }
-}
-
-func TestWaitForBlockSoonAlreadyExists(t *testing.T) {
- c, _ := newTestChain(t, time.Now())
- makeEmptyBlock(t, c) // height=2
- makeEmptyBlock(t, c) // height=3
-
- err := <-c.BlockSoonWaiter(context.Background(), 2)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestWaitForBlockSoonDistantFuture(t *testing.T) {
- c, _ := newTestChain(t, time.Now())
-
- got := <-c.BlockSoonWaiter(context.Background(), 100) // distant future
- want := ErrTheDistantFuture
- if got != want {
- t.Errorf("BlockSoonWaiter(100) = %+v want %+v", got, want)
- }
-}
-
-func TestWaitForBlockSoonWaits(t *testing.T) {
- // This test is inherently racy. It's possible
- // that the block creation might run before
- // the wait's internal test loop finds no block.
- // In that case, the test will pass, but it will
- // not have tested anything.
- //
- // It's the best we can do.
-
- c, _ := newTestChain(t, time.Now())
- makeEmptyBlock(t, c) // height=2
-
- go func() {
- time.Sleep(10 * time.Millisecond) // sorry for the slow test 
- makeEmptyBlock(t, c) // height=3
- }()
-
- err := <-c.BlockSoonWaiter(context.Background(), 3)
- if err != nil {
- t.Fatal(err)
- }
- if g := c.Height(); g != 3 {
- t.Errorf("height after waiting = %d want 3", g)
- }
-}
-
-func TestWaitForBlockSoonTimesout(t *testing.T) {
- c, _ := newTestChain(t, time.Now())
- go func() {
- makeEmptyBlock(t, c) // height=2
- }()
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- defer cancel()
-
- err := <-c.BlockSoonWaiter(ctx, 3)
- if err != ctx.Err() {
- t.Fatalf("expected timeout err, got %v", err)
- }
-}
-
-func TestGenerateBlock(t *testing.T) {
- ctx := context.Background()
- now := time.Unix(233400000, 0)
- c, b1 := newTestChain(t, now)
-
- initialBlockHash := b1.Hash()
- assetID := bc.ComputeAssetID(nil, &initialBlockHash, 1, &bc.EmptyStringHash)
-
- txs := []*legacy.Tx{
- legacy.NewTx(legacy.TxData{
- Version: 1,
- MinTime: 233400000000,
- MaxTime: 233400000001,
- Inputs: []*legacy.TxInput{
- legacy.NewIssuanceInput([]byte{1}, 50, nil, initialBlockHash, nil, [][]byte{
- nil,
- mustDecodeHex("30450221009037e1d39b7d59d24eba8012baddd5f4ab886a51b46f52b7c479ddfa55eeb5c5022076008409243475b25dfba6db85e15cf3d74561a147375941e4830baa69769b5101"),
- mustDecodeHex("51210210b002870438af79b829bc22c4505e14779ef0080c411ad497d7a0846ee0af6f51ae")}, nil),
- },
- Outputs: []*legacy.TxOutput{
- legacy.NewTxOutput(assetID, 50, mustDecodeHex("a9145881cd104f8d64635751ac0f3c0decf9150c110687"), nil),
- },
- }),
- legacy.NewTx(legacy.TxData{
- Version: 1,
- MinTime: 233400000000,
- MaxTime: 233400000001,
- Inputs: []*legacy.TxInput{
- legacy.NewIssuanceInput([]byte{2}, 50, nil, initialBlockHash, nil, [][]byte{
- nil,
- mustDecodeHex("3045022100f3bcffcfd6a1ce9542b653500386cd0ee7b9c86c59390ca0fc0238c0ebe3f1d6022065ac468a51a016842660c3a616c99a9aa5109a3bad1877ba3e0f010f3972472e01"),
- mustDecodeHex("51210210b002870438af79b829bc22c4505e14779ef0080c411ad497d7a0846ee0af6f51ae"),
- }, nil),
- },
- Outputs: []*legacy.TxOutput{
- legacy.NewTxOutput(assetID, 50, mustDecodeHex("a914c171e443e05b953baa7b7d834028ed91e47b4d0b87"), nil),
- },
- }),
- }
-
- got, _, err := c.GenerateBlock(ctx, b1, state.Empty(), now, txs)
- if err != nil {
- t.Fatalf("err got = %v want nil", err)
- }
-
- // TODO(bobg): verify these hashes are correct
- wantTxRoot := mustDecodeHash("ab5f5f111beb1e6b49da8334360589c7da3aac1cdd61067ea9a55bec47cb745c")
- wantAssetsRoot := mustDecodeHash("a31a9b5f71a6d6fa0c87361db4a98c9a82f603f9d9ff584f6613b9d56ccf5ebd")
-
- want := &legacy.Block{
- BlockHeader: legacy.BlockHeader{
- Version: 1,
- Height: 2,
- PreviousBlockHash: b1.Hash(),
- TimestampMS: bc.Millis(now),
- BlockCommitment: legacy.BlockCommitment{
- TransactionsMerkleRoot: wantTxRoot,
- AssetsMerkleRoot: wantAssetsRoot,
- },
- },
- Transactions: txs,
- }
-
- if !testutil.DeepEqual(got, want) {
- t.Errorf("generated block:\ngot: %+v\nwant: %+v", got, want)
- }
-}
-
-// newTestChain returns a new Chain using memstore for storage,
-// along with an initial block b1 (with a 0/0 multisig program).
-// It commits b1 before returning.
-func newTestChain(tb testing.TB, ts time.Time) (c *Chain, b1 *legacy.Block) {
- ctx := context.Background()
-
- var err error
-
- b1, err = NewInitialBlock(ts)
- if err != nil {
- testutil.FatalErr(tb, err)
- }
- c, err = NewChain(ctx, b1.Hash(), memstore.New(), nil)
- if err != nil {
- testutil.FatalErr(tb, err)
- }
- // TODO(tessr): consider adding MaxIssuanceWindow to NewChain
- c.MaxIssuanceWindow = 48 * time.Hour
- err = c.CommitAppliedBlock(ctx, b1, state.Empty())
- if err != nil {
- testutil.FatalErr(tb, err)
- }
- return c, b1
-}
-
-func makeEmptyBlock(tb testing.TB, c *Chain) {
- ctx := context.Background()
-
- curBlock, err := c.GetBlock(ctx, c.Height())
- if err != nil {
- testutil.FatalErr(tb, err)
- }
-
- if len(curBlock.Transactions) > 0 {
- tb.Fatal("cannot make nonempty block")
- }
-
- curState := state.Empty()
-
- nextBlock, nextState, err := c.GenerateBlock(ctx, curBlock, curState, time.Now(), nil)
- if err != nil {
- testutil.FatalErr(tb, err)
- }
- err = c.CommitAppliedBlock(ctx, nextBlock, nextState)
- if err != nil {
- testutil.FatalErr(tb, err)
- }
-}
-
-func mustDecodeHex(s string) []byte {
- data, err := hex.DecodeString(s)
- if err != nil {
- panic(err)
- }
- return data
-}
-
-func mustDecodeHash(s string) (h bc.Hash) {
- err := h.UnmarshalText([]byte(s))
- if err != nil {
- panic(err)
- }
- return h
-}*/
"sync"
"time"
- log "github.com/sirupsen/logrus"
-
+ "github.com/bytom/blockchain/txdb"
"github.com/bytom/errors"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/legacy"
// and issuance memory. The Chain type uses Store to load state
// from storage and persist validated data.
type Store interface {
- Height() uint64
- GetBlock(uint64) (*legacy.Block, error)
- LatestSnapshot(context.Context) (*state.Snapshot, uint64, error)
+ BlockExist(*bc.Hash) bool
+
+ GetBlock(*bc.Hash) (*legacy.Block, error)
+ GetMainchain(*bc.Hash) (map[uint64]*bc.Hash, error)
+ GetSnapshot(*bc.Hash) (*state.Snapshot, error)
+ GetStoreStatus() txdb.BlockStoreStateJSON
SaveBlock(*legacy.Block) error
- FinalizeBlock(context.Context, uint64) error
- SaveSnapshot(context.Context, uint64, *state.Snapshot) error
+ SaveMainchain(map[uint64]*bc.Hash, *bc.Hash) error
+ SaveSnapshot(*state.Snapshot, *bc.Hash) error
+ SaveStoreStatus(uint64, *bc.Hash)
+}
+
+type OrphanManage struct {
+ //TODO: add orphan cached block limit
+ orphan map[bc.Hash]*legacy.Block
+ preOrphans map[bc.Hash][]*bc.Hash
+ mtx sync.RWMutex
+}
+
+func NewOrphanManage() *OrphanManage {
+ return &OrphanManage{
+ orphan: make(map[bc.Hash]*legacy.Block),
+ preOrphans: make(map[bc.Hash][]*bc.Hash),
+ }
+}
+
+func (o *OrphanManage) BlockExist(hash *bc.Hash) bool {
+ o.mtx.RLock()
+ _, ok := o.orphan[*hash]
+ o.mtx.RUnlock()
+ return ok
+}
+
+func (o *OrphanManage) Add(block *legacy.Block) {
+ blockHash := block.Hash()
+ o.mtx.Lock()
+ defer o.mtx.Unlock()
+
+ if _, ok := o.orphan[blockHash]; ok {
+ return
+ }
+
+ o.orphan[blockHash] = block
+ o.preOrphans[block.PreviousBlockHash] = append(o.preOrphans[block.PreviousBlockHash], &blockHash)
+}
+
+func (o *OrphanManage) Delete(hash *bc.Hash) {
+ o.mtx.Lock()
+ defer o.mtx.Unlock()
+ block, ok := o.orphan[*hash]
+ if !ok {
+ return
+ }
+ delete(o.orphan, *hash)
+
+ preOrphans, ok := o.preOrphans[block.PreviousBlockHash]
+ if !ok || len(preOrphans) == 1 {
+ delete(o.preOrphans, block.PreviousBlockHash)
+ return
+ }
+
+ for i, preOrphan := range preOrphans {
+ if preOrphan == hash {
+ o.preOrphans[block.PreviousBlockHash] = append(preOrphans[:i], preOrphans[i+1:]...)
+ return
+ }
+ }
+}
+
+func (o *OrphanManage) Get(hash *bc.Hash) (*legacy.Block, bool) {
+ o.mtx.RLock()
+ block, ok := o.orphan[*hash]
+ o.mtx.RUnlock()
+ return block, ok
}
// Chain provides a complete, minimal blockchain database. It
InitialBlockHash bc.Hash
MaxIssuanceWindow time.Duration // only used by generators
+ orphanManage *OrphanManage
+ txPool *TxPool
+
state struct {
- cond sync.Cond // protects height, block, snapshot
- height uint64
- block *legacy.Block
- snapshot *state.Snapshot
+ cond sync.Cond
+ block *legacy.Block
+ height uint64
+ hash *bc.Hash
+ mainChain map[uint64]*bc.Hash
+ snapshot *state.Snapshot
}
store Store
-
- lastQueuedSnapshot time.Time
- pendingSnapshots chan pendingSnapshot
-
- txPool *TxPool
-}
-
-type pendingSnapshot struct {
- height uint64
- snapshot *state.Snapshot
}
// NewChain returns a new Chain using store as the underlying storage.
-func NewChain(ctx context.Context, initialBlockHash bc.Hash, store Store, txPool *TxPool, heights <-chan uint64) (*Chain, error) {
+func NewChain(initialBlockHash bc.Hash, store Store, txPool *TxPool) (*Chain, error) {
c := &Chain{
InitialBlockHash: initialBlockHash,
+ orphanManage: NewOrphanManage(),
store: store,
- pendingSnapshots: make(chan pendingSnapshot, 1),
txPool: txPool,
}
c.state.cond.L = new(sync.Mutex)
+ storeStatus := store.GetStoreStatus()
+ c.state.height = storeStatus.Height
- log.WithField("current height", store.Height()).Info("Resume from the database")
- c.state.height = store.Height()
-
- if c.state.height < 1 {
+ if c.state.height == 0 {
c.state.snapshot = state.Empty()
- } else {
- c.state.block, _ = store.GetBlock(c.state.height)
- c.state.snapshot, _, _ = store.LatestSnapshot(ctx)
+ c.state.mainChain = make(map[uint64]*bc.Hash)
+ return c, nil
}
- // Note that c.height.n may still be zero here.
- if heights != nil {
- go func() {
- for h := range heights {
- c.setHeight(h)
- }
- }()
+ c.state.hash = storeStatus.Hash
+ var err error
+ if c.state.block, err = store.GetBlock(storeStatus.Hash); err != nil {
+ return nil, err
+ }
+ if c.state.snapshot, err = store.GetSnapshot(storeStatus.Hash); err != nil {
+ return nil, err
+ }
+ if c.state.mainChain, err = store.GetMainchain(storeStatus.Hash); err != nil {
+ return nil, err
}
-
- go func() {
- for {
- select {
- case <-ctx.Done():
- return
- case ps := <-c.pendingSnapshots:
- err := store.SaveSnapshot(ctx, ps.height, ps.snapshot)
- if err != nil {
- log.WithField("error", err).Error("Error occurs when saving snapshot")
- }
- }
- }
- }()
-
return c, nil
}
-func (c *Chain) GetStore() *Store {
- return &(c.store)
-}
-
// Height returns the current height of the blockchain.
func (c *Chain) Height() uint64 {
c.state.cond.L.Lock()
return c.state.height
}
+func (c *Chain) inMainchain(block *legacy.Block) bool {
+ hash, ok := c.state.mainChain[block.Height]
+ if !ok {
+ return false
+ }
+ return *hash == block.Hash()
+}
+
// TimestampMS returns the latest known block timestamp.
func (c *Chain) TimestampMS() uint64 {
c.state.cond.L.Lock()
return c.state.block, c.state.snapshot
}
-func (c *Chain) setState(b *legacy.Block, s *state.Snapshot) {
- c.state.cond.L.Lock()
- defer c.state.cond.L.Unlock()
- c.state.block = b
+// This function must be called with mu lock in above level
+func (c *Chain) setState(block *legacy.Block, s *state.Snapshot, m map[uint64]*bc.Hash) error {
+ if block.AssetsMerkleRoot != s.Tree.RootHash() {
+ return ErrBadStateRoot
+ }
+
+ blockHash := block.Hash()
+ c.state.block = block
+ c.state.height = block.Height
+ c.state.hash = &blockHash
c.state.snapshot = s
- if b != nil && b.Height > c.state.height {
- c.state.height = b.Height
- c.state.cond.Broadcast()
+ for k, v := range m {
+ c.state.mainChain[k] = v
+ }
+
+ if err := c.store.SaveSnapshot(c.state.snapshot, &blockHash); err != nil {
+ return err
+ }
+ if err := c.store.SaveMainchain(c.state.mainChain, &blockHash); err != nil {
+ return err
}
+ c.store.SaveStoreStatus(block.Height, &blockHash)
+
+ c.state.cond.Broadcast()
+ return nil
}
// BlockSoonWaiter returns a channel that
+++ /dev/null
-package prottest
-
-import (
- "sync"
- "testing"
-
- "github.com/bytom/crypto/ed25519"
- "github.com/bytom/protocol"
- "github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/legacy"
- "github.com/bytom/protocol/state"
- "github.com/bytom/testutil"
-)
-
-var (
- mutex sync.Mutex // protects the following
- states = make(map[*protocol.Chain]*state.Snapshot)
- blockPubkeys = make(map[*protocol.Chain][]ed25519.PublicKey)
- blockPrivkeys = make(map[*protocol.Chain][]ed25519.PrivateKey)
-)
-
-type Option func(testing.TB, *config)
-
-func WithStore(store protocol.Store) Option {
- return func(_ testing.TB, conf *config) { conf.store = store }
-}
-
-func WithOutputIDs(outputIDs ...bc.Hash) Option {
- return func(_ testing.TB, conf *config) {
- for _, oid := range outputIDs {
- conf.initialState.Tree.Insert(oid.Bytes())
- }
- }
-}
-
-func WithBlockSigners(quorum, n int) Option {
- return func(tb testing.TB, conf *config) {
- conf.quorum = quorum
- for i := 0; i < n; i++ {
- pubkey, privkey, err := ed25519.GenerateKey(nil)
- if err != nil {
- testutil.FatalErr(tb, err)
- }
- conf.pubkeys = append(conf.pubkeys, pubkey)
- conf.privkeys = append(conf.privkeys, privkey)
- }
- }
-}
-
-type config struct {
- store protocol.Store
- initialState *state.Snapshot
- pubkeys []ed25519.PublicKey
- privkeys []ed25519.PrivateKey
- quorum int
-}
-
-// Initial returns the provided Chain's initial block.
-func Initial(tb testing.TB, c *protocol.Chain) *legacy.Block {
- b1, err := c.GetBlock(1)
- if err != nil {
- testutil.FatalErr(tb, err)
- }
- return b1
-}
-
-// BlockKeyPairs returns the configured block-signing key-pairs
-// for the provided Chain.
-func BlockKeyPairs(c *protocol.Chain) ([]ed25519.PublicKey, []ed25519.PrivateKey) {
- mutex.Lock()
- defer mutex.Unlock()
- return blockPubkeys[c], blockPrivkeys[c]
-}
+++ /dev/null
-package prottest
-
-/*func TestMakeBlock(t *testing.T) {
- c := NewChain(t)
- MakeBlock(t, c, nil)
- MakeBlock(t, c, nil)
- MakeBlock(t, c, nil)
-
- var want uint64 = 4
- if got := c.Height(); got != want {
- t.Errorf("c.Height() = %d want %d", got, want)
- }
-}*/
+++ /dev/null
-// Package prottest provides utilities for Chain Protocol testing.
-package prottest
+++ /dev/null
-// MemStore is a Store implementation that
-// keeps all blockchain state in memory.
-//
-// It is used in tests to avoid needing a database.
-package memstore
-
-import (
- "context"
- "fmt"
- "sync"
-
- "github.com/bytom/protocol/bc/legacy"
- "github.com/bytom/protocol/state"
-)
-
-// MemStore satisfies the Store interface.
-type MemStore struct {
- mu sync.Mutex
- Blocks map[uint64]*legacy.Block
- State *state.Snapshot
- StateHeight uint64
-}
-
-// New returns a new MemStore
-func New() *MemStore {
- return &MemStore{Blocks: make(map[uint64]*legacy.Block)}
-}
-
-func (m *MemStore) Height() uint64 {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- return uint64(len(m.Blocks))
-
-}
-
-func (m *MemStore) SaveBlock(b *legacy.Block) error {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- existing, ok := m.Blocks[b.Height]
- if ok && existing.Hash() != b.Hash() {
- return fmt.Errorf("already have a block at height %d", b.Height)
- }
- m.Blocks[b.Height] = b
- return nil
-}
-
-func (m *MemStore) SaveSnapshot(ctx context.Context, height uint64, snapshot *state.Snapshot) error {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- m.State = state.Copy(snapshot)
- m.StateHeight = height
- return nil
-}
-
-func (m *MemStore) GetBlock(height uint64) (*legacy.Block, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
- b, ok := m.Blocks[height]
- if !ok {
- return nil, fmt.Errorf("memstore: no block at height %d", height)
- }
- return b, nil
-}
-
-func (m *MemStore) LatestSnapshot(context.Context) (*state.Snapshot, uint64, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- if m.State == nil {
- m.State = state.Empty()
- }
- return state.Copy(m.State), m.StateHeight, nil
-}
-
-func (m *MemStore) FinalizeBlock(context.Context, uint64) error { return nil }
+++ /dev/null
-package protocol
-
-/*func TestRecoverSnapshotNoAdditionalBlocks(t *testing.T) {
- store := memstore.New()
- b, err := NewInitialBlock(time.Now().Add(-time.Minute))
- if err != nil {
- testutil.FatalErr(t, err)
- }
- c1, err := NewChain(context.Background(), b.Hash(), store, nil)
- if err != nil {
- t.Fatal(err)
- }
- err = c1.CommitAppliedBlock(context.Background(), b, state.Empty())
- if err != nil {
- testutil.FatalErr(t, err)
- }
-
- // Snapshots are applied asynchronously. This loops waits
- // until the snapshot is created.
- for {
- _, height, _ := store.LatestSnapshot(context.Background())
- if height > 0 {
- break
- }
- }
-
- ctx := context.Background()
-
- c2, err := NewChain(context.Background(), b.Hash(), store, nil)
- if err != nil {
- t.Fatal(err)
- }
- block, snapshot, err := c2.Recover(ctx)
- if err != nil {
- t.Fatal(err)
- }
- if block.Height != 1 {
- t.Fatalf("block.Height = %d, want %d", block.Height, 1)
- }
-
- err = c2.ValidateBlockForSig(ctx, createEmptyBlock(block, snapshot))
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func createEmptyBlock(block *legacy.Block, snapshot *state.Snapshot) *legacy.Block {
- root, err := bc.MerkleRoot(nil)
- if err != nil {
- log.Fatalf("calculating empty merkle root: %s", err)
- }
-
- return &legacy.Block{
- BlockHeader: legacy.BlockHeader{
- Version: 1,
- Height: block.Height + 1,
- PreviousBlockHash: block.Hash(),
- TimestampMS: bc.Millis(time.Now()),
- BlockCommitment: legacy.BlockCommitment{
- TransactionsMerkleRoot: root,
- AssetsMerkleRoot: snapshot.Tree.RootHash(),
- },
- },
- }
-}*/
}
return nil
}
+
+func (s *Snapshot) DetachBlock(block *bc.Block) error {
+ for i, tx := range block.Transactions {
+ err := s.DetachTx(tx)
+ if err != nil {
+ return errors.Wrapf(err, "detachTx block transaction %d", i)
+ }
+ }
+ return nil
+}
+
+func (s *Snapshot) DetachTx(tx *bc.Tx) error {
+ for _, n := range tx.NonceIDs {
+ delete(s.Nonces, n)
+ }
+
+ for _, prevout := range tx.SpentOutputIDs {
+ if s.Tree.Contains(prevout.Bytes()) {
+ return fmt.Errorf("invalid prevout %x", prevout.Bytes())
+ }
+ if err := s.Tree.Insert(prevout.Bytes()); err != nil {
+ return err
+ }
+ }
+
+ for _, id := range tx.TxHeader.ResultIds {
+ e := tx.Entries[*id]
+ if _, ok := e.(*bc.Output); !ok {
+ continue
+ }
+
+ s.Tree.Delete(id.Bytes())
+ }
+ return nil
+}
return c.txPool.GetErrCache(&newTx.ID)
}
- oldBlock, err := c.GetBlock(c.Height())
+ oldBlock, err := c.GetBlockByHash(c.state.hash)
if err != nil {
return err
}
BlockHeader: &bc.BlockHeader{
Height: 1,
},
- Transactions: []*bc.Tx{mockCoinbaseTx(624000000000)},
+ Transactions: []*bc.Tx{mockCoinbaseTx(1470000000000000000)},
},
err: nil,
},