OSDN Git Service

modify parameter name
[bytom/vapor.git] / netsync / consensus / handle.go
index 2df8958..9bcce9a 100644 (file)
@@ -8,6 +8,7 @@ import (
        "github.com/vapor/event"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
+       "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
 
@@ -17,21 +18,23 @@ type Manager struct {
        blockFetcher *blockFetcher
        chain        Chain
 
-       quit chan struct{}
+       eventDispatcher      *event.Dispatcher
+       blockProposeMsgSub   *event.Subscription
+       BlockSignatureMsgSub *event.Subscription
 
-       eventDispatcher  *event.Dispatcher
-       proposedBlockSub *event.Subscription
-       sendBlockSignSub *event.Subscription
+       quit chan struct{}
 }
 
 type Switch interface {
        AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
        AddBannedPeer(string) error
+       ID() [32]byte
 }
 
 // Chain is the interface for Bytom core
 type Chain interface {
        BestBlockHeight() uint64
+       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
        ProcessBlock(*types.Block) (bool, error)
 }
 
@@ -45,8 +48,8 @@ func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *pee
                sw:              sw,
                peers:           peers,
                blockFetcher:    newBlockFetcher(chain, peers),
-               quit:            make(chan struct{}),
                eventDispatcher: dispatcher,
+               quit:            make(chan struct{}),
        }
        protocolReactor := NewConsensusReactor(manager)
        manager.sw.AddReactor("CONSENSUS", protocolReactor)
@@ -66,21 +69,21 @@ func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage)
        log.WithFields(log.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
 
        switch msg := msg.(type) {
-       case *BlockProposeMessage:
+       case *BlockProposeMsg:
                m.handleBlockProposeMsg(peerID, msg)
 
-       case *BlockSignMessage:
-               m.handleBlockSigMsg(peerID, msg)
+       case *BlockSignatureMsg:
+               m.handleBlockSignatureMsg(peerID, msg)
 
        default:
                log.WithFields(log.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
        }
 }
 
-func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage) {
+func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
        block, err := msg.GetProposeBlock()
        if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
                return
        }
 
@@ -89,34 +92,36 @@ func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage)
        m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
 }
 
-func (m *Manager) handleBlockSigMsg(peerID string, msg *BlockSignMessage) {
-       if err := m.eventDispatcher.Post(event.BlockSignEvent{PeerID: []byte(peerID), BlockID: msg.BlockID, Height: msg.Height, Sign: msg.Sign, Pubkey: msg.Pubkey}); err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed post block sign event")
+func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
+       var id [32]byte
+       copy(id[:], peerID)
+       if err := m.eventDispatcher.Post(event.ReceivedBlockSignatureEvent{BlockID: msg.BlockID, Height: msg.Height, Signature: msg.Signature, PeerID: id}); err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on post block signature event")
        }
 }
 
-func (m *Manager) proposedBlockBroadcastLoop() {
+func (m *Manager) blockProposeMsgBroadcastLoop() {
        for {
                select {
-               case obj, ok := <-m.proposedBlockSub.Chan():
+               case obj, ok := <-m.blockProposeMsgSub.Chan():
                        if !ok {
-                               log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
+                               log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
                                return
                        }
 
-                       ev, ok := obj.Data.(event.NewProposedBlockEvent)
+                       ev, ok := obj.Data.(event.NewBlockProposeEvent)
                        if !ok {
                                log.WithFields(log.Fields{"module": logModule}).Error("event type error")
                                continue
                        }
 
-                       proposedMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
+                       proposeMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
                        if err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on create new propose block msg")
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeBroadcastMsg")
                                return
                        }
-                       if err := m.peers.BroadcastMsg(proposedMsg); err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
+                       if err := m.peers.BroadcastMsg(proposeMsg); err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
                                continue
                        }
 
@@ -126,24 +131,28 @@ func (m *Manager) proposedBlockBroadcastLoop() {
        }
 }
 
-func (m *Manager) blockSignBroadcastLoop() {
+func (m *Manager) blockSignatureMsgBroadcastLoop() {
        for {
                select {
-               case obj, ok := <-m.sendBlockSignSub.Chan():
+               case obj, ok := <-m.blockProposeMsgSub.Chan():
                        if !ok {
-                               log.WithFields(log.Fields{"module": logModule}).Warning("send block sign subscription channel closed")
+                               log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
                                return
                        }
 
-                       ev, ok := obj.Data.(event.SendBlockSignEvent)
+                       ev, ok := obj.Data.(event.BlockSignatureEvent)
                        if !ok {
                                log.WithFields(log.Fields{"module": logModule}).Error("event type error")
                                continue
                        }
-
-                       blockSignMsg := NewBlockSignBroadcastMsg(ev.BlockID, ev.Height, ev.Sign, ev.Pubkey, ConsensusChannel)
+                       blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
+                               return
+                       }
+                       blockSignMsg := NewSignatureBroadcastMsg(ev.BlockHash.Byte32(), blockHeader.Height, ev.Signature, m.sw.ID(), ConsensusChannel)
                        if err := m.peers.BroadcastMsg(blockSignMsg); err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed to broadcast block sign message.")
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
                                return
                        }
 
@@ -159,24 +168,24 @@ func (m *Manager) RemovePeer(peerID string) {
 
 func (m *Manager) Start() error {
        var err error
-       m.proposedBlockSub, err = m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
+       m.blockProposeMsgSub, err = m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
        if err != nil {
                return err
        }
 
-       m.sendBlockSignSub, err = m.eventDispatcher.Subscribe(event.SendBlockSignEvent{})
+       m.BlockSignatureMsgSub, err = m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
        if err != nil {
                return err
        }
 
-       go m.proposedBlockBroadcastLoop()
-       go m.blockSignBroadcastLoop()
+       go m.blockProposeMsgBroadcastLoop()
+       go m.blockSignatureMsgBroadcastLoop()
        return nil
 }
 
-//Stop stop sync manager
+//Stop consensus manager
 func (m *Manager) Stop() {
        close(m.quit)
-       m.proposedBlockSub.Unsubscribe()
-       m.sendBlockSignSub.Unsubscribe()
+       m.blockProposeMsgSub.Unsubscribe()
+       m.BlockSignatureMsgSub.Unsubscribe()
 }