type NewProposedBlockEvent struct{ Block types.Block }
-type BlockSignatureEvent struct {
+type BlockSignatureEvent struct {
BlockHash bc.Hash
- Signature []byte
+ Signature []byte
}
+// NewBlockProposeEvent is a block propose event which needs to be broadcast.
+type NewBlockProposeEvent struct{ Block types.Block }
+
// TypeMuxEvent is a time-tagged notification pushed to subscribers.
type TypeMuxEvent struct {
Time time.Time
b.blockKeeper.syncPeer = b.peers.GetPeer("test node A")
cases := []struct {
syncTimeout time.Duration
- testNode *ChainManager
+ testNode *Manager
requireHeight uint64
want *types.Block
err error
Peers() *p2p.PeerSet
}
-//ChainManager is responsible for the business layer information synchronization
-type ChainManager struct {
+//Manager is responsible for the business layer information synchronization
+type Manager struct {
sw Switch
chain Chain
txPool *core.TxPool
}
//NewChainManager create a chain sync manager.
-func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
- manager := &ChainManager{
+func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+ manager := &Manager{
sw: sw,
txPool: txPool,
chain: chain,
return manager, nil
}
-func (cm *ChainManager) AddPeer(peer peers.BasePeer) {
- cm.peers.AddPeer(peer)
+func (m *Manager) AddPeer(peer peers.BasePeer) {
+ m.peers.AddPeer(peer)
}
//IsCaughtUp check wheather the peer finish the sync
-func (cm *ChainManager) IsCaughtUp() bool {
- peer := cm.peers.BestPeer(consensus.SFFullNode)
- return peer == nil || peer.Height() <= cm.chain.BestBlockHeight()
+func (m *Manager) IsCaughtUp() bool {
+ peer := m.peers.BestPeer(consensus.SFFullNode)
+ return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
}
-func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
+func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
block, err := msg.GetBlock()
if err != nil {
return
}
- cm.blockKeeper.processBlock(peer.ID(), block)
+ m.blockKeeper.processBlock(peer.ID(), block)
}
-func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
+func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
blocks, err := msg.GetBlocks()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
return
}
- cm.blockKeeper.processBlocks(peer.ID(), blocks)
+ m.blockKeeper.processBlocks(peer.ID(), blocks)
}
-func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
+func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
peer.AddFilterAddress(msg.Address)
}
-func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) {
+func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
peer.FilterClear()
}
-func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
+func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
peer.AddFilterAddresses(msg.Addresses)
}
-func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
+func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
var block *types.Block
var err error
if msg.Height != 0 {
- block, err = cm.chain.GetBlockByHeight(msg.Height)
+ block, err = m.chain.GetBlockByHeight(msg.Height)
} else {
- block, err = cm.chain.GetBlockByHash(msg.GetHash())
+ block, err = m.chain.GetBlockByHash(msg.GetHash())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
ok, err := peer.SendBlock(block)
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
}
}
-func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
- blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
+ blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
if err != nil || len(blocks) == 0 {
return
}
ok, err := peer.SendBlocks(sendBlocks)
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
}
}
-func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
- headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
+ headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
if err != nil || len(headers) == 0 {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
return
ok, err := peer.SendHeaders(headers)
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
}
}
-func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
+func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
var err error
var block *types.Block
if msg.Height != 0 {
- block, err = cm.chain.GetBlockByHeight(msg.Height)
+ block, err = m.chain.GetBlockByHeight(msg.Height)
} else {
- block, err = cm.chain.GetBlockByHash(msg.GetHash())
+ block, err = m.chain.GetBlockByHash(msg.GetHash())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
}
blockHash := block.Hash()
- txStatus, err := cm.chain.GetTransactionStatus(&blockHash)
+ txStatus, err := m.chain.GetTransactionStatus(&blockHash)
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
return
}
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
}
-func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
+func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
headers, err := msg.GetHeaders()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
return
}
- cm.blockKeeper.processHeaders(peer.ID(), headers)
+ m.blockKeeper.processHeaders(peer.ID(), headers)
}
-func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
- if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil {
+func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
+ if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
peer.SetStatus(msg.Height, msg.GetHash())
return
}
}
-func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
+func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
tx, err := msg.GetTransaction()
if err != nil {
- cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+ m.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
return
}
- if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
- cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
+ m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
}
- cm.peers.MarkTx(peer.ID(), tx.ID)
+ m.peers.MarkTx(peer.ID(), tx.ID)
}
-func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
+func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
txs, err := msg.GetTransactions()
if err != nil {
- cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+ m.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
return
}
if len(txs) > msgs.TxsMsgMaxTxNum {
- cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+ m.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
return
}
for _, tx := range txs {
- if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan {
- cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
+ m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
return
}
- cm.peers.MarkTx(peer.ID(), tx.ID)
+ m.peers.MarkTx(peer.ID(), tx.ID)
}
}
-func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
- peer := cm.peers.GetPeer(basePeer.ID())
+func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
+ peer := m.peers.GetPeer(basePeer.ID())
if peer == nil {
return
}
switch msg := msg.(type) {
case *msgs.GetBlockMessage:
- cm.handleGetBlockMsg(peer, msg)
+ m.handleGetBlockMsg(peer, msg)
case *msgs.BlockMessage:
- cm.handleBlockMsg(peer, msg)
+ m.handleBlockMsg(peer, msg)
case *msgs.StatusMessage:
- cm.handleStatusMsg(basePeer, msg)
+ m.handleStatusMsg(basePeer, msg)
case *msgs.TransactionMessage:
- cm.handleTransactionMsg(peer, msg)
+ m.handleTransactionMsg(peer, msg)
case *msgs.TransactionsMessage:
- cm.handleTransactionsMsg(peer, msg)
+ m.handleTransactionsMsg(peer, msg)
case *msgs.GetHeadersMessage:
- cm.handleGetHeadersMsg(peer, msg)
+ m.handleGetHeadersMsg(peer, msg)
case *msgs.HeadersMessage:
- cm.handleHeadersMsg(peer, msg)
+ m.handleHeadersMsg(peer, msg)
case *msgs.GetBlocksMessage:
- cm.handleGetBlocksMsg(peer, msg)
+ m.handleGetBlocksMsg(peer, msg)
case *msgs.BlocksMessage:
- cm.handleBlocksMsg(peer, msg)
+ m.handleBlocksMsg(peer, msg)
case *msgs.FilterLoadMessage:
- cm.handleFilterLoadMsg(peer, msg)
+ m.handleFilterLoadMsg(peer, msg)
case *msgs.FilterAddMessage:
- cm.handleFilterAddMsg(peer, msg)
+ m.handleFilterAddMsg(peer, msg)
case *msgs.FilterClearMessage:
- cm.handleFilterClearMsg(peer)
+ m.handleFilterClearMsg(peer)
case *msgs.GetMerkleBlockMessage:
- cm.handleGetMerkleBlockMsg(peer, msg)
+ m.handleGetMerkleBlockMsg(peer, msg)
default:
log.WithFields(log.Fields{
}
}
-func (cm *ChainManager) RemovePeer(peerID string) {
- cm.peers.RemovePeer(peerID)
+func (m *Manager) RemovePeer(peerID string) {
+ m.peers.RemovePeer(peerID)
}
-func (cm *ChainManager) SendStatus(peer peers.BasePeer) error {
- p := cm.peers.GetPeer(peer.ID())
+func (m *Manager) SendStatus(peer peers.BasePeer) error {
+ p := m.peers.GetPeer(peer.ID())
if p == nil {
return errors.New("invalid peer")
}
- if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil {
- cm.peers.RemovePeer(p.ID())
+ if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil {
+ m.peers.RemovePeer(p.ID())
return err
}
return nil
}
-func (cm *ChainManager) Start() error {
+func (m *Manager) Start() error {
var err error
- cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+ m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
if err != nil {
return err
}
// broadcast transactions
- go cm.txBroadcastLoop()
- go cm.txSyncLoop()
+ go m.txBroadcastLoop()
+ go m.txSyncLoop()
return nil
}
//Stop stop sync manager
-func (cm *ChainManager) Stop() {
- close(cm.quitSync)
+func (m *Manager) Stop() {
+ close(m.quitSync)
}
type ProtocolReactor struct {
p2p.BaseReactor
- cm *ChainManager
+ manager *Manager
}
// NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(cm *ChainManager) *ProtocolReactor {
+func NewProtocolReactor(manager *Manager) *ProtocolReactor {
pr := &ProtocolReactor{
- cm: cm,
+ manager: manager,
}
pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
return pr
// AddPeer implements Reactor by sending our state to peer.
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
- pr.cm.AddPeer(peer)
- if err := pr.cm.SendStatus(peer); err != nil {
+ pr.manager.AddPeer(peer)
+ if err := pr.manager.SendStatus(peer); err != nil {
return err
}
- pr.cm.syncTransactions(peer.Key)
+ pr.manager.syncTransactions(peer.Key)
return nil
}
// RemovePeer implements Reactor by removing peer from the pool.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- pr.cm.RemovePeer(peer.Key)
+ pr.manager.RemovePeer(peer.Key)
}
//decodeMessage decode msg
return
}
- pr.cm.processMsg(src, msgType, msg)
+ pr.manager.processMsg(src, msgType, msg)
}
flag consensus.ServiceFlag
srcPeer *P2PPeer
- remoteNode *ChainManager
+ remoteNode *Manager
msgCh chan []byte
async bool
}
return p.flag
}
-func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *ChainManager) {
+func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *Manager) {
p.srcPeer = srcPeer
p.remoteNode = node
}
func (ps *PeerSet) StopPeerGracefully(string) {}
type NetWork struct {
- nodes map[*ChainManager]P2PPeer
+ nodes map[*Manager]P2PPeer
}
func NewNetWork() *NetWork {
- return &NetWork{map[*ChainManager]P2PPeer{}}
+ return &NetWork{map[*Manager]P2PPeer{}}
}
-func (nw *NetWork) Register(node *ChainManager, addr, id string, flag consensus.ServiceFlag) {
+func (nw *NetWork) Register(node *Manager, addr, id string, flag consensus.ServiceFlag) {
peer := NewP2PPeer(addr, id, flag)
nw.nodes[node] = *peer
}
-func (nw *NetWork) HandsShake(nodeA, nodeB *ChainManager) (*P2PPeer, *P2PPeer, error) {
+func (nw *NetWork) HandsShake(nodeA, nodeB *Manager) (*P2PPeer, *P2PPeer, error) {
B2A, ok := nw.nodes[nodeA]
if !ok {
return nil, nil, errors.New("can't find nodeA's p2p peer on network")
return blocks
}
-func mockSync(blocks []*types.Block) *ChainManager {
+func mockSync(blocks []*types.Block) *Manager {
chain := mock.NewChain()
peers := peers.NewPeerSet(NewPeerSet())
chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
chain.SetBlockByHeight(block.Height, block)
}
- return &ChainManager{
+ return &Manager{
chain: chain,
blockKeeper: newBlockKeeper(chain, peers),
peers: peers,
txs []*types.Tx
}
-func (cm *ChainManager) syncTransactions(peerID string) {
- pending := cm.txPool.GetTransactions()
+func (m *Manager) syncTransactions(peerID string) {
+ pending := m.txPool.GetTransactions()
if len(pending) == 0 {
return
}
for i, batch := range pending {
txs[i] = batch.Tx
}
- cm.txSyncCh <- &txSyncMsg{peerID, txs}
+ m.txSyncCh <- &txSyncMsg{peerID, txs}
}
-func (cm *ChainManager) txBroadcastLoop() {
+func (m *Manager) txBroadcastLoop() {
for {
select {
- case obj, ok := <-cm.txMsgSub.Chan():
+ case obj, ok := <-m.txMsgSub.Chan():
if !ok {
log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
return
}
if ev.TxMsg.MsgType == core.MsgNewTx {
- if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
+ if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
continue
}
}
- case <-cm.quitSync:
+ case <-m.quitSync:
return
}
}
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
-func (cm *ChainManager) txSyncLoop() {
+func (m *Manager) txSyncLoop() {
pending := make(map[string]*txSyncMsg)
sending := false // whether a send is active
done := make(chan error, 1) // result of the send
// send starts a sending a pack of transactions from the sync.
send := func(msg *txSyncMsg) {
- peer := cm.peers.GetPeer(msg.peerID)
+ peer := m.peers.GetPeer(msg.peerID)
if peer == nil {
delete(pending, msg.peerID)
return
go func() {
err := peer.SendTransactions(sendTxs)
if err != nil {
- cm.peers.RemovePeer(msg.peerID)
+ m.peers.RemovePeer(msg.peerID)
}
done <- err
}()
for {
select {
- case msg := <-cm.txSyncCh:
+ case msg := <-m.txSyncCh:
pending[msg.peerID] = msg
if !sending {
send(msg)
--- /dev/null
+package consensusmgr
+
+import (
+ "github.com/sirupsen/logrus"
+ "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/protocol/bc"
+)
+
+const (
+ maxBlockDistance = 64
+ maxMsgSetSize = 128
+ newBlockChSize = 64
+)
+
+// blockFetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type blockFetcher struct {
+	chain Chain
+	peers *peers.PeerSet
+
+	// newBlockCh receives announced blocks; drained by the blockProcessor goroutine.
+	newBlockCh chan *blockMsg
+	// queue orders pending blocks by height (lowest first, via negated priority).
+	queue *prque.Prque
+	// msgSet de-duplicates queued blocks by block hash.
+	msgSet map[bc.Hash]*blockMsg
+}
+
+// newBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
+// It starts the background blockProcessor goroutine before returning.
+func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
+	f := &blockFetcher{
+		chain:      chain,
+		peers:      peers,
+		newBlockCh: make(chan *blockMsg, newBlockChSize),
+		queue:      prque.New(),
+		msgSet:     make(map[bc.Hash]*blockMsg),
+	}
+	go f.blockProcessor()
+	return f
+}
+
+// blockProcessor is the fetcher's main loop: it drains the queue of every block
+// whose height is at most one above the chain's best height, then blocks until
+// the next announcement arrives on newBlockCh.
+func (f *blockFetcher) blockProcessor() {
+	for {
+		height := f.chain.BestBlockHeight()
+		for !f.queue.Empty() {
+			msg := f.queue.PopItem().(*blockMsg)
+			if msg.block.Height > height+1 {
+				// Not connectable yet; push it back and wait for earlier blocks.
+				f.queue.Push(msg, -float32(msg.block.Height))
+				break
+			}
+
+			f.insert(msg)
+			delete(f.msgSet, msg.block.Hash())
+		}
+		f.add(<-f.newBlockCh)
+	}
+}
+
+// add enqueues an announced block, silently dropping it when the pending set is
+// full, the block is below the current best height, or it is more than
+// maxBlockDistance ahead of it. Duplicate hashes are ignored.
+func (f *blockFetcher) add(msg *blockMsg) {
+	bestHeight := f.chain.BestBlockHeight()
+	if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+		return
+	}
+
+	blockHash := msg.block.Hash()
+	if _, ok := f.msgSet[blockHash]; !ok {
+		f.msgSet[blockHash] = msg
+		// Negated height makes the priority queue pop the lowest height first.
+		f.queue.Push(msg, -float32(msg.block.Height))
+		logrus.WithFields(logrus.Fields{
+			"module":       logModule,
+			"block height": msg.block.Height,
+			"block hash":   blockHash.String(),
+		}).Debug("blockFetcher receive propose block")
+	}
+}
+
+// insert runs a queued block through chain.ProcessBlock. A validation failure
+// bans the sending peer (if still connected); a successfully connected,
+// non-orphan block updates the sender's status and is re-broadcast as a
+// propose message to peers that have not seen it.
+func (f *blockFetcher) insert(msg *blockMsg) {
+	isOrphan, err := f.chain.ProcessBlock(msg.block)
+	if err != nil {
+		// Peer may have disconnected while the block sat in the queue.
+		peer := f.peers.GetPeer(msg.peerID)
+		if peer == nil {
+			return
+		}
+
+		f.peers.AddBanScore(msg.peerID, 20, 0, err.Error())
+		return
+	}
+
+	if isOrphan {
+		return
+	}
+
+	hash := msg.block.Hash()
+	f.peers.SetStatus(msg.peerID, msg.block.Height, &hash)
+	proposeMsg, err := NewBlockProposeMsg(msg.block)
+	if err != nil {
+		logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+		return
+	}
+
+	if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
+		logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
+		return
+	}
+}
+
+// processNewBlock hands a newly received block to the processing goroutine.
+// It blocks when newBlockCh is full.
+func (f *blockFetcher) processNewBlock(msg *blockMsg) {
+	f.newBlockCh <- msg
+}
--- /dev/null
+package consensusmgr
+
+import (
+ "github.com/vapor/netsync/peers"
+)
+
+// BroadcastMsg is a consensus message paired with its transfer channel so the
+// peer set can broadcast it. It implements the peers.BroadcastMsg interface.
+type BroadcastMsg struct {
+	msg       ConsensusMessage
+	transChan byte
+}
+
+// NewBroadcastMsg create concrete broadcast message, implement peers.BroadcastMsg interface.
+func NewBroadcastMsg(msg ConsensusMessage, transChan byte) *BroadcastMsg {
+	return &BroadcastMsg{
+		msg:       msg,
+		transChan: transChan,
+	}
+}
+
+// GetChan get message transfer channel.
+func (b *BroadcastMsg) GetChan() byte {
+	return b.transChan
+}
+
+// GetMsg get ConsensusMessage.
+// The message is wrapped in the registered interface struct so go-wire can
+// encode it with its concrete type byte.
+func (b *BroadcastMsg) GetMsg() interface{} {
+	return struct{ ConsensusMessage }{b.msg}
+}
+
+// MsgString get a string representation of the message.
+func (b *BroadcastMsg) MsgString() string {
+	return b.msg.String()
+}
+
+// MarkSendRecord mark send message record to prevent messages from being sent repeatedly.
+func (b *BroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
+	b.msg.BroadcastMarkSendRecord(ps, peers)
+}
+
+// FilterTargetPeers filter target peers to filter the nodes that need to send messages.
+func (b *BroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
+	return b.msg.BroadcastFilterTargetPeers(ps)
+}
--- /dev/null
+package consensusmgr
+
+import (
+ "bytes"
+ "encoding/hex"
+ "errors"
+ "fmt"
+
+ "github.com/tendermint/go-wire"
+
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+const (
+ blockSignatureByte = byte(0x10)
+ blockProposeByte = byte(0x11)
+)
+
+// ConsensusMessage is a generic message for consensus reactor.
+type ConsensusMessage interface {
+	String() string
+	BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string)
+	BroadcastFilterTargetPeers(ps *peers.PeerSet) []string
+}
+
+// Register the concrete consensus message types with go-wire so they can be
+// decoded from their leading type byte.
+var _ = wire.RegisterInterface(
+	struct{ ConsensusMessage }{},
+	wire.ConcreteType{O: &BlockSignatureMsg{}, Byte: blockSignatureByte},
+	wire.ConcreteType{O: &BlockProposeMsg{}, Byte: blockProposeByte},
+)
+
+// decodeMessage decodes a raw wire payload into its type byte and concrete
+// ConsensusMessage.
+// NOTE(review): bz[0] panics when bz is empty — presumably callers guarantee a
+// non-empty payload; confirm at the call site.
+func decodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
+	msgType = bz[0]
+	n := int(0)
+	r := bytes.NewReader(bz)
+	msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ ConsensusMessage }).ConsensusMessage
+	if err != nil && n != len(bz) {
+		err = errors.New("DecodeMessage() had bytes left over")
+	}
+	return
+}
+
+// BlockSignatureMsg block signature message transferred between nodes.
+type BlockSignatureMsg struct {
+	BlockHash [32]byte
+	Height    uint64
+	Signature []byte
+	PubKey    [32]byte
+}
+
+// NewBlockSignatureMsg create new block signature msg.
+func NewBlockSignatureMsg(blockHash bc.Hash, height uint64, signature []byte, pubKey [32]byte) ConsensusMessage {
+	hash := blockHash.Byte32()
+	return &BlockSignatureMsg{BlockHash: hash, Height: height, Signature: signature, PubKey: pubKey}
+}
+
+// String returns a human-readable summary of the signature message.
+func (bs *BlockSignatureMsg) String() string {
+	return fmt.Sprintf("{block_hash: %s,block_height:%d,signature:%s,pubkey:%s}", hex.EncodeToString(bs.BlockHash[:]), bs.Height, hex.EncodeToString(bs.Signature), hex.EncodeToString(bs.PubKey[:]))
+}
+
+// BroadcastMarkSendRecord mark send message record to prevent messages from being sent repeatedly.
+func (bs *BlockSignatureMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string) {
+	for _, peer := range peers {
+		ps.MarkBlockSignature(peer, bs.Signature)
+	}
+}
+
+// BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages.
+func (bs *BlockSignatureMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string {
+	return ps.PeersWithoutSign(bs.Signature)
+}
+
+// BlockProposeMsg block propose message transferred between nodes.
+// The block is carried in its text-marshaled form.
+type BlockProposeMsg struct {
+	RawBlock []byte
+}
+
+// NewBlockProposeMsg create new block propose msg.
+func NewBlockProposeMsg(block *types.Block) (ConsensusMessage, error) {
+	rawBlock, err := block.MarshalText()
+	if err != nil {
+		return nil, err
+	}
+	return &BlockProposeMsg{RawBlock: rawBlock}, nil
+}
+
+// GetProposeBlock get propose block from msg by unmarshaling RawBlock.
+func (bp *BlockProposeMsg) GetProposeBlock() (*types.Block, error) {
+	block := &types.Block{}
+	if err := block.UnmarshalText(bp.RawBlock); err != nil {
+		return nil, err
+	}
+	return block, nil
+}
+
+// String returns a human-readable summary of the propose message, or an error
+// placeholder when the embedded block cannot be unmarshaled.
+func (bp *BlockProposeMsg) String() string {
+	block, err := bp.GetProposeBlock()
+	if err != nil {
+		return "{err: wrong message}"
+	}
+	blockHash := block.Hash()
+	return fmt.Sprintf("{block_height: %d, block_hash: %s}", block.Height, blockHash.String())
+}
+
+// BroadcastMarkSendRecord mark send message record to prevent messages from being sent repeatedly.
+func (bp *BlockProposeMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string) {
+	block, err := bp.GetProposeBlock()
+	if err != nil {
+		return
+	}
+
+	hash := block.Hash()
+	height := block.Height
+	for _, peer := range peers {
+		ps.MarkBlock(peer, &hash)
+		ps.MarkStatus(peer, height)
+	}
+}
+
+// BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages.
+func (bp *BlockProposeMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string {
+	block, err := bp.GetProposeBlock()
+	if err != nil {
+		return nil
+	}
+
+	return ps.PeersWithoutBlock(block.Hash())
+}
--- /dev/null
+package consensusmgr
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/davecgh/go-spew/spew"
+ "github.com/tendermint/go-wire"
+
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+var _ = wire.RegisterInterface(
+ struct{ ConsensusMessage }{},
+ wire.ConcreteType{O: &BlockSignatureMsg{}, Byte: blockSignatureByte},
+ wire.ConcreteType{O: &BlockProposeMsg{}, Byte: blockProposeByte},
+)
+
+// TestDecodeMessage verifies that each concrete ConsensusMessage round-trips
+// through go-wire encoding and decodeMessage, preserving both the registered
+// type byte and the message contents.
+func TestDecodeMessage(t *testing.T) {
+	testCases := []struct {
+		msg     ConsensusMessage
+		msgType byte
+	}{
+		{
+			msg: &BlockSignatureMsg{
+				BlockHash: [32]byte{0x01},
+				Signature: []byte{0x00},
+				PubKey:    [32]byte{0x01},
+			},
+			msgType: blockSignatureByte,
+		},
+		{
+			msg: &BlockProposeMsg{
+				RawBlock: []byte{0x01, 0x02},
+			},
+			msgType: blockProposeByte,
+		},
+	}
+	for i, c := range testCases {
+		binMsg := wire.BinaryBytes(struct{ ConsensusMessage }{c.msg})
+		gotMsgType, gotMsg, err := decodeMessage(binMsg)
+		if err != nil {
+			t.Fatalf("index:%d decode Message err %s", i, err)
+		}
+		// Fix: compare against the expected type byte c.msgType, not the message
+		// struct c.msg (which mismatched the %d verb and garbled the output).
+		if gotMsgType != c.msgType {
+			t.Fatalf("index:%d decode Message type err. got:%d want:%d", i, gotMsgType, c.msgType)
+		}
+		if !reflect.DeepEqual(gotMsg, c.msg) {
+			t.Fatalf("index:%d decode Message err. got:%s\n want:%s", i, spew.Sdump(gotMsg), spew.Sdump(c.msg))
+		}
+	}
+}
+
+// TestBlockSignBroadcastMsg checks that a signature message wrapped by
+// NewBroadcastMsg encodes to wire bytes that decode back to the original
+// BlockSignatureMsg with the expected type byte.
+func TestBlockSignBroadcastMsg(t *testing.T) {
+	blockSignMsg := &BlockSignatureMsg{
+		BlockHash: [32]byte{0x01},
+		Height:    100,
+		Signature: []byte{0x00},
+		PubKey:    [32]byte{0x01},
+	}
+	signatureBroadcastMsg := NewBroadcastMsg(NewBlockSignatureMsg(bc.NewHash(blockSignMsg.BlockHash), blockSignMsg.Height, blockSignMsg.Signature, blockSignMsg.PubKey), consensusChannel)
+
+	binMsg := wire.BinaryBytes(signatureBroadcastMsg.GetMsg())
+	gotMsgType, gotMsg, err := decodeMessage(binMsg)
+	if err != nil {
+		t.Fatalf("decode Message err %s", err)
+	}
+	if gotMsgType != blockSignatureByte {
+		t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, blockSignatureByte)
+	}
+	if !reflect.DeepEqual(gotMsg, blockSignMsg) {
+		t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockSignMsg))
+	}
+}
+
+// TestBlockProposeBroadcastMsg checks that a propose message wrapped by
+// NewBroadcastMsg encodes to wire bytes that decode back to the original
+// BlockProposeMsg with the expected type byte.
+func TestBlockProposeBroadcastMsg(t *testing.T) {
+	blockProposeMsg, _ := NewBlockProposeMsg(testBlock)
+
+	proposeBroadcastMsg := NewBroadcastMsg(blockProposeMsg, consensusChannel)
+
+	binMsg := wire.BinaryBytes(proposeBroadcastMsg.GetMsg())
+	gotMsgType, gotMsg, err := decodeMessage(binMsg)
+	if err != nil {
+		t.Fatalf("decode Message err %s", err)
+	}
+	if gotMsgType != blockProposeByte {
+		t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, blockProposeByte)
+	}
+	if !reflect.DeepEqual(gotMsg, blockProposeMsg) {
+		t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockProposeMsg))
+	}
+}
+
+// testBlock is a minimal fixture block (header only) shared by the propose
+// message tests.
+var testBlock = &types.Block{
+	BlockHeader: types.BlockHeader{
+		Version:   1,
+		Height:    0,
+		Timestamp: 1528945000,
+		BlockCommitment: types.BlockCommitment{
+			TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
+			TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
+		},
+	},
+}
+
+// TestBlockProposeMsg covers BlockProposeMsg round-tripping: marshal a block
+// into the message, recover it, check String(), then corrupt RawBlock and
+// verify both GetProposeBlock and String report the failure.
+func TestBlockProposeMsg(t *testing.T) {
+	blockMsg, err := NewBlockProposeMsg(testBlock)
+	if err != nil {
+		t.Fatalf("create new mine block msg err:%s", err)
+	}
+
+	gotBlock, err := blockMsg.(*BlockProposeMsg).GetProposeBlock()
+	if err != nil {
+		t.Fatalf("got block err:%s", err)
+	}
+
+	if !reflect.DeepEqual(gotBlock.BlockHeader, testBlock.BlockHeader) {
+		t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
+	}
+
+	wantString := "{block_height: 0, block_hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+	if blockMsg.String() != wantString {
+		t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+	}
+
+	// Corrupt one byte of the raw payload so unmarshaling must fail.
+	blockMsg.(*BlockProposeMsg).RawBlock[1] = blockMsg.(*BlockProposeMsg).RawBlock[1] + 0x1
+	_, err = blockMsg.(*BlockProposeMsg).GetProposeBlock()
+	if err == nil {
+		t.Fatalf("get mine block err")
+	}
+
+	wantString = "{err: wrong message}"
+	if blockMsg.String() != wantString {
+		t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+	}
+}
+
+// TestBlockSignatureMsg verifies NewBlockSignatureMsg reproduces the expected
+// struct and that String() renders all fields in hex.
+func TestBlockSignatureMsg(t *testing.T) {
+	msg := &BlockSignatureMsg{
+		BlockHash: [32]byte{0x01},
+		Height:    100,
+		Signature: []byte{0x00},
+		PubKey:    [32]byte{0x01},
+	}
+	gotMsg := NewBlockSignatureMsg(bc.NewHash(msg.BlockHash), msg.Height, msg.Signature, msg.PubKey)
+
+	if !reflect.DeepEqual(gotMsg, msg) {
+		t.Fatalf("test block signature message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg))
+	}
+	wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height:100,signature:00,pubkey:0100000000000000000000000000000000000000000000000000000000000000}"
+	if gotMsg.String() != wantString {
+		t.Fatalf("test block signature message err. got string:%s\n want string:%s", gotMsg.String(), wantString)
+	}
+}
--- /dev/null
+package consensusmgr
+
+import (
+ "reflect"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/vapor/event"
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+// Switch is the interface for p2p switch.
+type Switch interface {
+	AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
+	AddBannedPeer(string) error
+	ID() [32]byte
+}
+
+// Chain is the interface for Bytom core.
+type Chain interface {
+	BestBlockHeight() uint64
+	GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+	ProcessBlock(*types.Block) (bool, error)
+	ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error
+}
+
+// blockMsg pairs a received block with the ID of the peer that sent it.
+type blockMsg struct {
+	block  *types.Block
+	peerID string
+}
+
+// Manager is the consensus message network synchronization manager.
+type Manager struct {
+	sw              Switch
+	chain           Chain
+	peers           *peers.PeerSet
+	blockFetcher    *blockFetcher
+	eventDispatcher *event.Dispatcher
+
+	// quit signals the broadcast loops to stop when closed.
+	quit chan struct{}
+}
+
+// NewManager create new manager.
+// It wires the consensus reactor into the p2p switch and starts the block
+// fetcher for the given chain.
+func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
+	manager := &Manager{
+		sw: sw,
+		// Fix: the chain field was never set, leaving m.chain nil and causing a
+		// nil-pointer panic in handleBlockSignatureMsg / the signature broadcast loop.
+		chain:           chain,
+		peers:           peers,
+		blockFetcher:    newBlockFetcher(chain, peers),
+		eventDispatcher: dispatcher,
+		quit:            make(chan struct{}),
+	}
+	protocolReactor := NewConsensusReactor(manager)
+	manager.sw.AddReactor("CONSENSUS", protocolReactor)
+	return manager
+}
+
+// addPeer registers a newly connected peer with the peer set.
+func (m *Manager) addPeer(peer peers.BasePeer) {
+	m.peers.AddPeer(peer)
+}
+
+// processMsg dispatches a decoded consensus message from a known peer to the
+// matching handler; messages from unknown peers are dropped.
+func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
+	peer := m.peers.GetPeer(peerID)
+	if peer == nil {
+		return
+	}
+
+	logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
+
+	switch msg := msg.(type) {
+	case *BlockProposeMsg:
+		m.handleBlockProposeMsg(peerID, msg)
+
+	case *BlockSignatureMsg:
+		m.handleBlockSignatureMsg(peerID, msg)
+
+	default:
+		logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
+	}
+}
+
+// handleBlockProposeMsg unmarshals the proposed block, marks the sender as
+// having it, and hands it to the block fetcher for ordered processing.
+func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
+	block, err := msg.GetProposeBlock()
+	if err != nil {
+		logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
+		return
+	}
+
+	hash := block.Hash()
+	m.peers.MarkBlock(peerID, &hash)
+	m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
+}
+
+// handleBlockSignatureMsg forwards a block signature to the chain; an invalid
+// signature raises the sender's ban score.
+func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
+	blockHash := bc.NewHash(msg.BlockHash)
+	if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey[:], msg.Height, &blockHash); err != nil {
+		m.peers.AddBanScore(peerID, 20, 0, err.Error())
+		return
+	}
+}
+
+// blockProposeMsgBroadcastLoop forwards locally generated NewBlockProposeEvent
+// notifications to all connected peers. It runs until the subscription channel
+// closes or Stop closes the quit channel.
+func (m *Manager) blockProposeMsgBroadcastLoop() {
+	blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
+	if err != nil {
+		logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
+		return
+	}
+	defer blockProposeMsgSub.Unsubscribe()
+
+	for {
+		select {
+		case obj, ok := <-blockProposeMsgSub.Chan():
+			if !ok {
+				logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
+				return
+			}
+
+			ev, ok := obj.Data.(event.NewBlockProposeEvent)
+			if !ok {
+				logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
+				continue
+			}
+
+			proposeMsg, err := NewBlockProposeMsg(&ev.Block)
+			if err != nil {
+				// A single malformed event must not terminate the loop;
+				// skip it and keep serving subsequent propose events.
+				logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+				continue
+			}
+
+			if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
+				logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
+				continue
+			}
+
+		case <-m.quit:
+			return
+		}
+	}
+}
+
+// blockSignatureMsgBroadcastLoop forwards locally generated BlockSignatureEvent
+// notifications to all connected peers, looking up the signed block's height
+// from the chain. It runs until the subscription channel closes or Stop closes
+// the quit channel.
+func (m *Manager) blockSignatureMsgBroadcastLoop() {
+	blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
+	if err != nil {
+		logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
+		return
+	}
+	defer blockSignatureMsgSub.Unsubscribe()
+	for {
+		select {
+		case obj, ok := <-blockSignatureMsgSub.Chan():
+			if !ok {
+				logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockSignatureMsgSub channel closed")
+				return
+			}
+
+			ev, ok := obj.Data.(event.BlockSignatureEvent)
+			if !ok {
+				logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
+				continue
+			}
+
+			blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
+			if err != nil {
+				// A failed lookup for one block must not terminate the loop.
+				logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
+				continue
+			}
+
+			blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, m.sw.ID()), consensusChannel)
+			if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
+				// A failed broadcast must not terminate the loop either.
+				logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
+				continue
+			}
+
+		case <-m.quit:
+			return
+		}
+	}
+}
+
+// removePeer drops peerID from the shared peer set.
+func (m *Manager) removePeer(peerID string) {
+	m.peers.RemovePeer(peerID)
+}
+
+//Start consensus manager service.
+//It spawns the two broadcast loops; they run until Stop is called.
+func (m *Manager) Start() error {
+	go m.blockProposeMsgBroadcastLoop()
+	go m.blockSignatureMsgBroadcastLoop()
+	return nil
+}
+
+//Stop consensus manager service.
+//Closing quit terminates both broadcast loops. NOTE(review): a second call
+//would panic on the double close — confirm callers invoke Stop only once.
+func (m *Manager) Stop() {
+	close(m.quit)
+}
--- /dev/null
+package consensusmgr
+
+import (
+ "github.com/sirupsen/logrus"
+
+ "github.com/vapor/p2p"
+ "github.com/vapor/p2p/connection"
+)
+
+const (
+	logModule                 = "consensus"  // log field identifying this package
+	consensusChannel          = byte(0x50)   // p2p channel ID reserved for consensus traffic
+	maxBlockchainResponseSize = 22020096 + 2 // presumably max serialized response size plus wire overhead — TODO confirm
+)
+
+// ConsensusReactor handles new coming consensus message.
+// It bridges p2p switch callbacks to the Manager.
+type ConsensusReactor struct {
+	p2p.BaseReactor
+	manager *Manager // owning manager that processes decoded messages
+}
+
+// NewConsensusReactor create consensus reactor for the given manager and
+// initializes the embedded BaseReactor service state.
+func NewConsensusReactor(manager *Manager) *ConsensusReactor {
+	cr := &ConsensusReactor{
+		manager: manager,
+	}
+	cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
+	return cr
+}
+
+// GetChannels implements Reactor; it advertises the single consensus
+// channel this reactor listens on.
+func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
+	return []*connection.ChannelDescriptor{
+		{
+			ID:                consensusChannel,
+			Priority:          10,
+			SendQueueCapacity: 100,
+		},
+	}
+}
+
+// OnStart implements BaseService; it only delegates to the embedded reactor.
+func (cr *ConsensusReactor) OnStart() error {
+	return cr.BaseReactor.OnStart()
+}
+
+// OnStop implements BaseService; it only delegates to the embedded reactor.
+func (cr *ConsensusReactor) OnStop() {
+	cr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by registering the new peer with the manager's
+// peer set; it never returns an error.
+func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
+	cr.manager.addPeer(peer)
+	return nil
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+// The disconnect reason is ignored here.
+func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+	cr.manager.removePeer(peer.Key)
+}
+
+// Receive implements Reactor by handling messages.
+// It decodes the raw bytes into a typed consensus message and forwards it,
+// together with the sender's ID, to the manager for dispatch.
+func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+	msgType, msg, err := decodeMessage(msgBytes)
+	if err != nil {
+		logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
+		return
+	}
+
+	cr.manager.processMsg(src.ID(), msgType, msg)
+}
import (
"errors"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
- cfg "github.com/vapor/config"
+ "github.com/vapor/config"
"github.com/vapor/consensus"
"github.com/vapor/event"
"github.com/vapor/netsync/chainmgr"
+ "github.com/vapor/netsync/consensusmgr"
"github.com/vapor/netsync/peers"
"github.com/vapor/p2p"
- core "github.com/vapor/protocol"
+ "github.com/vapor/protocol"
)
const (
errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
)
+// ChainMgr is the interface for p2p chain message sync manager.
type ChainMgr interface {
	Start() error
	IsCaughtUp() bool
	Stop()
}
+// ConsensusMgr is the interface for consensus message sync manager.
+// It is satisfied by consensusmgr.Manager.
+type ConsensusMgr interface {
+	Start() error
+	Stop()
+}
+
type Switch interface {
Start() (bool, error)
Stop() bool
//SyncManager Sync Manager is responsible for the business layer information synchronization
type SyncManager struct {
-	config   *cfg.Config
-	sw       Switch
-	chainMgr ChainMgr
-	peers    *peers.PeerSet
+	config       *config.Config // node configuration (vault mode, chain ID, ...)
+	sw           Switch         // underlying p2p switch
+	chainMgr     ChainMgr       // chain message sync manager
+	consensusMgr ConsensusMgr   // consensus message sync manager
+	peers        *peers.PeerSet // peer set shared by both managers
}
// NewSyncManager create sync manager and set switch.
-func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protocol.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
	sw, err := p2p.NewSwitch(config)
	if err != nil {
		return nil, err
	}
	peers := peers.NewPeerSet(sw)
-	chainManger, err := chainmgr.NewChainManager(config, sw, chain, txPool, dispatcher, peers)
+	chainManager, err := chainmgr.NewManager(config, sw, chain, txPool, dispatcher, peers)
	if err != nil {
		return nil, err
	}
-
+	consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers)
	return &SyncManager{
-		config:   config,
-		sw:       sw,
-		chainMgr: chainManger,
-		peers:    peers,
+		config:       config,
+		sw:           sw,
+		chainMgr:     chainManager,
+		consensusMgr: consensusMgr,
+		peers:        peers,
	}, nil
}
+// Start message sync manager service.
+// The p2p switch starts first; the chain and consensus managers follow.
func (sm *SyncManager) Start() error {
	if _, err := sm.sw.Start(); err != nil {
-		log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed start switch")
+		logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed start switch")
+		return err
+	}
+
+	if err := sm.chainMgr.Start(); err != nil {
		return err
	}
-	return sm.chainMgr.Start()
+	return sm.consensusMgr.Start()
}
+// Stop message sync manager service.
+// Both sub-managers are stopped; in vault mode the switch is left running.
func (sm *SyncManager) Stop() {
	sm.chainMgr.Stop()
+	sm.consensusMgr.Stop()
	if !sm.config.VaultMode {
		sm.sw.Stop()
	}
}
+// IsListening check whether the bytomd service port is open.
func (sm *SyncManager) IsListening() bool {
if sm.config.VaultMode {
return false
return sm.chainMgr.IsCaughtUp()
}
+// PeerCount count the number of connected peers.
func (sm *SyncManager) PeerCount() int {
if sm.config.VaultMode {
return 0
return len(sm.sw.Peers().List())
}
+// GetNetwork get the type of network.
func (sm *SyncManager) GetNetwork() string {
return sm.config.ChainID
}
+// BestPeer find the peer with the highest height from the connected peers.
func (sm *SyncManager) BestPeer() *peers.PeerInfo {
bestPeer := sm.peers.BestPeer(consensus.SFFullNode)
if bestPeer != nil {
return nil
}
+// DialPeerWithAddress dial the peer and establish a connection.
func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
if sm.config.VaultMode {
return errVaultModeDialPeer
return sm.sw.DialPeerWithAddress(addr)
}
-//GetPeerInfos return peer info of all peers
+//GetPeerInfos return peer info of all connected peers.
func (sm *SyncManager) GetPeerInfos() []*peers.PeerInfo {
return sm.peers.GetPeerInfos()
}
return false, nil
}
-func (c *Chain) processBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error {
+func (c *Chain) ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error {
isBestIrreversible, err := c.bbft.ProcessBlockSignature(signature, pubkey, blockHeight, blockHash)
if err != nil {
return err