X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=netsync%2Fconsensusmgr%2Fhandle.go;h=a9e3a631288286ae2dbca81446bc4948dd65fe88;hb=refs%2Fheads%2Fblock_fetcher;hp=66fddd9eeb708dd306093d399c5e83341082862d;hpb=807d99726f6a0610fa9c835e2aabd983801d3510;p=bytom%2Fvapor.git diff --git a/netsync/consensusmgr/handle.go b/netsync/consensusmgr/handle.go index 66fddd9e..a9e3a631 100644 --- a/netsync/consensusmgr/handle.go +++ b/netsync/consensusmgr/handle.go @@ -23,7 +23,18 @@ type Chain interface { BestBlockHeight() uint64 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error) ProcessBlock(*types.Block) (bool, error) - ProcessBlockSignature(signature []byte, pubkey [64]byte, blockHash *bc.Hash) error + ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error +} + +type Peers interface { + AddPeer(peer peers.BasePeer) + BroadcastMsg(bm peers.BroadcastMsg) error + GetPeer(id string) *peers.Peer + MarkBlock(peerID string, hash *bc.Hash) + MarkBlockSignature(peerID string, signature []byte) + ProcessIllegal(peerID string, level byte, reason string) + RemovePeer(peerID string) + SetStatus(peerID string, height uint64, hash *bc.Hash) } type blockMsg struct { @@ -35,7 +46,7 @@ type blockMsg struct { type Manager struct { sw Switch chain Chain - peers *peers.PeerSet + peers Peers blockFetcher *blockFetcher eventDispatcher *event.Dispatcher @@ -43,7 +54,7 @@ type Manager struct { } // NewManager create new manager. -func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager { +func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager { manager := &Manager{ sw: sw, chain: chain, @@ -67,7 +78,7 @@ func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) return } - logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer") + logrus.WithFields(logrus.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Debug("receive message from peer") switch msg := msg.(type) { case *BlockProposeMsg: @@ -91,9 +102,11 @@ func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) { hash := block.Hash() m.peers.MarkBlock(peerID, &hash) m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block}) + m.peers.SetStatus(peerID, block.Height, &hash) } func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) { + m.peers.MarkBlockSignature(peerID, msg.Signature) blockHash := bc.NewHash(msg.BlockHash) if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey, &blockHash); err != nil { m.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, err.Error()) @@ -160,16 +173,10 @@ func (m *Manager) blockSignatureMsgBroadcastLoop() { continue } - blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash) - if err != nil { - logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.") - return - } - - blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, ev.XPub), consensusChannel) + blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, ev.Signature, ev.XPub), consensusChannel) if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil { logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.") - return + continue } case <-m.quit: @@ -184,6 +191,7 @@ func (m *Manager) removePeer(peerID string) { //Start consensus manager service. func (m *Manager) Start() error { + go m.blockFetcher.blockProcessorLoop() go m.blockProposeMsgBroadcastLoop() go m.blockSignatureMsgBroadcastLoop() return nil