OSDN Git Service

Peer add announces new block message num limit (#378)
[bytom/vapor.git] / netsync / consensusmgr / handle.go
index f3823e0..a9e3a63 100644 (file)
@@ -8,6 +8,7 @@ import (
        "github.com/vapor/event"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
+       "github.com/vapor/p2p/security"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
@@ -15,8 +16,6 @@ import (
 // Switch is the interface for p2p switch.
 type Switch interface {
        AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
-       AddBannedPeer(string) error
-       ID() [32]byte
 }
 
 // Chain is the interface for Bytom core.
@@ -24,7 +23,18 @@ type Chain interface {
        BestBlockHeight() uint64
        GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
        ProcessBlock(*types.Block) (bool, error)
-       ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error
+       ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error
+}
+
+type Peers interface {
+       AddPeer(peer peers.BasePeer)
+       BroadcastMsg(bm peers.BroadcastMsg) error
+       GetPeer(id string) *peers.Peer
+       MarkBlock(peerID string, hash *bc.Hash)
+       MarkBlockSignature(peerID string, signature []byte)
+       ProcessIllegal(peerID string, level byte, reason string)
+       RemovePeer(peerID string)
+       SetStatus(peerID string, height uint64, hash *bc.Hash)
 }
 
 type blockMsg struct {
@@ -36,7 +46,7 @@ type blockMsg struct {
 type Manager struct {
        sw              Switch
        chain           Chain
-       peers           *peers.PeerSet
+       peers           Peers
        blockFetcher    *blockFetcher
        eventDispatcher *event.Dispatcher
 
@@ -44,9 +54,10 @@ type Manager struct {
 }
 
 // NewManager create new manager.
-func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
+func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager {
        manager := &Manager{
                sw:              sw,
+               chain:           chain,
                peers:           peers,
                blockFetcher:    newBlockFetcher(chain, peers),
                eventDispatcher: dispatcher,
@@ -67,7 +78,7 @@ func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage)
                return
        }
 
-       logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
+       logrus.WithFields(logrus.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Debug("receive message from peer")
 
        switch msg := msg.(type) {
        case *BlockProposeMsg:
@@ -91,18 +102,20 @@ func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
        hash := block.Hash()
        m.peers.MarkBlock(peerID, &hash)
        m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
+       m.peers.SetStatus(peerID, block.Height, &hash)
 }
 
 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
+       m.peers.MarkBlockSignature(peerID, msg.Signature)
        blockHash := bc.NewHash(msg.BlockHash)
-       if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey[:], msg.Height, &blockHash); err != nil {
-               m.peers.AddBanScore(peerID, 20, 0, err.Error())
+       if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey, &blockHash); err != nil {
+               m.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, err.Error())
                return
        }
 }
 
 func (m *Manager) blockProposeMsgBroadcastLoop() {
-       blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
+       blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
        if err != nil {
                logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
                return
@@ -117,7 +130,7 @@ func (m *Manager) blockProposeMsgBroadcastLoop() {
                                return
                        }
 
-                       ev, ok := obj.Data.(event.NewBlockProposeEvent)
+                       ev, ok := obj.Data.(event.NewProposedBlockEvent)
                        if !ok {
                                logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
                                continue
@@ -160,16 +173,10 @@ func (m *Manager) blockSignatureMsgBroadcastLoop() {
                                continue
                        }
 
-                       blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
-                       if err != nil {
-                               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
-                               return
-                       }
-
-                       blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, m.sw.ID()), consensusChannel)
+                       blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, ev.Signature, ev.XPub), consensusChannel)
                        if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
                                logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
-                               return
+                               continue
                        }
 
                case <-m.quit:
@@ -184,6 +191,7 @@ func (m *Manager) removePeer(peerID string) {
 
 //Start consensus manager service.
 func (m *Manager) Start() error {
+       go m.blockFetcher.blockProcessorLoop()
        go m.blockProposeMsgBroadcastLoop()
        go m.blockSignatureMsgBroadcastLoop()
        return nil