maxNumOfBlocksRegularSync = uint64(128)
)
-type FastSync interface {
- process() error
- setSyncPeer(peer *peers.Peer)
-}
-
+// Fetcher is the interface for fetch struct
type Fetcher interface {
processBlock(peerID string, block *types.Block)
processBlocks(peerID string, blocks []*types.Block)
type blockKeeper struct {
chain Chain
- fastSync FastSync
+ fastSync *fastSync
msgFetcher Fetcher
peers *peers.PeerSet
syncPeer *peers.Peer
}
}
-func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash, isTimeout func() bool) ([]*types.Block, error) {
headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
if err != nil {
return nil, err
}
blocks = append(blocks, block)
+ if isTimeout() {
+ break
+ }
}
return blocks, nil
}
want = append(want, blocks[i])
}
- got, err := bk.locateBlocks(locator, &c.stopHash)
+ mockTimeout := func() bool { return false }
+ got, err := bk.locateBlocks(locator, &c.stopHash, mockTimeout)
if err != c.wantErr {
t.Errorf("case %d: got %v want err = %v", i, err, c.wantErr)
}
var errOrphanBlock = errors.New("fast sync inserting orphan block")
-type BlockProcessor interface {
- process(chan struct{}, chan struct{}, uint64, *sync.WaitGroup)
-}
-
type blockProcessor struct {
chain Chain
- storage Storage
+ storage *storage
peers *peers.PeerSet
}
-func newBlockProcessor(chain Chain, storage Storage, peers *peers.PeerSet) *blockProcessor {
+func newBlockProcessor(chain Chain, storage *storage, peers *peers.PeerSet) *blockProcessor {
return &blockProcessor{
chain: chain,
peers: peers,
type fastSync struct {
chain Chain
msgFetcher MsgFetcher
- blockProcessor BlockProcessor
+ blockProcessor *blockProcessor
peers *peers.PeerSet
mainSyncPeer *peers.Peer
}
-func newFastSync(chain Chain, msgFetcher MsgFetcher, storage Storage, peers *peers.PeerSet) *fastSync {
+func newFastSync(chain Chain, msgFetcher MsgFetcher, storage *storage, peers *peers.PeerSet) *fastSync {
return &fastSync{
chain: chain,
msgFetcher: msgFetcher,
import (
"errors"
"reflect"
+ "time"
log "github.com/sirupsen/logrus"
ValidateTx(*types.Tx) (bool, error)
}
+// Switch is the interface for network layer
type Switch interface {
AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
Start() (bool, error)
return manager, nil
}
+// AddPeer add the network layer peer to logic layer
func (m *Manager) AddPeer(peer peers.BasePeer) {
m.peers.AddPeer(peer)
}
}
func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
- blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+ endTime := time.Now().Add(requireBlocksTimeout / 2)
+ isTimeout := func() bool {
+ return time.Now().After(endTime)
+ }
+
+ blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash(), isTimeout)
if err != nil || len(blocks) == 0 {
return
}
}
}
+// RemovePeer delete peer for peer set
func (m *Manager) RemovePeer(peerID string) {
m.peers.RemovePeer(peerID)
}
+// SendStatus sent the current self status to remote peer
func (m *Manager) SendStatus(peer peers.BasePeer) error {
p := m.peers.GetPeer(peer.ID())
if p == nil {
return nil
}
+// Start the network logic layer
func (m *Manager) Start() error {
var err error
m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
var (
requireBlockTimeout = 20 * time.Second
requireHeadersTimeout = 30 * time.Second
- requireBlocksTimeout = 50 * time.Second
+ requireBlocksTimeout = 90 * time.Second
checkSyncPeerNumInterval = 5 * time.Second
errRequestBlocksTimeout = errors.New("request blocks timeout")
errSendMsg = errors.New("send message error")
)
+// MsgFetcher is the interface for msg fetch struct
type MsgFetcher interface {
resetParameter()
addSyncPeer(peerID string)
}
type msgFetcher struct {
- storage Storage
+ storage *storage
syncPeers *fastSyncPeers
peers *peers.PeerSet
blockProcessCh chan *blockMsg
mux sync.RWMutex
}
-func newMsgFetcher(storage Storage, peers *peers.PeerSet) *msgFetcher {
+func newMsgFetcher(storage *storage, peers *peers.PeerSet) *msgFetcher {
return &msgFetcher{
storage: storage,
syncPeers: newFastSyncPeers(),
errDBFindBlock = errors.New("can't find block from DB")
)
-type Storage interface {
- resetParameter()
- writeBlocks(peerID string, blocks []*types.Block) error
- readBlock(height uint64) (*blockStorage, error)
- deleteBlock(height uint64)
-}
-
+// LocalStore is the interface for persistent storage
type LocalStore interface {
writeBlock(block *types.Block) error
readBlock(height uint64) (*types.Block, error)
node, err := c.getConsensusNode(&blockHeader.PreviousBlockHash, xpub.String())
blockHash := blockHeader.Hash()
if err == errNotFoundConsensusNode {
- log.WithFields(log.Fields{"module": logModule, "blockHash": blockHash.String()}).Warn("can't find consensus node of current node")
+ log.WithFields(log.Fields{"module": logModule, "blockHash": blockHash.String()}).Debug("can't find consensus node of current node")
return nil, nil
} else if err != nil {
return nil, err