package chainmgr
import (
- "container/list"
"time"
log "github.com/sirupsen/logrus"
)
const (
- syncCycle = 5 * time.Second
- blockProcessChSize = 1024
- blocksProcessChSize = 128
- headersProcessChSize = 1024
+ syncCycle = 5 * time.Second
+
+ noNeedSync = iota
+ fastSyncType
+ regularSyncType
)
var (
- maxBlockPerMsg = uint64(128)
- maxBlockHeadersPerMsg = uint64(2048)
- syncTimeout = 30 * time.Second
+ syncTimeout = 30 * time.Second
- errAppendHeaders = errors.New("fail to append list due to order dismatch")
errRequestTimeout = errors.New("request timeout")
errPeerDropped = errors.New("Peer dropped")
)
+// FastSync is the strategy interface for checkpoint-style fast sync.
+// locateBlocks/locateHeaders serve chain data to remote peers' requests,
+// process drives a local fast sync round, and setSyncPeer selects the peer
+// that round will download from.
+type FastSync interface {
+	locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error)
+	locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error)
+	process() error
+	setSyncPeer(peer *peers.Peer)
+}
+
+// Fetcher dispatches inbound block/header network messages and performs
+// synchronous single-block requests; blockKeeper delegates its message
+// handling to this interface.
+type Fetcher interface {
+	processBlock(peerID string, block *types.Block)
+	processBlocks(peerID string, blocks []*types.Block)
+	processHeaders(peerID string, headers []*types.BlockHeader)
+	requireBlock(peerID string, height uint64) (*types.Block, error)
+}
+
type blockMsg struct {
block *types.Block
peerID string
}
type blockKeeper struct {
- chain Chain
- peers *peers.PeerSet
-
- syncPeer *peers.Peer
- blockProcessCh chan *blockMsg
- blocksProcessCh chan *blocksMsg
- headersProcessCh chan *headersMsg
+ chain Chain
+ fastSync FastSync
+ msgFetcher Fetcher
+ peers *peers.PeerSet
+ syncPeer *peers.Peer
- headerList *list.List
+ quit chan struct{}
}
func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
- bk := &blockKeeper{
- chain: chain,
- peers: peers,
- blockProcessCh: make(chan *blockMsg, blockProcessChSize),
- blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
- headersProcessCh: make(chan *headersMsg, headersProcessChSize),
- headerList: list.New(),
- }
- bk.resetHeaderState()
- go bk.syncWorker()
- return bk
-}
-
-func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
- for _, header := range headers {
- prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
- if prevHeader.Hash() != header.PreviousBlockHash {
- return errAppendHeaders
- }
- bk.headerList.PushBack(header)
- }
- return nil
-}
-
-func (bk *blockKeeper) blockLocator() []*bc.Hash {
- header := bk.chain.BestBlockHeader()
- locator := []*bc.Hash{}
-
- step := uint64(1)
- for header != nil {
- headerHash := header.Hash()
- locator = append(locator, &headerHash)
- if header.Height == 0 {
- break
- }
-
- var err error
- if header.Height < step {
- header, err = bk.chain.GetHeaderByHeight(0)
- } else {
- header, err = bk.chain.GetHeaderByHeight(header.Height - step)
- }
- if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
- break
- }
-
- if len(locator) >= 9 {
- step *= 2
- }
- }
- return locator
-}
-
-func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
- bk.resetHeaderState()
- lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
- for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
- if lastHeader.Height >= checkPoint.Height {
- return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
- }
-
- lastHash := lastHeader.Hash()
- headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
- if err != nil {
- return err
- }
-
- if len(headers) == 0 {
- return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list")
- }
-
- if err := bk.appendHeaderList(headers); err != nil {
- return err
- }
+ msgFetcher := newMsgFetcher(peers)
+ return &blockKeeper{
+ chain: chain,
+ fastSync: newFastSync(chain, msgFetcher, peers),
+ msgFetcher: msgFetcher,
+ peers: peers,
+ quit: make(chan struct{}),
}
-
- fastHeader := bk.headerList.Front()
- for bk.chain.BestBlockHeight() < checkPoint.Height {
- locator := bk.blockLocator()
- blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
- if err != nil {
- return err
- }
-
- if len(blocks) == 0 {
- return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list")
- }
-
- for _, block := range blocks {
- if fastHeader = fastHeader.Next(); fastHeader == nil {
- return errors.New("get block than is higher than checkpoint")
- }
-
- if _, err = bk.chain.ProcessBlock(block); err != nil {
- return errors.Wrap(err, "fail on fastBlockSync process block")
- }
- }
- }
- return nil
}
func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
- headers, err := bk.locateHeaders(locator, stopHash)
- if err != nil {
- return nil, err
- }
-
- blocks := []*types.Block{}
- for i, header := range headers {
- if uint64(i) >= maxBlockPerMsg {
- break
- }
-
- headerHash := header.Hash()
- block, err := bk.chain.GetBlockByHash(&headerHash)
- if err != nil {
- return nil, err
- }
-
- blocks = append(blocks, block)
- }
- return blocks, nil
+ return bk.fastSync.locateBlocks(locator, stopHash)
}
-func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
- stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
- if err != nil {
- return nil, err
- }
-
- startHeader, err := bk.chain.GetHeaderByHeight(0)
- if err != nil {
- return nil, err
- }
-
- for _, hash := range locator {
- header, err := bk.chain.GetHeaderByHash(hash)
- if err == nil && bk.chain.InMainChain(header.Hash()) {
- startHeader = header
- break
- }
- }
-
- totalHeaders := stopHeader.Height - startHeader.Height
- if totalHeaders > maxBlockHeadersPerMsg {
- totalHeaders = maxBlockHeadersPerMsg
- }
-
- headers := []*types.BlockHeader{}
- for i := uint64(1); i <= totalHeaders; i++ {
- header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
- if err != nil {
- return nil, err
- }
-
- headers = append(headers, header)
- }
- return headers, nil
-}
-
-func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
- height := bk.chain.BestBlockHeader().Height
- checkpoints := consensus.ActiveNetParams.Checkpoints
- if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
- return nil
- }
-
- nextCheckpoint := &checkpoints[len(checkpoints)-1]
- for i := len(checkpoints) - 2; i >= 0; i-- {
- if height >= checkpoints[i].Height {
- break
- }
- nextCheckpoint = &checkpoints[i]
- }
- return nextCheckpoint
+func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
+ return bk.fastSync.locateHeaders(locator, stopHash, skip, maxNum)
}
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
- bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
+ bk.msgFetcher.processBlock(peerID, block)
}
func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
- bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
+ bk.msgFetcher.processBlocks(peerID, blocks)
}
func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
- bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
+ bk.msgFetcher.processHeaders(peerID, headers)
}
-func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
- i := bk.chain.BestBlockHeight() + 1
- for i <= wantHeight {
- block, err := bk.requireBlock(i)
+func (bk *blockKeeper) regularBlockSync() error {
+ peerHeight := bk.syncPeer.Height()
+ bestHeight := bk.chain.BestBlockHeight()
+ i := bestHeight + 1
+ for i <= peerHeight {
+ block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
if err != nil {
+ bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelConnException, err)
return err
}
isOrphan, err := bk.chain.ProcessBlock(block)
if err != nil {
+ bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelMsgIllegal, err)
return err
}
}
i = bk.chain.BestBlockHeight() + 1
}
+ log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
return nil
}
-func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
- if ok := bk.syncPeer.GetBlockByHeight(height); !ok {
- return nil, errPeerDropped
- }
-
- timeout := time.NewTimer(syncTimeout)
- defer timeout.Stop()
-
- for {
- select {
- case msg := <-bk.blockProcessCh:
- if msg.peerID != bk.syncPeer.ID() {
- continue
- }
- if msg.block.Height != height {
- continue
- }
- return msg.block, nil
- case <-timeout.C:
- return nil, errors.Wrap(errRequestTimeout, "requireBlock")
- }
- }
+func (bk *blockKeeper) start() {
+ go bk.syncWorker()
}
-func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
- if ok := bk.syncPeer.GetBlocks(locator, stopHash); !ok {
- return nil, errPeerDropped
+func (bk *blockKeeper) checkSyncType() int {
+ peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
+ if peer == nil {
+ log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer")
+ return noNeedSync
}
- timeout := time.NewTimer(syncTimeout)
- defer timeout.Stop()
+ bestHeight := bk.chain.BestBlockHeight()
- for {
- select {
- case msg := <-bk.blocksProcessCh:
- if msg.peerID != bk.syncPeer.ID() {
- continue
- }
- return msg.blocks, nil
- case <-timeout.C:
- return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
- }
+ if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
+ bk.fastSync.setSyncPeer(peer)
+ return fastSyncType
}
-}
-func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
- if ok := bk.syncPeer.GetHeaders(locator, stopHash); !ok {
- return nil, errPeerDropped
+ peer = bk.peers.BestPeer(consensus.SFFullNode)
+ if peer == nil {
+ log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
+ return noNeedSync
}
- timeout := time.NewTimer(syncTimeout)
- defer timeout.Stop()
-
- for {
- select {
- case msg := <-bk.headersProcessCh:
- if msg.peerID != bk.syncPeer.ID() {
- continue
- }
- return msg.headers, nil
- case <-timeout.C:
- return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
- }
+ peerHeight := peer.Height()
+ if peerHeight > bestHeight {
+ bk.syncPeer = peer
+ return regularSyncType
}
-}
-// resetHeaderState sets the headers-first mode state to values appropriate for
-// syncing from a new peer.
-func (bk *blockKeeper) resetHeaderState() {
- header := bk.chain.BestBlockHeader()
- bk.headerList.Init()
- if bk.nextCheckpoint() != nil {
- bk.headerList.PushBack(header)
- }
+ return noNeedSync
}
func (bk *blockKeeper) startSync() bool {
- checkPoint := bk.nextCheckpoint()
- peer := bk.peers.BestPeer(consensus.SFFastSync | consensus.SFFullNode)
- if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
- bk.syncPeer = peer
- if err := bk.fastBlockSync(checkPoint); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
- bk.peers.ErrorHandler(peer.ID(), security.LevelMsgIllegal, err)
+ switch bk.checkSyncType() {
+ case fastSyncType:
+ if err := bk.fastSync.process(); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
return false
}
- return true
- }
-
- blockHeight := bk.chain.BestBlockHeight()
- peer = bk.peers.BestPeer(consensus.SFFullNode)
- if peer != nil && peer.Height() > blockHeight {
- bk.syncPeer = peer
- targetHeight := blockHeight + maxBlockPerMsg
- if targetHeight > peer.Height() {
- targetHeight = peer.Height()
- }
-
- if err := bk.regularBlockSync(targetHeight); err != nil {
+ case regularSyncType:
+ if err := bk.regularBlockSync(); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
- bk.peers.ErrorHandler(peer.ID(),security.LevelMsgIllegal, err)
return false
}
- return true
}
- return false
+
+ return true
+}
+
+func (bk *blockKeeper) stop() {
+ close(bk.quit)
}
func (bk *blockKeeper) syncWorker() {
defer syncTicker.Stop()
for {
- <-syncTicker.C
- if update := bk.startSync(); !update {
- continue
- }
-
- block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
- if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
- }
+ select {
+ case <-syncTicker.C:
+ if update := bk.startSync(); !update {
+ continue
+ }
- if err = bk.peers.BroadcastNewStatus(block); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
+ if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.BestIrreversibleHeader()); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
+ }
+ case <-bk.quit:
+ return
}
}
}
package chainmgr
import (
- "container/list"
"encoding/json"
"testing"
"time"
msgs "github.com/vapor/netsync/messages"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
- "github.com/vapor/test/mock"
"github.com/vapor/testutil"
)
-func TestAppendHeaderList(t *testing.T) {
- blocks := mockBlocks(nil, 7)
- cases := []struct {
- originalHeaders []*types.BlockHeader
- inputHeaders []*types.BlockHeader
- wantHeaders []*types.BlockHeader
- err error
- }{
- {
- originalHeaders: []*types.BlockHeader{&blocks[0].BlockHeader},
- inputHeaders: []*types.BlockHeader{&blocks[1].BlockHeader, &blocks[2].BlockHeader},
- wantHeaders: []*types.BlockHeader{&blocks[0].BlockHeader, &blocks[1].BlockHeader, &blocks[2].BlockHeader},
- err: nil,
- },
- {
- originalHeaders: []*types.BlockHeader{&blocks[5].BlockHeader},
- inputHeaders: []*types.BlockHeader{&blocks[6].BlockHeader},
- wantHeaders: []*types.BlockHeader{&blocks[5].BlockHeader, &blocks[6].BlockHeader},
- err: nil,
- },
- {
- originalHeaders: []*types.BlockHeader{&blocks[5].BlockHeader},
- inputHeaders: []*types.BlockHeader{&blocks[7].BlockHeader},
- wantHeaders: []*types.BlockHeader{&blocks[5].BlockHeader},
- err: errAppendHeaders,
- },
- {
- originalHeaders: []*types.BlockHeader{&blocks[5].BlockHeader},
- inputHeaders: []*types.BlockHeader{&blocks[7].BlockHeader, &blocks[6].BlockHeader},
- wantHeaders: []*types.BlockHeader{&blocks[5].BlockHeader},
- err: errAppendHeaders,
- },
- {
- originalHeaders: []*types.BlockHeader{&blocks[2].BlockHeader},
- inputHeaders: []*types.BlockHeader{&blocks[3].BlockHeader, &blocks[4].BlockHeader, &blocks[6].BlockHeader},
- wantHeaders: []*types.BlockHeader{&blocks[2].BlockHeader, &blocks[3].BlockHeader, &blocks[4].BlockHeader},
- err: errAppendHeaders,
- },
- }
-
- for i, c := range cases {
- bk := &blockKeeper{headerList: list.New()}
- for _, header := range c.originalHeaders {
- bk.headerList.PushBack(header)
- }
-
- if err := bk.appendHeaderList(c.inputHeaders); err != c.err {
- t.Errorf("case %d: got error %v want error %v", i, err, c.err)
- }
-
- gotHeaders := []*types.BlockHeader{}
- for e := bk.headerList.Front(); e != nil; e = e.Next() {
- gotHeaders = append(gotHeaders, e.Value.(*types.BlockHeader))
- }
-
- if !testutil.DeepEqual(gotHeaders, c.wantHeaders) {
- t.Errorf("case %d: got %v want %v", i, gotHeaders, c.wantHeaders)
- }
- }
-}
-
-func TestBlockLocator(t *testing.T) {
- blocks := mockBlocks(nil, 500)
- cases := []struct {
- bestHeight uint64
- wantHeight []uint64
- }{
- {
- bestHeight: 0,
- wantHeight: []uint64{0},
- },
- {
- bestHeight: 1,
- wantHeight: []uint64{1, 0},
- },
- {
- bestHeight: 7,
- wantHeight: []uint64{7, 6, 5, 4, 3, 2, 1, 0},
- },
- {
- bestHeight: 10,
- wantHeight: []uint64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0},
- },
- {
- bestHeight: 100,
- wantHeight: []uint64{100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 89, 85, 77, 61, 29, 0},
- },
- {
- bestHeight: 500,
- wantHeight: []uint64{500, 499, 498, 497, 496, 495, 494, 493, 492, 491, 489, 485, 477, 461, 429, 365, 237, 0},
- },
- }
-
- for i, c := range cases {
- mockChain := mock.NewChain(nil)
- bk := &blockKeeper{chain: mockChain}
- mockChain.SetBestBlockHeader(&blocks[c.bestHeight].BlockHeader)
- for i := uint64(0); i <= c.bestHeight; i++ {
- mockChain.SetBlockByHeight(i, blocks[i])
- }
-
- want := []*bc.Hash{}
- for _, i := range c.wantHeight {
- hash := blocks[i].Hash()
- want = append(want, &hash)
- }
-
- if got := bk.blockLocator(); !testutil.DeepEqual(got, want) {
- t.Errorf("case %d: got %v want %v", i, got, want)
- }
- }
-}
-
-func TestFastBlockSync(t *testing.T) {
- maxBlockPerMsg = 5
- maxBlockHeadersPerMsg = 10
- baseChain := mockBlocks(nil, 300)
-
- cases := []struct {
- syncTimeout time.Duration
- aBlocks []*types.Block
- bBlocks []*types.Block
- checkPoint *consensus.Checkpoint
- want []*types.Block
- err error
- }{
- {
- syncTimeout: 30 * time.Second,
- aBlocks: baseChain[:100],
- bBlocks: baseChain[:301],
- checkPoint: &consensus.Checkpoint{
- Height: baseChain[250].Height,
- Hash: baseChain[250].Hash(),
- },
- want: baseChain[:251],
- err: nil,
- },
- {
- syncTimeout: 30 * time.Second,
- aBlocks: baseChain[:100],
- bBlocks: baseChain[:301],
- checkPoint: &consensus.Checkpoint{
- Height: baseChain[100].Height,
- Hash: baseChain[100].Hash(),
- },
- want: baseChain[:101],
- err: nil,
- },
- {
- syncTimeout: 1 * time.Millisecond,
- aBlocks: baseChain[:100],
- bBlocks: baseChain[:100],
- checkPoint: &consensus.Checkpoint{
- Height: baseChain[200].Height,
- Hash: baseChain[200].Hash(),
- },
- want: baseChain[:100],
- err: errRequestTimeout,
- },
- }
-
- for i, c := range cases {
- syncTimeout = c.syncTimeout
- a := mockSync(c.aBlocks, nil)
- b := mockSync(c.bBlocks, nil)
- netWork := NewNetWork()
- netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
- netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
- if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
- t.Errorf("fail on peer hands shake %v", err)
- } else {
- go B2A.postMan()
- go A2B.postMan()
- }
-
- a.blockKeeper.syncPeer = a.peers.GetPeer("test node B")
- if err := a.blockKeeper.fastBlockSync(c.checkPoint); errors.Root(err) != c.err {
- t.Errorf("case %d: got %v want %v", i, err, c.err)
- }
-
- got := []*types.Block{}
- for i := uint64(0); i <= a.chain.BestBlockHeight(); i++ {
- block, err := a.chain.GetBlockByHeight(i)
- if err != nil {
- t.Errorf("case %d got err %v", i, err)
- }
- got = append(got, block)
- }
-
- if !testutil.DeepEqual(got, c.want) {
- t.Errorf("case %d: got %v want %v", i, got, c.want)
- }
- }
-}
-
-func TestLocateBlocks(t *testing.T) {
- maxBlockPerMsg = 5
- blocks := mockBlocks(nil, 100)
- cases := []struct {
- locator []uint64
- stopHash bc.Hash
- wantHeight []uint64
- }{
- {
- locator: []uint64{20},
- stopHash: blocks[100].Hash(),
- wantHeight: []uint64{21, 22, 23, 24, 25},
- },
- }
-
- mockChain := mock.NewChain(nil)
- bk := &blockKeeper{chain: mockChain}
- for _, block := range blocks {
- mockChain.SetBlockByHeight(block.Height, block)
- }
-
- for i, c := range cases {
- locator := []*bc.Hash{}
- for _, i := range c.locator {
- hash := blocks[i].Hash()
- locator = append(locator, &hash)
- }
-
- want := []*types.Block{}
- for _, i := range c.wantHeight {
- want = append(want, blocks[i])
- }
-
- got, _ := bk.locateBlocks(locator, &c.stopHash)
- if !testutil.DeepEqual(got, want) {
- t.Errorf("case %d: got %v want %v", i, got, want)
- }
- }
-}
-
-func TestLocateHeaders(t *testing.T) {
- maxBlockHeadersPerMsg = 10
- blocks := mockBlocks(nil, 150)
- cases := []struct {
- chainHeight uint64
- locator []uint64
- stopHash bc.Hash
- wantHeight []uint64
- err bool
- }{
- {
- chainHeight: 100,
- locator: []uint64{},
- stopHash: blocks[100].Hash(),
- wantHeight: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
- err: false,
- },
- {
- chainHeight: 100,
- locator: []uint64{20},
- stopHash: blocks[100].Hash(),
- wantHeight: []uint64{21, 22, 23, 24, 25, 26, 27, 28, 29, 30},
- err: false,
- },
- {
- chainHeight: 100,
- locator: []uint64{20},
- stopHash: blocks[24].Hash(),
- wantHeight: []uint64{21, 22, 23, 24},
- err: false,
- },
- {
- chainHeight: 100,
- locator: []uint64{20},
- stopHash: blocks[20].Hash(),
- wantHeight: []uint64{},
- err: false,
- },
- {
- chainHeight: 100,
- locator: []uint64{20},
- stopHash: bc.Hash{},
- wantHeight: []uint64{},
- err: true,
- },
- {
- chainHeight: 100,
- locator: []uint64{120, 70},
- stopHash: blocks[78].Hash(),
- wantHeight: []uint64{71, 72, 73, 74, 75, 76, 77, 78},
- err: false,
- },
- }
-
- for i, c := range cases {
- mockChain := mock.NewChain(nil)
- bk := &blockKeeper{chain: mockChain}
- for i := uint64(0); i <= c.chainHeight; i++ {
- mockChain.SetBlockByHeight(i, blocks[i])
- }
-
- locator := []*bc.Hash{}
- for _, i := range c.locator {
- hash := blocks[i].Hash()
- locator = append(locator, &hash)
- }
-
- want := []*types.BlockHeader{}
- for _, i := range c.wantHeight {
- want = append(want, &blocks[i].BlockHeader)
- }
-
- got, err := bk.locateHeaders(locator, &c.stopHash)
- if err != nil != c.err {
- t.Errorf("case %d: got %v want err = %v", i, err, c.err)
- }
- if !testutil.DeepEqual(got, want) {
- t.Errorf("case %d: got %v want %v", i, got, want)
- }
- }
-}
-
-func TestNextCheckpoint(t *testing.T) {
- cases := []struct {
- checkPoints []consensus.Checkpoint
- bestHeight uint64
- want *consensus.Checkpoint
- }{
- {
- checkPoints: []consensus.Checkpoint{},
- bestHeight: 5000,
- want: nil,
- },
- {
- checkPoints: []consensus.Checkpoint{
- {Height: 10000, Hash: bc.Hash{V0: 1}},
- },
- bestHeight: 5000,
- want: &consensus.Checkpoint{Height: 10000, Hash: bc.Hash{V0: 1}},
- },
- {
- checkPoints: []consensus.Checkpoint{
- {Height: 10000, Hash: bc.Hash{V0: 1}},
- {Height: 20000, Hash: bc.Hash{V0: 2}},
- {Height: 30000, Hash: bc.Hash{V0: 3}},
- },
- bestHeight: 15000,
- want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}},
- },
- {
- checkPoints: []consensus.Checkpoint{
- {Height: 10000, Hash: bc.Hash{V0: 1}},
- {Height: 20000, Hash: bc.Hash{V0: 2}},
- {Height: 30000, Hash: bc.Hash{V0: 3}},
- },
- bestHeight: 10000,
- want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}},
- },
- {
- checkPoints: []consensus.Checkpoint{
- {Height: 10000, Hash: bc.Hash{V0: 1}},
- {Height: 20000, Hash: bc.Hash{V0: 2}},
- {Height: 30000, Hash: bc.Hash{V0: 3}},
- },
- bestHeight: 35000,
- want: nil,
- },
- }
-
- mockChain := mock.NewChain(nil)
- for i, c := range cases {
- consensus.ActiveNetParams.Checkpoints = c.checkPoints
- mockChain.SetBestBlockHeader(&types.BlockHeader{Height: c.bestHeight})
- bk := &blockKeeper{chain: mockChain}
-
- if got := bk.nextCheckpoint(); !testutil.DeepEqual(got, c.want) {
- t.Errorf("case %d: got %v want %v", i, got, c.want)
- }
- }
-}
-
func TestRegularBlockSync(t *testing.T) {
baseChain := mockBlocks(nil, 50)
chainX := append(baseChain, mockBlocks(baseChain[50], 60)...)
chainY := append(baseChain, mockBlocks(baseChain[50], 70)...)
+ chainZ := append(baseChain, mockBlocks(baseChain[50], 200)...)
+
cases := []struct {
syncTimeout time.Duration
aBlocks []*types.Block
bBlocks []*types.Block
- syncHeight uint64
want []*types.Block
err error
}{
syncTimeout: 30 * time.Second,
aBlocks: baseChain[:20],
bBlocks: baseChain[:50],
- syncHeight: 45,
- want: baseChain[:46],
+ want: baseChain[:50],
err: nil,
},
{
syncTimeout: 30 * time.Second,
aBlocks: chainX,
bBlocks: chainY,
- syncHeight: 70,
want: chainY,
err: nil,
},
syncTimeout: 30 * time.Second,
aBlocks: chainX[:52],
bBlocks: chainY[:53],
- syncHeight: 52,
want: chainY[:53],
err: nil,
},
{
- syncTimeout: 1 * time.Millisecond,
- aBlocks: baseChain,
- bBlocks: baseChain,
- syncHeight: 52,
- want: baseChain,
- err: errRequestTimeout,
+ syncTimeout: 30 * time.Second,
+ aBlocks: chainX[:52],
+ bBlocks: chainZ,
+ want: chainZ[:201],
+ err: nil,
},
}
}
a.blockKeeper.syncPeer = a.peers.GetPeer("test node B")
- if err := a.blockKeeper.regularBlockSync(c.syncHeight); errors.Root(err) != c.err {
+ if err := a.blockKeeper.regularBlockSync(); errors.Root(err) != c.err {
t.Errorf("case %d: got %v want %v", i, err, c.err)
}
for i, c := range cases {
syncTimeout = c.syncTimeout
- got, err := c.testNode.blockKeeper.requireBlock(c.requireHeight)
+ got, err := c.testNode.blockKeeper.msgFetcher.requireBlock(c.testNode.blockKeeper.syncPeer.ID(), c.requireHeight)
if !testutil.DeepEqual(got, c.want) {
t.Errorf("case %d: got %v want %v", i, got, c.want)
}
--- /dev/null
+package chainmgr
+
+import (
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/errors"
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p/security"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+var (
+	// Caps on how many blocks/headers go into a single reply message.
+	maxBlocksPerMsg = uint64(1000)
+	maxHeadersPerMsg = uint64(1000)
+	// fastSyncPivotGap keeps the fast sync target this many blocks below the
+	// peer's irreversible height.
+	fastSyncPivotGap = uint64(64)
+	// minGapStartFastSync is the minimum lead a peer's irreversible height
+	// must have over our best height before fast sync is chosen.
+	minGapStartFastSync = uint64(128)
+	// maxFastSyncBlocksNum caps the blocks fetched in one fast sync round.
+	maxFastSyncBlocksNum = uint64(10000)
+
+	// errOrphanBlock is returned when a fetched block fails to connect to
+	// the local chain during fast sync.
+	errOrphanBlock = errors.New("fast sync block is orphan")
+)
+
+// MsgFetcher abstracts the synchronous block-request primitives fast sync
+// needs from the network layer (request one block by height, or a batch via
+// locator/stop hash).
+type MsgFetcher interface {
+	requireBlock(peerID string, height uint64) (*types.Block, error)
+	requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error)
+}
+
+// fastSync implements the FastSync interface: it downloads blocks in batches
+// from a single sync peer up to a pre-computed stop header.
+type fastSync struct {
+	chain Chain
+	msgFetcher MsgFetcher
+	peers *peers.PeerSet
+	syncPeer *peers.Peer
+	// stopHeader is the target this round syncs up to (set by findFastSyncRange).
+	stopHeader *types.BlockHeader
+	// length is the number of blocks planned for the current round.
+	length uint64
+
+	// NOTE(review): "quite" looks like a typo for "quit"; it is created in
+	// newFastSync but never closed or received from in this file — confirm
+	// whether it is needed at all.
+	quite chan struct{}
+}
+
+// newFastSync returns a fastSync wired to the given chain, block fetcher and
+// peer set. The sync peer is selected later via setSyncPeer.
+func newFastSync(chain Chain, msgFetcher MsgFetcher, peers *peers.PeerSet) *fastSync {
+	return &fastSync{
+		chain:      chain,
+		msgFetcher: msgFetcher,
+		peers:      peers,
+		quite:      make(chan struct{}),
+	}
+}
+
+// blockLocator builds a block-locator list for our best chain: the newest
+// headers one height apart, then exponentially larger gaps back to genesis.
+// Peers use it to find the highest common ancestor cheaply.
+func (fs *fastSync) blockLocator() []*bc.Hash {
+	header := fs.chain.BestBlockHeader()
+	locator := []*bc.Hash{}
+
+	step := uint64(1)
+	for header != nil {
+		headerHash := header.Hash()
+		locator = append(locator, &headerHash)
+		if header.Height == 0 {
+			break
+		}
+
+		var err error
+		// Clamp at genesis so the unsigned subtraction cannot underflow.
+		if header.Height < step {
+			header, err = fs.chain.GetHeaderByHeight(0)
+		} else {
+			header, err = fs.chain.GetHeaderByHeight(header.Height - step)
+		}
+		if err != nil {
+			log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
+			break
+		}
+
+		// After the first 9 entries, double the gap between sampled heights.
+		if len(locator) >= 9 {
+			step *= 2
+		}
+	}
+	return locator
+}
+
+// process runs one fast sync round against the current sync peer: pick the
+// target range, then repeatedly request batches of blocks until the local
+// best height reaches the stop header. Peers that fail a request or send
+// unconnectable blocks are reported to the peer-set error handler.
+func (fs *fastSync) process() error {
+	if err := fs.findFastSyncRange(); err != nil {
+		return err
+	}
+
+	stopHash := fs.stopHeader.Hash()
+	for fs.chain.BestBlockHeight() < fs.stopHeader.Height {
+		blocks, err := fs.msgFetcher.requireBlocks(fs.syncPeer.ID(), fs.blockLocator(), &stopHash)
+		if err != nil {
+			// Fetch failures (timeout, peer drop) count as connection issues.
+			fs.peers.ErrorHandler(fs.syncPeer.ID(), security.LevelConnException, err)
+			return err
+		}
+
+		if err := fs.verifyBlocks(blocks); err != nil {
+			// Invalid or orphan blocks are the peer's fault.
+			fs.peers.ErrorHandler(fs.syncPeer.ID(), security.LevelMsgIllegal, err)
+			return err
+		}
+	}
+
+	log.WithFields(log.Fields{"module": logModule, "height": fs.chain.BestBlockHeight()}).Info("fast sync success")
+	return nil
+}
+
+// findFastSyncRange decides how many blocks this fast sync round will cover
+// and resolves the header the round will sync up to.
+//
+// The peer's irreversible height is trimmed by fastSyncPivotGap so we never
+// sync past blocks the peer could still roll back, and the round is capped
+// at maxFastSyncBlocksNum blocks.
+func (fs *fastSync) findFastSyncRange() error {
+	bestHeight := fs.chain.BestBlockHeight()
+	// Guard against uint64 underflow in the subtraction below: callers
+	// normally ensure the peer is far enough ahead (minGapStartFastSync),
+	// but fail loudly rather than computing a bogus huge length if not.
+	if fs.syncPeer.IrreversibleHeight() < bestHeight+fastSyncPivotGap {
+		return errors.New("peer irreversible height is too low for fast sync")
+	}
+
+	fs.length = fs.syncPeer.IrreversibleHeight() - fastSyncPivotGap - bestHeight
+	if fs.length > maxFastSyncBlocksNum {
+		fs.length = maxFastSyncBlocksNum
+	}
+
+	// Fetch the pivot block from the sync peer; its header marks the end of
+	// this fast sync round.
+	stopBlock, err := fs.msgFetcher.requireBlock(fs.syncPeer.ID(), bestHeight+fs.length)
+	if err != nil {
+		return err
+	}
+
+	fs.stopHeader = &stopBlock.BlockHeader
+	return nil
+}
+
+// locateBlocks answers a peer's get-blocks request: resolve the headers
+// selected by the locator/stop hash (at most maxBlocksPerMsg of them) and
+// load the corresponding full blocks from the local chain.
+func (fs *fastSync) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+	headers, err := fs.locateHeaders(locator, stopHash, 0, maxBlocksPerMsg)
+	if err != nil {
+		return nil, err
+	}
+
+	blocks := []*types.Block{}
+	for _, header := range headers {
+		headerHash := header.Hash()
+		block, err := fs.chain.GetBlockByHash(&headerHash)
+		if err != nil {
+			return nil, err
+		}
+
+		blocks = append(blocks, block)
+	}
+	return blocks, nil
+}
+
+// locateHeaders returns up to maxNum main-chain headers, starting at the
+// first locator hash found in the main chain (genesis if none match) and
+// stepping skip+1 heights at a time, never going past stopHash.
+// A stop hash that is unknown or not on the main chain yields an empty
+// slice rather than an error, so the peer simply gets an empty reply.
+func (fs *fastSync) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
+	startHeader, err := fs.chain.GetHeaderByHeight(0)
+	if err != nil {
+		return nil, err
+	}
+
+	// The first locator entry we recognize on the main chain wins.
+	for _, hash := range locator {
+		header, err := fs.chain.GetHeaderByHash(hash)
+		if err == nil && fs.chain.InMainChain(header.Hash()) {
+			startHeader = header
+			break
+		}
+	}
+
+	headers := make([]*types.BlockHeader, 0)
+	stopHeader, err := fs.chain.GetHeaderByHash(stopHash)
+	if err != nil {
+		// Unknown stop hash: deliberate empty reply, not an error.
+		return headers, nil
+	}
+
+	if !fs.chain.InMainChain(*stopHash) {
+		return headers, nil
+	}
+
+	num := uint64(0)
+	for i := startHeader.Height; i <= stopHeader.Height && num < maxNum; i += skip + 1 {
+		header, err := fs.chain.GetHeaderByHeight(i)
+		if err != nil {
+			return nil, err
+		}
+
+		headers = append(headers, header)
+		num++
+	}
+
+	return headers, nil
+}
+
+// setSyncPeer records the peer all subsequent fast sync requests go to.
+func (fs *fastSync) setSyncPeer(peer *peers.Peer) {
+	fs.syncPeer = peer
+}
+
+// verifyBlocks feeds a fetched batch into the chain in order. A processing
+// error or an orphan (a block that does not connect to our chain) aborts
+// the batch; the caller decides how to penalize the peer.
+func (fs *fastSync) verifyBlocks(blocks []*types.Block) error {
+	for _, block := range blocks {
+		isOrphan, err := fs.chain.ProcessBlock(block)
+		if err != nil {
+			return err
+		}
+
+		if isOrphan {
+			log.WithFields(log.Fields{"module": logModule, "height": block.Height, "hash": block.Hash()}).Error("fast sync block is orphan")
+			return errOrphanBlock
+		}
+	}
+
+	return nil
+}
--- /dev/null
+package chainmgr
+
+import (
+ "testing"
+ "time"
+
+ "github.com/vapor/consensus"
+ "github.com/vapor/errors"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+ "github.com/vapor/test/mock"
+ "github.com/vapor/testutil"
+)
+
+// TestBlockLocator checks fastSync.blockLocator's height sampling (dense for
+// the newest 9 entries, then exponentially spaced back to genesis) against
+// expected sequences for several best heights.
+func TestBlockLocator(t *testing.T) {
+	blocks := mockBlocks(nil, 500)
+	cases := []struct {
+		bestHeight uint64
+		wantHeight []uint64
+	}{
+		{
+			bestHeight: 0,
+			wantHeight: []uint64{0},
+		},
+		{
+			bestHeight: 1,
+			wantHeight: []uint64{1, 0},
+		},
+		{
+			bestHeight: 7,
+			wantHeight: []uint64{7, 6, 5, 4, 3, 2, 1, 0},
+		},
+		{
+			bestHeight: 10,
+			wantHeight: []uint64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0},
+		},
+		{
+			bestHeight: 100,
+			wantHeight: []uint64{100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 89, 85, 77, 61, 29, 0},
+		},
+		{
+			bestHeight: 500,
+			wantHeight: []uint64{500, 499, 498, 497, 496, 495, 494, 493, 492, 491, 489, 485, 477, 461, 429, 365, 237, 0},
+		},
+	}
+
+	for i, c := range cases {
+		mockChain := mock.NewChain(nil)
+		fs := &fastSync{chain: mockChain}
+		mockChain.SetBestBlockHeader(&blocks[c.bestHeight].BlockHeader)
+		for i := uint64(0); i <= c.bestHeight; i++ {
+			mockChain.SetBlockByHeight(i, blocks[i])
+		}
+
+		want := []*bc.Hash{}
+		for _, i := range c.wantHeight {
+			hash := blocks[i].Hash()
+			want = append(want, &hash)
+		}
+
+		if got := fs.blockLocator(); !testutil.DeepEqual(got, want) {
+			t.Errorf("case %d: got %v want %v", i, got, want)
+		}
+	}
+}
+
+// TestFastBlockSync runs a full fast sync round between two mocked nodes over
+// the in-memory test network and verifies node A ends up with exactly the
+// expected prefix of the base chain (pivot gap and round cap applied).
+func TestFastBlockSync(t *testing.T) {
+	maxBlocksPerMsg = 10
+	maxHeadersPerMsg = 10
+	maxFastSyncBlocksNum = 200
+	baseChain := mockBlocks(nil, 300)
+
+	cases := []struct {
+		syncTimeout time.Duration
+		aBlocks     []*types.Block
+		bBlocks     []*types.Block
+		want        []*types.Block
+		err         error
+	}{
+		{
+			syncTimeout: 30 * time.Second,
+			aBlocks:     baseChain[:50],
+			bBlocks:     baseChain[:301],
+			want:        baseChain[:237],
+			err:         nil,
+		},
+		{
+			syncTimeout: 30 * time.Second,
+			aBlocks:     baseChain[:2],
+			bBlocks:     baseChain[:300],
+			want:        baseChain[:202],
+			err:         nil,
+		},
+	}
+
+	for i, c := range cases {
+		syncTimeout = c.syncTimeout
+		a := mockSync(c.aBlocks, nil)
+		b := mockSync(c.bBlocks, nil)
+		netWork := NewNetWork()
+		netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode|consensus.SFFastSync)
+		netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode|consensus.SFFastSync)
+		if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
+			t.Errorf("fail on peer hands shake %v", err)
+		} else {
+			go B2A.postMan()
+			go A2B.postMan()
+		}
+		a.blockKeeper.syncPeer = a.peers.GetPeer("test node B")
+		a.blockKeeper.fastSync.setSyncPeer(a.blockKeeper.syncPeer)
+
+		if err := a.blockKeeper.fastSync.process(); errors.Root(err) != c.err {
+			t.Errorf("case %d: got %v want %v", i, err, c.err)
+		}
+
+		got := []*types.Block{}
+		for i := uint64(0); i <= a.chain.BestBlockHeight(); i++ {
+			block, err := a.chain.GetBlockByHeight(i)
+			if err != nil {
+				t.Errorf("case %d got err %v", i, err)
+			}
+			got = append(got, block)
+		}
+		if !testutil.DeepEqual(got, c.want) {
+			t.Errorf("case %d: got %v want %v", i, got, c.want)
+		}
+	}
+}
+
+// TestLocateBlocks verifies that locateBlocks returns the full blocks for the
+// header range selected by the locator/stop hash, capped at maxBlocksPerMsg.
+func TestLocateBlocks(t *testing.T) {
+	maxBlocksPerMsg = 5
+	blocks := mockBlocks(nil, 100)
+	cases := []struct {
+		locator    []uint64
+		stopHash   bc.Hash
+		wantHeight []uint64
+	}{
+		{
+			locator:    []uint64{20},
+			stopHash:   blocks[100].Hash(),
+			wantHeight: []uint64{20, 21, 22, 23, 24},
+		},
+	}
+
+	mockChain := mock.NewChain(nil)
+	fs := &fastSync{chain: mockChain}
+	for _, block := range blocks {
+		mockChain.SetBlockByHeight(block.Height, block)
+	}
+
+	for i, c := range cases {
+		locator := []*bc.Hash{}
+		for _, i := range c.locator {
+			hash := blocks[i].Hash()
+			locator = append(locator, &hash)
+		}
+
+		want := []*types.Block{}
+		for _, i := range c.wantHeight {
+			want = append(want, blocks[i])
+		}
+
+		got, _ := fs.locateBlocks(locator, &c.stopHash)
+		if !testutil.DeepEqual(got, want) {
+			t.Errorf("case %d: got %v want %v", i, got, want)
+		}
+	}
+}
+
+// TestLocateHeaders covers locateHeaders' range selection: inclusive start at
+// the first recognized locator hash, skip-based stepping, the maxNum cap, and
+// the empty (non-error) reply for an out-of-range stop hash.
+func TestLocateHeaders(t *testing.T) {
+	maxHeadersPerMsg = 10
+	blocks := mockBlocks(nil, 150)
+	blocksHash := []bc.Hash{}
+	for _, block := range blocks {
+		blocksHash = append(blocksHash, block.Hash())
+	}
+
+	cases := []struct {
+		chainHeight uint64
+		locator     []uint64
+		stopHash    *bc.Hash
+		skip        uint64
+		wantHeight  []uint64
+		err         bool
+	}{
+		{
+			chainHeight: 100,
+			locator:     []uint64{90},
+			stopHash:    &blocksHash[100],
+			skip:        0,
+			wantHeight:  []uint64{90, 91, 92, 93, 94, 95, 96, 97, 98, 99},
+			err:         false,
+		},
+		{
+			chainHeight: 100,
+			locator:     []uint64{20},
+			stopHash:    &blocksHash[24],
+			skip:        0,
+			wantHeight:  []uint64{20, 21, 22, 23, 24},
+			err:         false,
+		},
+		{
+			chainHeight: 100,
+			locator:     []uint64{20},
+			stopHash:    &blocksHash[20],
+			wantHeight:  []uint64{20},
+			err:         false,
+		},
+		{
+			chainHeight: 100,
+			locator:     []uint64{20},
+			stopHash:    &blocksHash[120],
+			wantHeight:  []uint64{},
+			err:         false,
+		},
+		{
+			chainHeight: 100,
+			locator:     []uint64{120, 70},
+			stopHash:    &blocksHash[78],
+			wantHeight:  []uint64{70, 71, 72, 73, 74, 75, 76, 77, 78},
+			err:         false,
+		},
+		{
+			chainHeight: 100,
+			locator:     []uint64{15},
+			stopHash:    &blocksHash[10],
+			skip:        10,
+			wantHeight:  []uint64{},
+			err:         false,
+		},
+		{
+			chainHeight: 100,
+			locator:     []uint64{15},
+			stopHash:    &blocksHash[80],
+			skip:        10,
+			wantHeight:  []uint64{15, 26, 37, 48, 59, 70},
+			err:         false,
+		},
+		{
+			chainHeight: 100,
+			locator:     []uint64{0},
+			stopHash:    &blocksHash[100],
+			skip:        9,
+			wantHeight:  []uint64{0, 10, 20, 30, 40, 50, 60, 70, 80, 90},
+			err:         false,
+		},
+	}
+
+	for i, c := range cases {
+		mockChain := mock.NewChain(nil)
+		fs := &fastSync{chain: mockChain}
+		for i := uint64(0); i <= c.chainHeight; i++ {
+			mockChain.SetBlockByHeight(i, blocks[i])
+		}
+
+		locator := []*bc.Hash{}
+		for _, i := range c.locator {
+			hash := blocks[i].Hash()
+			locator = append(locator, &hash)
+		}
+
+		want := []*types.BlockHeader{}
+		for _, i := range c.wantHeight {
+			want = append(want, &blocks[i].BlockHeader)
+		}
+
+		got, err := fs.locateHeaders(locator, c.stopHash, c.skip, maxHeadersPerMsg)
+		if err != nil != c.err {
+			t.Errorf("case %d: got %v want err = %v", i, err, c.err)
+		}
+		if !testutil.DeepEqual(got, want) {
+			t.Errorf("case %d: got %v want %v", i, got, want)
+		}
+	}
+}
// Chain is the interface for Bytom core
type Chain interface {
BestBlockHeader() *types.BlockHeader
+ BestIrreversibleHeader() *types.BlockHeader
BestBlockHeight() uint64
GetBlockByHash(*bc.Hash) (*types.Block, error)
GetBlockByHeight(uint64) (*types.Block, error)
rawData, err := block.MarshalText()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
- continue
+ return
}
if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
}
func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
- headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+ headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxHeadersPerMsg)
if err != nil || len(headers) == 0 {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
return
func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
- peer.SetStatus(msg.Height, msg.GetHash())
- return
+ peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
+ peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
}
}
return errors.New("invalid peer")
}
- if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil {
+ if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.BestIrreversibleHeader()); err != nil {
m.peers.RemovePeer(p.ID())
return err
}
if err != nil {
return err
}
-
+ m.blockKeeper.start()
go m.broadcastTxsLoop()
go m.syncMempoolLoop()
//Stop stop sync manager
func (m *Manager) Stop() {
+ m.blockKeeper.stop()
close(m.quit)
}
--- /dev/null
+package chainmgr
+
+import (
+ "time"
+
+ "github.com/vapor/errors"
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+const (
+ blockProcessChSize = 1024
+ blocksProcessChSize = 128
+ headersProcessChSize = 1024
+)
+
+type msgFetcher struct {
+ peers *peers.PeerSet
+
+ blockProcessCh chan *blockMsg
+ blocksProcessCh chan *blocksMsg
+ headersProcessCh chan *headersMsg
+}
+
+func newMsgFetcher(peers *peers.PeerSet) *msgFetcher {
+ return &msgFetcher{
+ peers: peers,
+ blockProcessCh: make(chan *blockMsg, blockProcessChSize),
+ blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
+ headersProcessCh: make(chan *headersMsg, headersProcessChSize),
+ }
+}
+
+// processBlock hands a single-block response received from peerID to the
+// blockProcessCh channel, where a waiting requireBlock call consumes it.
+// NOTE(review): the send blocks the caller once the buffered channel is full.
+func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
+	mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
+}
+
+// processBlocks hands a batch-of-blocks response received from peerID to the
+// blocksProcessCh channel, where a waiting requireBlocks call consumes it.
+func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
+	mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
+}
+
+// processHeaders hands a headers response received from peerID to the
+// headersProcessCh channel, where a waiting requireHeaders call consumes it.
+func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
+	mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
+}
+
+// requireBlock synchronously requests the block at the given height from the
+// specified peer and waits for the matching response.
+// It returns errPeerDropped when the peer is unknown or the request cannot be
+// sent, and errRequestTimeout when no matching block arrives within syncTimeout.
+func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
+	peer := mf.peers.GetPeer(peerID)
+	if peer == nil {
+		return nil, errPeerDropped
+	}
+
+	if ok := peer.GetBlockByHeight(height); !ok {
+		return nil, errPeerDropped
+	}
+
+	timeout := time.NewTimer(syncTimeout)
+	defer timeout.Stop()
+
+	for {
+		select {
+		case msg := <-mf.blockProcessCh:
+			// responses from other peers or for other heights are
+			// silently discarded while waiting for the wanted one
+			if msg.peerID != peerID {
+				continue
+			}
+			if msg.block.Height != height {
+				continue
+			}
+			return msg.block, nil
+		case <-timeout.C:
+			return nil, errors.Wrap(errRequestTimeout, "requireBlock")
+		}
+	}
+}
+
+// requireBlocks requests a run of blocks (selected by locator up to stopHash)
+// from the specified peer and waits for its blocks response.
+// It returns errPeerDropped when the peer is unknown or the request cannot be
+// sent, and errRequestTimeout when no response arrives within syncTimeout.
+func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+	peer := mf.peers.GetPeer(peerID)
+	if peer == nil {
+		return nil, errPeerDropped
+	}
+
+	if ok := peer.GetBlocks(locator, stopHash); !ok {
+		return nil, errPeerDropped
+	}
+
+	timeout := time.NewTimer(syncTimeout)
+	defer timeout.Stop()
+
+	for {
+		select {
+		case msg := <-mf.blocksProcessCh:
+			// responses from other peers are silently discarded
+			if msg.peerID != peerID {
+				continue
+			}
+
+			return msg.blocks, nil
+		case <-timeout.C:
+			return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
+		}
+	}
+}
+
+// requireHeaders requests block headers (selected by locator up to stopHash,
+// skipping `skip` headers between results) from the specified peer and waits
+// for its headers response.
+// It returns errPeerDropped when the peer is unknown or the request cannot be
+// sent, and errRequestTimeout when no response arrives within syncTimeout.
+func (mf *msgFetcher) requireHeaders(peerID string, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) ([]*types.BlockHeader, error) {
+	peer := mf.peers.GetPeer(peerID)
+	if peer == nil {
+		return nil, errPeerDropped
+	}
+
+	if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
+		return nil, errPeerDropped
+	}
+
+	timeout := time.NewTimer(syncTimeout)
+	defer timeout.Stop()
+
+	for {
+		select {
+		case msg := <-mf.headersProcessCh:
+			// responses from other peers are silently discarded
+			if msg.peerID != peerID {
+				continue
+			}
+
+			return msg.headers, nil
+		case <-timeout.C:
+			return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
+		}
+	}
+}
if err := pr.manager.SendStatus(peer); err != nil {
return err
}
+
pr.manager.syncMempool(peer.Key)
return nil
}
"math/rand"
"net"
- wire "github.com/tendermint/go-wire"
+ "github.com/tendermint/go-wire"
"github.com/tendermint/tmlibs/flowrate"
"github.com/vapor/consensus"
nodeA.AddPeer(&A2B)
nodeB.AddPeer(&B2A)
-
+ nodeA.SendStatus(B2A.srcPeer)
+ nodeB.SendStatus(A2B.srcPeer)
A2B.setAsync(true)
B2A.setAsync(true)
return &B2A, &A2B, nil
type GetHeadersMessage struct {
RawBlockLocator [][32]byte
RawStopHash [32]byte
+ Skip uint64
}
//NewGetHeadersMessage return a new GetHeadersMessage
-func NewGetHeadersMessage(blockLocator []*bc.Hash, stopHash *bc.Hash) *GetHeadersMessage {
+func NewGetHeadersMessage(blockLocator []*bc.Hash, stopHash *bc.Hash, skip uint64) *GetHeadersMessage {
msg := &GetHeadersMessage{
RawStopHash: stopHash.Byte32(),
+ Skip: skip,
}
for _, hash := range blockLocator {
msg.RawBlockLocator = append(msg.RawBlockLocator, hash.Byte32())
}
func (m *GetHeadersMessage) String() string {
- return fmt.Sprintf("{stop_hash: %s}", hex.EncodeToString(m.RawStopHash[:]))
+ stopHash := bc.NewHash(m.RawStopHash)
+ return fmt.Sprintf("{skip:%d,stopHash:%s}", m.Skip, stopHash.String())
}
//GetStopHash return the stop hash of the msg
return &hash
}
+//GetSkip return the skip of the msg
+func (m *GetHeadersMessage) GetSkip() uint64 {
+	return m.Skip
+}
+
//HeadersMessage is one of the bytom msg type
type HeadersMessage struct {
RawHeaders [][]byte
//StatusResponseMessage get status response msg
type StatusMessage struct {
- Height uint64
- RawHash [32]byte
+ BestHeight uint64
+ BestHash [32]byte
+ IrreversibleHeight uint64
+ IrreversibleHash [32]byte
}
-//NewStatusResponseMessage construct get status response msg
+//NewStatusMessage construct status msg from the best and irreversible headers
-func NewStatusMessage(blockHeader *types.BlockHeader) *StatusMessage {
+func NewStatusMessage(bestHeader, irreversibleHeader *types.BlockHeader) *StatusMessage {
return &StatusMessage{
- Height: blockHeader.Height,
- RawHash: blockHeader.Hash().Byte32(),
+ BestHeight: bestHeader.Height,
+ BestHash: bestHeader.Hash().Byte32(),
+ IrreversibleHeight: irreversibleHeader.Height,
+ IrreversibleHash: irreversibleHeader.Hash().Byte32(),
}
}
-//GetHash get hash from msg
+//GetBestHash return the best hash of the msg
-func (m *StatusMessage) GetHash() *bc.Hash {
- hash := bc.NewHash(m.RawHash)
+func (m *StatusMessage) GetBestHash() *bc.Hash {
+ hash := bc.NewHash(m.BestHash)
+ return &hash
+}
+
+//GetIrreversibleHash return the irreversible hash of the msg
+func (m *StatusMessage) GetIrreversibleHash() *bc.Hash {
+	hash := bc.NewHash(m.IrreversibleHash)
	return &hash
}
func (m *StatusMessage) String() string {
- return fmt.Sprintf("{height: %d, hash: %s}", m.Height, hex.EncodeToString(m.RawHash[:]))
+ return fmt.Sprintf("{best hash: %s, irreversible hash: %s}", hex.EncodeToString(m.BestHash[:]), hex.EncodeToString(m.IrreversibleHash[:]))
}
//TransactionMessage notify new tx msg
type testGetHeadersMessage struct {
blockLocator []*bc.Hash
stopHash *bc.Hash
+ skip uint64
}
+// TestGetHeadersMessage verifies that locator, stop hash and skip survive a
+// round trip through NewGetHeadersMessage and its getters.
func TestGetHeadersMessage(t *testing.T) {
	testMsg := testGetHeadersMessage{
		blockLocator: []*bc.Hash{{V0: 0x01}, {V0: 0x02}, {V0: 0x03}},
-		stopHash: &bc.Hash{V0: 0xaa, V2: 0x55},
+		stopHash: &bc.Hash{V0: 0x01},
+		skip: 888,
	}
-	getHeadersMsg := NewGetHeadersMessage(testMsg.blockLocator, testMsg.stopHash)
+	getHeadersMsg := NewGetHeadersMessage(testMsg.blockLocator, testMsg.stopHash, testMsg.skip)
	gotBlockLocator := getHeadersMsg.GetBlockLocator()
	gotStopHash := getHeadersMsg.GetStopHash()
+	gotSkip := getHeadersMsg.GetSkip()
	if !reflect.DeepEqual(testMsg.blockLocator, gotBlockLocator) {
		t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotBlockLocator), spew.Sdump(testMsg.blockLocator))
	}
	if !reflect.DeepEqual(testMsg.stopHash, gotStopHash) {
+		// use %s + spew like the sibling checks; the old message mislabeled
+		// the field as "amount" and used %d on *bc.Hash values
+		t.Errorf("get headers msg test err: stop hash: got %s\nwant %s", spew.Sdump(gotStopHash), spew.Sdump(testMsg.stopHash))
+	}
+
+	if !reflect.DeepEqual(testMsg.skip, gotSkip) {
+		t.Errorf("get headers msg test err: skip:got %d\nwant %d", gotSkip, testMsg.skip)
+	}
}
}
func TestStatusMessage(t *testing.T) {
- statusResponseMsg := NewStatusMessage(&testBlock.BlockHeader)
- gotHash := statusResponseMsg.GetHash()
- if !reflect.DeepEqual(*gotHash, testBlock.Hash()) {
- t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotHash), spew.Sdump(testBlock.Hash()))
+ statusResponseMsg := NewStatusMessage(&testBlock.BlockHeader, &testBlock.BlockHeader)
+ gotBestHash := statusResponseMsg.GetBestHash()
+ if !reflect.DeepEqual(*gotBestHash, testBlock.Hash()) {
+ t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotBestHash), spew.Sdump(testBlock.Hash()))
+ }
+ gotIrreversibleHash := statusResponseMsg.GetIrreversibleHash()
+ if !reflect.DeepEqual(*gotIrreversibleHash, testBlock.Hash()) {
+ t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotIrreversibleHash), spew.Sdump(testBlock.Hash()))
}
}
type Peer struct {
BasePeer
- mtx sync.RWMutex
- services consensus.ServiceFlag
- height uint64
- hash *bc.Hash
- knownTxs *set.Set // Set of transaction hashes known to be known by this peer
- knownBlocks *set.Set // Set of block hashes known to be known by this peer
- knownSignatures *set.Set // Set of block signatures known to be known by this peer
- knownStatus uint64 // Set of chain status known to be known by this peer
- filterAdds *set.Set // Set of addresses that the spv node cares about.
+ mtx sync.RWMutex
+ services consensus.ServiceFlag
+ bestHeight uint64
+ bestHash *bc.Hash
+ irreversibleHeight uint64
+ irreversibleHash *bc.Hash
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ knownSignatures *set.Set // Set of block signatures known to be known by this peer
+ knownStatus uint64 // Set of chain status known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
}
func newPeer(basePeer BasePeer) *Peer {
func (p *Peer) Height() uint64 {
p.mtx.RLock()
defer p.mtx.RUnlock()
- return p.height
+
+ return p.bestHeight
+}
+
+// IrreversibleHeight returns the height of the peer's best irreversible
+// block, guarded by the peer's read lock.
+func (p *Peer) IrreversibleHeight() uint64 {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	return p.irreversibleHeight
}
func (p *Peer) AddFilterAddress(address []byte) {
return p.TrySend(msgs.BlockchainChannel, msg)
}
-func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
- msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
+func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
+ msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
return p.TrySend(msgs.BlockchainChannel, msg)
}
return &PeerInfo{
ID: p.ID(),
RemoteAddr: p.Addr().String(),
- Height: p.height,
+ Height: p.bestHeight,
Ping: ping.String(),
Duration: sentStatus.Duration.String(),
TotalSent: sentStatus.Bytes,
return nil
}
-func (p *Peer) SendStatus(header *types.BlockHeader) error {
- msg := msgs.NewStatusMessage(header)
+func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+ msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
return errSendStatusMsg
}
- p.markNewStatus(header.Height)
+ p.markNewStatus(bestHeader.Height)
return nil
}
-func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
+func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
- p.height = height
- p.hash = hash
+
+ p.bestHeight = bestHeight
+ p.bestHash = bestHash
+}
+
+// SetIrreversibleStatus records the peer's latest irreversible block height
+// and hash, guarded by the peer's write lock.
+func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	p.irreversibleHeight = irreversibleHeight
+	p.irreversibleHash = irreversibleHash
}
type PeerSet struct {
if !p.services.IsEnable(flag) {
continue
}
- if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
+ if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
+ bestPeer = p
+ }
+ }
+ return bestPeer
+}
+
+func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var bestPeer *Peer
+ for _, p := range ps.peers {
+ if !p.services.IsEnable(flag) {
+ continue
+ }
+ if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
bestPeer = p
}
}
return nil
}
-func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
- msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
- peers := ps.peersWithoutNewStatus(bestBlock.Height)
+func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+ msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
+ peers := ps.peersWithoutNewStatus(bestHeader.Height)
for _, peer := range peers {
if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
ps.RemovePeer(peer.ID())
continue
}
- peer.markNewStatus(bestBlock.Height)
+ peer.markNewStatus(bestHeader.Height)
}
return nil
}
return
}
- peer.SetStatus(height, hash)
+ peer.SetBestStatus(height, hash)
}
log "github.com/sirupsen/logrus"
+ "github.com/vapor/common"
"github.com/vapor/config"
"github.com/vapor/event"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
"github.com/vapor/protocol/state"
- "github.com/vapor/common"
)
const maxProcessBlockChSize = 1024
return &c.bestNode.Hash
}
-// BestBlockHeader returns the chain tail block
+// BestIrreversibleHeader returns the chain best irreversible block
+func (c *Chain) BestIrreversibleHeader() *types.BlockHeader {
+ return c.bestIrreversibleNode.BlockHeader()
+}
+
func (c *Chain) BestBlockHeader() *types.BlockHeader {
node := c.index.BestNode()
return node.BlockHeader()
return c.bestBlockHeader.Height
}
+// BestIrreversibleHeader returns the mock chain's best block header; this
+// test double treats the best block as already irreversible.
+func (c *Chain) BestIrreversibleHeader() *types.BlockHeader {
+	return c.bestBlockHeader
+}
+
func (c *Chain) CalcNextSeed(hash *bc.Hash) (*bc.Hash, error) {
return &bc.Hash{V0: hash.V1, V1: hash.V2, V2: hash.V3, V3: hash.V0}, nil
}