"github.com/vapor/event"
"github.com/vapor/netsync/peers"
"github.com/vapor/p2p"
+ "github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
)
blockFetcher *blockFetcher
chain Chain
- quit chan struct{}
+ eventDispatcher *event.Dispatcher
+ blockProposeMsgSub *event.Subscription
+ BlockSignatureMsgSub *event.Subscription
- eventDispatcher *event.Dispatcher
- proposedBlockSub *event.Subscription
- sendBlockSignSub *event.Subscription
+ quit chan struct{}
}
type Switch interface {
AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
AddBannedPeer(string) error
+ ID() [32]byte
}
// Chain is the interface for Bytom core
type Chain interface {
BestBlockHeight() uint64
+ GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
ProcessBlock(*types.Block) (bool, error)
}
sw: sw,
peers: peers,
blockFetcher: newBlockFetcher(chain, peers),
- quit: make(chan struct{}),
eventDispatcher: dispatcher,
+ quit: make(chan struct{}),
}
protocolReactor := NewConsensusReactor(manager)
manager.sw.AddReactor("CONSENSUS", protocolReactor)
log.WithFields(log.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
switch msg := msg.(type) {
- case *BlockProposeMessage:
+ case *BlockProposeMsg:
m.handleBlockProposeMsg(peerID, msg)
- case *BlockSignMessage:
- m.handleBlockSigMsg(peerID, msg)
+ case *BlockSignatureMsg:
+ m.handleBlockSignatureMsg(peerID, msg)
default:
log.WithFields(log.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
}
}
-func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage) {
+func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
block, err := msg.GetProposeBlock()
if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
return
}
m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
}
-func (m *Manager) handleBlockSigMsg(peerID string, msg *BlockSignMessage) {
- if err := m.eventDispatcher.Post(event.BlockSignEvent{PeerID: []byte(peerID), BlockID: msg.BlockID, Height: msg.Height, Sign: msg.Sign, Pubkey: msg.Pubkey}); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed post block sign event")
+func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
+ var id [32]byte
+ copy(id[:], peerID)
+ if err := m.eventDispatcher.Post(event.ReceivedBlockSignatureEvent{BlockID: msg.BlockID, Height: msg.Height, Signature: msg.Signature, PeerID: id}); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on post block signature event")
}
}
-func (m *Manager) proposedBlockBroadcastLoop() {
+func (m *Manager) blockProposeMsgBroadcastLoop() {
for {
select {
- case obj, ok := <-m.proposedBlockSub.Chan():
+ case obj, ok := <-m.blockProposeMsgSub.Chan():
if !ok {
- log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
+ log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
return
}
- ev, ok := obj.Data.(event.NewProposedBlockEvent)
+ ev, ok := obj.Data.(event.NewBlockProposeEvent)
if !ok {
log.WithFields(log.Fields{"module": logModule}).Error("event type error")
continue
}
- proposedMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
+ proposeMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on create new propose block msg")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeBroadcastMsg")
return
}
- if err := m.peers.BroadcastMsg(proposedMsg); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
+ if err := m.peers.BroadcastMsg(proposeMsg); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
continue
}
}
}
-func (m *Manager) blockSignBroadcastLoop() {
+func (m *Manager) blockSignatureMsgBroadcastLoop() {
for {
select {
- case obj, ok := <-m.sendBlockSignSub.Chan():
+ case obj, ok := <-m.blockProposeMsgSub.Chan():
if !ok {
- log.WithFields(log.Fields{"module": logModule}).Warning("send block sign subscription channel closed")
+ log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
return
}
- ev, ok := obj.Data.(event.SendBlockSignEvent)
+ ev, ok := obj.Data.(event.BlockSignatureEvent)
if !ok {
log.WithFields(log.Fields{"module": logModule}).Error("event type error")
continue
}
-
- blockSignMsg := NewBlockSignBroadcastMsg(ev.BlockID, ev.Height, ev.Sign, ev.Pubkey, ConsensusChannel)
+ blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
+ return
+ }
+ blockSignMsg := NewSignatureBroadcastMsg(ev.BlockHash.Byte32(), blockHeader.Height, ev.Signature, m.sw.ID(), ConsensusChannel)
if err := m.peers.BroadcastMsg(blockSignMsg); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed to broadcast block sign message.")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
return
}
func (m *Manager) Start() error {
var err error
- m.proposedBlockSub, err = m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
+ m.blockProposeMsgSub, err = m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
if err != nil {
return err
}
- m.sendBlockSignSub, err = m.eventDispatcher.Subscribe(event.SendBlockSignEvent{})
+ m.BlockSignatureMsgSub, err = m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
if err != nil {
return err
}
- go m.proposedBlockBroadcastLoop()
- go m.blockSignBroadcastLoop()
+ go m.blockProposeMsgBroadcastLoop()
+ go m.blockSignatureMsgBroadcastLoop()
return nil
}
-//Stop stop sync manager
+//Stop consensus manager
func (m *Manager) Stop() {
close(m.quit)
- m.proposedBlockSub.Unsubscribe()
- m.sendBlockSignSub.Unsubscribe()
+ m.blockProposeMsgSub.Unsubscribe()
+ m.BlockSignatureMsgSub.Unsubscribe()
}