From: Yahtoo Ma Date: Fri, 24 May 2019 05:20:43 +0000 (+0800) Subject: modify parameter name X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=refs%2Fheads%2Fbbft_net;p=bytom%2Fvapor.git modify parameter name --- diff --git a/event/event.go b/event/event.go index 27dcd539..9fe93746 100644 --- a/event/event.go +++ b/event/event.go @@ -32,24 +32,15 @@ type BlockSignatureEvent struct { Signature []byte } -//NewProposedBlockEvent the proposed block event which needs to broadcast. -type NewProposedBlockEvent struct{ Block types.Block } - -//BlockSignEvent the signature which got from net. -type BlockSignEvent struct { - PeerID []byte - BlockID [32]byte - Height uint64 - Sign []byte - Pubkey []byte -} - -//SendBlockSignEvent the signature event which needs to broadcast. -type SendBlockSignEvent struct { - BlockID [32]byte - Height uint64 - Sign []byte - Pubkey []byte +//NewBlockProposeEvent block propose event which needs to broadcast. +type NewBlockProposeEvent struct{ Block types.Block } + +//ReceivedBlockSignatureEvent block signature event which received from net. +type ReceivedBlockSignatureEvent struct { + PeerID [32]byte + BlockID [32]byte + Height uint64 + Signature []byte } // TypeMuxEvent is a time-tagged notification pushed to subscribers. diff --git a/netsync/block_keeper_test.go b/netsync/block_keeper_test.go index de7e2f9c..0cdc60d6 100644 --- a/netsync/block_keeper_test.go +++ b/netsync/block_keeper_test.go @@ -345,34 +345,34 @@ func TestNextCheckpoint(t *testing.T) { }, { checkPoints: []consensus.Checkpoint{ - {10000, bc.Hash{V0: 1}}, + {Height: 10000, Hash: bc.Hash{V0: 1}}, }, bestHeight: 5000, - want: &consensus.Checkpoint{10000, bc.Hash{V0: 1}}, + want: &consensus.Checkpoint{Height: 10000, Hash: bc.Hash{V0: 1}}, }, { checkPoints: []consensus.Checkpoint{ - {10000, bc.Hash{V0: 1}}, - {20000, bc.Hash{V0: 2}}, - {30000, bc.Hash{V0: 3}}, + {Height: 10000, Hash: bc.Hash{V0: 1}}, + {Height: 20000, Hash: bc.Hash{V0: 2}}, + {Height: 30000, Hash: bc.Hash{V0: 3}}, }, bestHeight: 15000, - want: &consensus.Checkpoint{20000, bc.Hash{V0: 2}}, + want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}}, }, { checkPoints: []consensus.Checkpoint{ - {10000, bc.Hash{V0: 1}}, - {20000, bc.Hash{V0: 2}}, - {30000, bc.Hash{V0: 3}}, + {Height: 10000, Hash: bc.Hash{V0: 1}}, + {Height: 20000, Hash: bc.Hash{V0: 2}}, + {Height: 30000, Hash: bc.Hash{V0: 3}}, }, bestHeight: 10000, - want: &consensus.Checkpoint{20000, bc.Hash{V0: 2}}, + want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}}, }, { checkPoints: []consensus.Checkpoint{ - {10000, bc.Hash{V0: 1}}, - {20000, bc.Hash{V0: 2}}, - {30000, bc.Hash{V0: 3}}, + {Height: 10000, Hash: bc.Hash{V0: 1}}, + {Height: 20000, Hash: bc.Hash{V0: 2}}, + {Height: 30000, Hash: bc.Hash{V0: 3}}, }, bestHeight: 35000, want: nil, diff --git a/netsync/consensus/block_fetcher.go b/netsync/consensus/block_fetcher.go index 8dfc7f46..1751afda 100644 --- a/netsync/consensus/block_fetcher.go +++ b/netsync/consensus/block_fetcher.go @@ -25,7 +25,7 @@ type blockFetcher struct { msgSet map[bc.Hash]*blockMsg } -//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined. +//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose. func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher { f := &blockFetcher{ chain: chain, @@ -69,7 +69,7 @@ func (f *blockFetcher) add(msg *blockMsg) { "module": logModule, "block height": msg.block.Height, "block hash": blockHash.String(), - }).Debug("blockFetcher receive mine block") + }).Debug("blockFetcher receive propose block") } } diff --git a/netsync/consensus/consensus_msg.go b/netsync/consensus/consensus_msg.go index 834662f6..e2d7c9ed 100644 --- a/netsync/consensus/consensus_msg.go +++ b/netsync/consensus/consensus_msg.go @@ -14,8 +14,8 @@ import ( //Consensus msg byte const ( - BlockSignByte = byte(0x10) - BlockProposeByte = byte(0x11) + BlockSignatureByte = byte(0x10) + BlockProposeByte = byte(0x11) ) //BlockchainMessage is a generic message for this reactor. @@ -25,8 +25,8 @@ type ConsensusMessage interface { var _ = wire.RegisterInterface( struct{ ConsensusMessage }{}, - wire.ConcreteType{&BlockSignMessage{}, BlockSignByte}, - wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte}, + wire.ConcreteType{&BlockSignatureMsg{}, BlockSignatureByte}, + wire.ConcreteType{&BlockProposeMsg{}, BlockProposeByte}, ) //decodeMessage decode msg @@ -41,71 +41,70 @@ func decodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { return } -type BlockSignMessage struct { - BlockID [32]byte - Height uint64 - Sign []byte - Pubkey []byte +type BlockSignatureMsg struct { + BlockID [32]byte + Height uint64 + Signature []byte + PeerID [32]byte } -//NewBlockSignMessage construct new mined block msg -func NewBlockSignMessage(blockID [32]byte, height uint64, sign []byte, pubkey []byte) *BlockSignMessage { - return &BlockSignMessage{BlockID: blockID, Height: height, Sign: sign, Pubkey: pubkey} +//NewBlockSignatureMessage construct new mined block msg +func NewBlockSignatureMsg(blockID [32]byte, height uint64, signature []byte, peerId [32]byte) *BlockSignatureMsg { + return &BlockSignatureMsg{BlockID: blockID, Height: height, Signature: signature, PeerID: peerId} } -func (bs *BlockSignMessage) String() string { - return fmt.Sprintf("{block_hash: %s,block_height: %d,sign:%s,pubkey:%s}", hex.EncodeToString(bs.BlockID[:]), bs.Height, hex.EncodeToString(bs.Sign), hex.EncodeToString(bs.Pubkey)) +func (bs *BlockSignatureMsg) String() string { + return fmt.Sprintf("{block_hash: %s,block_height: %d,signature:%s,peerID:%s}", hex.EncodeToString(bs.BlockID[:]), bs.Height, hex.EncodeToString(bs.Signature), hex.EncodeToString(bs.PeerID[:])) } -type BlockSignBroadcastMsg struct { - sign []byte - msg *BlockSignMessage +type SignatureBroadcastMsg struct { + signature []byte + msg *BlockSignatureMsg transChan byte } -func NewBlockSignBroadcastMsg(blockID [32]byte, height uint64, sign []byte, pubkey []byte, transChan byte) *BlockSignBroadcastMsg { - msg := NewBlockSignMessage(blockID, height, sign, pubkey) - return &BlockSignBroadcastMsg{sign: sign, msg: msg, transChan: transChan} +func NewSignatureBroadcastMsg(blockID [32]byte, height uint64, signature []byte, pubkey [32]byte, transChan byte) *SignatureBroadcastMsg { + msg := NewBlockSignatureMsg(blockID, height, signature, pubkey) + return &SignatureBroadcastMsg{signature: signature, msg: msg, transChan: transChan} } -func (m *BlockSignBroadcastMsg) GetChan() byte { - return m.transChan +func (s *SignatureBroadcastMsg) GetChan() byte { + return s.transChan } -func (m *BlockSignBroadcastMsg) GetMsg() interface{} { - return struct{ ConsensusMessage }{m.msg} +func (s *SignatureBroadcastMsg) GetMsg() interface{} { + return struct{ ConsensusMessage }{s.msg} } -func (m *BlockSignBroadcastMsg) MsgString() string { - return m.msg.String() +func (s *SignatureBroadcastMsg) MsgString() string { + return s.msg.String() } -func (m *BlockSignBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) { +func (s *SignatureBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) { for _, peer := range peers { - ps.MarkBlockSign(peer, m.sign) + ps.MarkBlockSignature(peer, s.signature) } } -func (m *BlockSignBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string { - //TODO: SPV NODE FILTER - return ps.PeersWithoutSign(m.sign) +func (m *SignatureBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string { + return ps.PeersWithoutSign(m.signature) } -type BlockProposeMessage struct { +type BlockProposeMsg struct { RawBlock []byte } -//NewBlockProposeMessage construct new mined block msg -func NewBlockProposeMessage(block *types.Block) (*BlockProposeMessage, error) { +//NewBlockProposeMsg construct new block propose msg +func NewBlockProposeMsg(block *types.Block) (*BlockProposeMsg, error) { rawBlock, err := block.MarshalText() if err != nil { return nil, err } - return &BlockProposeMessage{RawBlock: rawBlock}, nil + return &BlockProposeMsg{RawBlock: rawBlock}, nil } -//GetMineBlock get mine block from msg -func (m *BlockProposeMessage) GetProposeBlock() (*types.Block, error) { +//GetProposeBlock get propose block from msg +func (m *BlockProposeMsg) GetProposeBlock() (*types.Block, error) { block := &types.Block{} if err := block.UnmarshalText(m.RawBlock); err != nil { return nil, err @@ -113,7 +112,7 @@ func (m *BlockProposeMessage) GetProposeBlock() (*types.Block, error) { return block, nil } -func (bp *BlockProposeMessage) String() string { +func (bp *BlockProposeMsg) String() string { block, err := bp.GetProposeBlock() if err != nil { return "{err: wrong message}" @@ -122,42 +121,41 @@ func (bp *BlockProposeMessage) String() string { return fmt.Sprintf("{block_height: %d, block_hash: %s}", block.Height, blockHash.String()) } -type BlockProposeBroadcastMsg struct { +type ProposeBroadcastMsg struct { block *types.Block - msg *BlockProposeMessage + msg *BlockProposeMsg transChan byte } -func NewBlockProposeBroadcastMsg(block *types.Block, transChan byte) (*BlockProposeBroadcastMsg, error) { - msg, err := NewBlockProposeMessage(block) +func NewBlockProposeBroadcastMsg(block *types.Block, transChan byte) (*ProposeBroadcastMsg, error) { + msg, err := NewBlockProposeMsg(block) if err != nil { return nil, err } - return &BlockProposeBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil + return &ProposeBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil } -func (m *BlockProposeBroadcastMsg) GetChan() byte { - return m.transChan +func (p *ProposeBroadcastMsg) GetChan() byte { + return p.transChan } -func (m *BlockProposeBroadcastMsg) GetMsg() interface{} { - return struct{ ConsensusMessage }{m.msg} +func (p *ProposeBroadcastMsg) GetMsg() interface{} { + return struct{ ConsensusMessage }{p.msg} } -func (m *BlockProposeBroadcastMsg) MsgString() string { - return m.msg.String() +func (p *ProposeBroadcastMsg) MsgString() string { + return p.msg.String() } -func (m *BlockProposeBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) { - hash := m.block.Hash() - height := m.block.Height +func (p *ProposeBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) { + hash := p.block.Hash() + height := p.block.Height for _, peer := range peers { ps.MarkBlock(peer, &hash) ps.MarkStatus(peer, height) } } -func (m *BlockProposeBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string { - //TODO: SPV NODE FILTER - return ps.PeersWithoutBlock(m.block.Hash()) +func (p *ProposeBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string { + return ps.PeersWithoutBlock(p.block.Hash()) } diff --git a/netsync/consensus/consensus_msg_test.go b/netsync/consensus/consensus_msg_test.go index 60cd89a3..6026e621 100644 --- a/netsync/consensus/consensus_msg_test.go +++ b/netsync/consensus/consensus_msg_test.go @@ -6,14 +6,15 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/tendermint/go-wire" + "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" ) var _ = wire.RegisterInterface( struct{ ConsensusMessage }{}, - wire.ConcreteType{&BlockSignMessage{}, BlockSignByte}, - wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte}, + wire.ConcreteType{&BlockSignatureMsg{}, BlockSignatureByte}, + wire.ConcreteType{&BlockProposeMsg{}, BlockProposeByte}, ) func TestDecodeMessage(t *testing.T) { @@ -22,16 +23,16 @@ func TestDecodeMessage(t *testing.T) { msgType byte }{ { - msg: &BlockSignMessage{ - BlockID: [32]byte{0x01}, - Height: uint64(100), - Sign: []byte{0x00}, - Pubkey: []byte{0x01}, + msg: &BlockSignatureMsg{ + BlockID: [32]byte{0x01}, + Height: uint64(100), + Signature: []byte{0x00}, + PeerID: [32]byte{0x01}, }, - msgType: BlockSignByte, + msgType: BlockSignatureByte, }, { - msg: &BlockProposeMessage{ + msg: &BlockProposeMsg{ RawBlock: []byte{0x01, 0x02}, }, msgType: BlockProposeByte, @@ -53,21 +54,21 @@ func TestDecodeMessage(t *testing.T) { } func TestBlockSignBroadcastMsg(t *testing.T) { - blockSignMsg := &BlockSignMessage{ - BlockID: [32]byte{0x01}, - Height: uint64(100), - Sign: []byte{0x00}, - Pubkey: []byte{0x01}, + blockSignMsg := &BlockSignatureMsg{ + BlockID: [32]byte{0x01}, + Height: uint64(100), + Signature: []byte{0x00}, + PeerID: [32]byte{0x01}, } - blockSignBroadcastMsg := NewBlockSignBroadcastMsg(blockSignMsg.BlockID, blockSignMsg.Height, blockSignMsg.Sign, blockSignMsg.Pubkey, ConsensusChannel) + blockSignBroadcastMsg := NewSignatureBroadcastMsg(blockSignMsg.BlockID, blockSignMsg.Height, blockSignMsg.Signature, blockSignMsg.PeerID, ConsensusChannel) binMsg := wire.BinaryBytes(blockSignBroadcastMsg.GetMsg()) gotMsgType, gotMsg, err := decodeMessage(binMsg) if err != nil { t.Fatalf("decode Message err %s", err) } - if gotMsgType != BlockSignByte { - t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, BlockSignByte) + if gotMsgType != BlockSignatureByte { + t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, BlockSignatureByte) } if !reflect.DeepEqual(gotMsg, blockSignMsg) { t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockSignMsg)) @@ -75,7 +76,7 @@ func TestBlockSignBroadcastMsg(t *testing.T) { } func TestBlockProposeBroadcastMsg(t *testing.T) { - blockProposedmsg, _ := NewBlockProposeMessage(testBlock) + blockProposedmsg, _ := NewBlockProposeMsg(testBlock) BlockProposeBroadcastMsg, _ := NewBlockProposeBroadcastMsg(testBlock, ConsensusChannel) @@ -104,8 +105,8 @@ var testBlock = &types.Block{ }, } -func TestBlockProposeMessage(t *testing.T) { - blockMsg, err := NewBlockProposeMessage(testBlock) +func TestBlockProposeMsg(t *testing.T) { + blockMsg, err := NewBlockProposeMsg(testBlock) if err != nil { t.Fatalf("create new mine block msg err:%s", err) } @@ -136,21 +137,21 @@ func TestBlockProposeMessage(t *testing.T) { } } -func TestBlockSignMessage(t *testing.T) { - msg := &BlockSignMessage{ - BlockID: [32]byte{0x01}, - Height: uint64(100), - Sign: []byte{0x00}, - Pubkey: []byte{0x01}, +func TestBlockSignatureMsg(t *testing.T) { + msg := &BlockSignatureMsg{ + BlockID: [32]byte{0x01}, + Height: uint64(100), + Signature: []byte{0x00}, + PeerID: [32]byte{0x01}, } - gotMsg := NewBlockSignMessage(msg.BlockID, msg.Height, msg.Sign, msg.Pubkey) + gotMsg := NewBlockSignatureMsg(msg.BlockID, msg.Height, msg.Signature, msg.PeerID) if !reflect.DeepEqual(gotMsg, msg) { - t.Fatalf("test block sign message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg)) + t.Fatalf("test block signature message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg)) } - wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height: 100,sign:00,pubkey:01}" + wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height: 100,signature:00,peerID:0100000000000000000000000000000000000000000000000000000000000000}" if gotMsg.String() != wantString { - t.Fatalf("test block sign message err. got string:%s\n want string:%s", gotMsg.String(), wantString) + t.Fatalf("test block signature message err. got string:%s\n want string:%s", gotMsg.String(), wantString) } } diff --git a/netsync/consensus/handle.go b/netsync/consensus/handle.go index 2df8958b..9bcce9af 100644 --- a/netsync/consensus/handle.go +++ b/netsync/consensus/handle.go @@ -8,6 +8,7 @@ import ( "github.com/vapor/event" "github.com/vapor/netsync/peers" "github.com/vapor/p2p" + "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" ) @@ -17,21 +18,23 @@ type Manager struct { blockFetcher *blockFetcher chain Chain - quit chan struct{} + eventDispatcher *event.Dispatcher + blockProposeMsgSub *event.Subscription + BlockSignatureMsgSub *event.Subscription - eventDispatcher *event.Dispatcher - proposedBlockSub *event.Subscription - sendBlockSignSub *event.Subscription + quit chan struct{} } type Switch interface { AddReactor(name string, reactor p2p.Reactor) p2p.Reactor AddBannedPeer(string) error + ID() [32]byte } // Chain is the interface for Bytom core type Chain interface { BestBlockHeight() uint64 + GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error) ProcessBlock(*types.Block) (bool, error) } @@ -45,8 +48,8 @@ func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *pee sw: sw, peers: peers, blockFetcher: newBlockFetcher(chain, peers), - quit: make(chan struct{}), eventDispatcher: dispatcher, + quit: make(chan struct{}), } protocolReactor := NewConsensusReactor(manager) manager.sw.AddReactor("CONSENSUS", protocolReactor) @@ -66,21 +69,21 @@ func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) log.WithFields(log.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer") switch msg := msg.(type) { - case *BlockProposeMessage: + case *BlockProposeMsg: m.handleBlockProposeMsg(peerID, msg) - case *BlockSignMessage: - m.handleBlockSigMsg(peerID, msg) + case *BlockSignatureMsg: + m.handleBlockSignatureMsg(peerID, msg) default: log.WithFields(log.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type") } } -func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage) { +func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) { block, err := msg.GetProposeBlock() if err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock") + log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on get propose block") return } @@ -89,34 +92,36 @@ func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage) m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block}) } -func (m *Manager) handleBlockSigMsg(peerID string, msg *BlockSignMessage) { - if err := m.eventDispatcher.Post(event.BlockSignEvent{PeerID: []byte(peerID), BlockID: msg.BlockID, Height: msg.Height, Sign: msg.Sign, Pubkey: msg.Pubkey}); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed post block sign event") +func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) { + var id [32]byte + copy(id[:], peerID) + if err := m.eventDispatcher.Post(event.ReceivedBlockSignatureEvent{BlockID: msg.BlockID, Height: msg.Height, Signature: msg.Signature, PeerID: id}); err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on post block signature event") } } -func (m *Manager) proposedBlockBroadcastLoop() { +func (m *Manager) blockProposeMsgBroadcastLoop() { for { select { - case obj, ok := <-m.proposedBlockSub.Chan(): + case obj, ok := <-m.blockProposeMsgSub.Chan(): if !ok { - log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed") + log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed") return } - ev, ok := obj.Data.(event.NewProposedBlockEvent) + ev, ok := obj.Data.(event.NewBlockProposeEvent) if !ok { log.WithFields(log.Fields{"module": logModule}).Error("event type error") continue } - proposedMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel) + proposeMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel) if err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on create new propose block msg") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeBroadcastMsg") return } - if err := m.peers.BroadcastMsg(proposedMsg); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block") + if err := m.peers.BroadcastMsg(proposeMsg); err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg") continue } @@ -126,24 +131,28 @@ func (m *Manager) proposedBlockBroadcastLoop() { } } -func (m *Manager) blockSignBroadcastLoop() { +func (m *Manager) blockSignatureMsgBroadcastLoop() { for { select { - case obj, ok := <-m.sendBlockSignSub.Chan(): + case obj, ok := <-m.blockProposeMsgSub.Chan(): if !ok { - log.WithFields(log.Fields{"module": logModule}).Warning("send block sign subscription channel closed") + log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed") return } - ev, ok := obj.Data.(event.SendBlockSignEvent) + ev, ok := obj.Data.(event.BlockSignatureEvent) if !ok { log.WithFields(log.Fields{"module": logModule}).Error("event type error") continue } - - blockSignMsg := NewBlockSignBroadcastMsg(ev.BlockID, ev.Height, ev.Sign, ev.Pubkey, ConsensusChannel) + blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.") + return + } + blockSignMsg := NewSignatureBroadcastMsg(ev.BlockHash.Byte32(), blockHeader.Height, ev.Signature, m.sw.ID(), ConsensusChannel) if err := m.peers.BroadcastMsg(blockSignMsg); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed to broadcast block sign message.") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.") return } @@ -159,24 +168,24 @@ func (m *Manager) RemovePeer(peerID string) { func (m *Manager) Start() error { var err error - m.proposedBlockSub, err = m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{}) + m.blockProposeMsgSub, err = m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{}) if err != nil { return err } - m.sendBlockSignSub, err = m.eventDispatcher.Subscribe(event.SendBlockSignEvent{}) + m.BlockSignatureMsgSub, err = m.eventDispatcher.Subscribe(event.BlockSignatureEvent{}) if err != nil { return err } - go m.proposedBlockBroadcastLoop() - go m.blockSignBroadcastLoop() + go m.blockProposeMsgBroadcastLoop() + go m.blockSignatureMsgBroadcastLoop() return nil } -//Stop stop sync manager +//Stop consensus manager func (m *Manager) Stop() { close(m.quit) - m.proposedBlockSub.Unsubscribe() - m.sendBlockSignSub.Unsubscribe() + m.blockProposeMsgSub.Unsubscribe() + m.BlockSignatureMsgSub.Unsubscribe() } diff --git a/netsync/consensus/reactor.go b/netsync/consensus/reactor.go index 464598e6..8dbfb61c 100644 --- a/netsync/consensus/reactor.go +++ b/netsync/consensus/reactor.go @@ -59,7 +59,7 @@ func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { cr.mgr.RemovePeer(peer.Key) } -// Receive implements Reactor by handling 4 types of messages (look below). +// Receive implements Reactor by handling messages. func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { msgType, msg, err := decodeMessage(msgBytes) if err != nil { diff --git a/netsync/handle.go b/netsync/handle.go index d1bb41ee..f5e3c1c9 100644 --- a/netsync/handle.go +++ b/netsync/handle.go @@ -49,7 +49,7 @@ type Switch interface { Peers() *p2p.PeerSet } -//SyncManager Sync Manager is responsible for the business layer information synchronization +//ChainManager is responsible for the business layer information synchronization type ChainManager struct { sw Switch chain Chain @@ -65,7 +65,7 @@ type ChainManager struct { txMsgSub *event.Subscription } -//NewSyncManager create a sync manager +//NewChainManager create a chain sync manager. func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) { manager := &ChainManager{ sw: sw, diff --git a/netsync/peers/peer.go b/netsync/peers/peer.go index 174c597c..ee5068a0 100644 --- a/netsync/peers/peer.go +++ b/netsync/peers/peer.go @@ -20,7 +20,7 @@ import ( const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) - maxKnownSigns = 1024 // Maximum block signs to keep in the known list (prevent DOS) + maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) defaultBanThreshold = uint32(100) maxFilterAddressSize = 50 @@ -75,26 +75,26 @@ type PeerInfo struct { type Peer struct { BasePeer - mtx sync.RWMutex - services consensus.ServiceFlag - height uint64 - hash *bc.Hash - banScore trust.DynamicBanScore - knownTxs *set.Set // Set of transaction hashes known to be known by this peer - knownBlocks *set.Set // Set of block hashes known to be known by this peer - knownSigns *set.Set // Set of block signs known to be known by this peer - knownStatus uint64 // Set of chain status known to be known by this peer - filterAdds *set.Set // Set of addresses that the spv node cares about. + mtx sync.RWMutex + services consensus.ServiceFlag + height uint64 + hash *bc.Hash + banScore trust.DynamicBanScore + knownTxs *set.Set // Set of transaction hashes known to be known by this peer + knownBlocks *set.Set // Set of block hashes known to be known by this peer + knownSignatures *set.Set // Set of block signatures known to be known by this peer + knownStatus uint64 // Set of chain status known to be known by this peer + filterAdds *set.Set // Set of addresses that the spv node cares about. } func newPeer(basePeer BasePeer) *Peer { return &Peer{ - BasePeer: basePeer, - services: basePeer.ServiceFlag(), - knownTxs: set.New(), - knownBlocks: set.New(), - knownSigns: set.New(), - filterAdds: set.New(), + BasePeer: basePeer, + services: basePeer.ServiceFlag(), + knownTxs: set.New(), + knownBlocks: set.New(), + knownSignatures: set.New(), + filterAdds: set.New(), } } @@ -247,14 +247,14 @@ func (p *Peer) markNewStatus(height uint64) { p.knownStatus = height } -func (p *Peer) markSign(sign []byte) { +func (p *Peer) markSign(signature []byte) { p.mtx.Lock() defer p.mtx.Unlock() - for p.knownSigns.Size() >= maxKnownSigns { - p.knownSigns.Pop() + for p.knownSignatures.Size() >= maxKnownSignatures { + p.knownSignatures.Pop() } - p.knownSigns.Add(sign) + p.knownSignatures.Add(signature) } func (p *Peer) markTransaction(hash *bc.Hash) { @@ -280,13 +280,13 @@ func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string { return peers } -func (ps *PeerSet) PeersWithoutSign(sign []byte) []string { +func (ps *PeerSet) PeersWithoutSign(signature []byte) []string { ps.mtx.RLock() defer ps.mtx.RUnlock() var peers []string for _, peer := range ps.peers { - if !peer.knownSigns.Has(sign) { + if !peer.knownSignatures.Has(signature) { peers = append(peers, peer.ID()) } } @@ -570,12 +570,12 @@ func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) { peer.MarkBlock(hash) } -func (ps *PeerSet) MarkBlockSign(peerID string, sign []byte) { +func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) { peer := ps.GetPeer(peerID) if peer == nil { return } - peer.markSign(sign) + peer.markSign(signature) } func (ps *PeerSet) MarkStatus(peerID string, height uint64) { diff --git a/netsync/protocol_reactor.go b/netsync/protocol_reactor.go index d5aed273..f0de2389 100644 --- a/netsync/protocol_reactor.go +++ b/netsync/protocol_reactor.go @@ -12,11 +12,6 @@ import ( "github.com/vapor/p2p/connection" ) -var ( - errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout") - errStatusRequest = errors.New("Status request error") -) - //ProtocolReactor handles new coming protocol message. type ProtocolReactor struct { p2p.BaseReactor diff --git a/netsync/sync_manager.go b/netsync/sync_manager.go index d2116b2b..03c02de3 100644 --- a/netsync/sync_manager.go +++ b/netsync/sync_manager.go @@ -34,7 +34,7 @@ type SyncManager struct { peers *peers.PeerSet } -// CreateSyncManager create sync manager and set switch. +// NewSyncManager create sync manager and set switch. func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) { sw, err := p2p.NewSwitch(config) if err != nil { diff --git a/p2p/switch.go b/p2p/switch.go index d70fa75a..57b16cf9 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -293,6 +293,10 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { return nil } +func (sw *Switch) ID() [32]byte { + return sw.nodeInfo.PubKey +} + //IsDialing prevent duplicate dialing func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.IP.String())