OSDN Git Service

modify parameter name bbft_net
authorYahtoo Ma <yahtoo.ma@gmail.com>
Fri, 24 May 2019 05:20:43 +0000 (13:20 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Fri, 24 May 2019 05:20:43 +0000 (13:20 +0800)
12 files changed:
event/event.go
netsync/block_keeper_test.go
netsync/consensus/block_fetcher.go
netsync/consensus/consensus_msg.go
netsync/consensus/consensus_msg_test.go
netsync/consensus/handle.go
netsync/consensus/reactor.go
netsync/handle.go
netsync/peers/peer.go
netsync/protocol_reactor.go
netsync/sync_manager.go
p2p/switch.go

index 27dcd53..9fe9374 100644 (file)
@@ -32,24 +32,15 @@ type BlockSignatureEvent struct {
        Signature []byte
 }
 
-//NewProposedBlockEvent the proposed block event which needs to broadcast.
-type NewProposedBlockEvent struct{ Block types.Block }
-
-//BlockSignEvent the signature which got from net.
-type BlockSignEvent struct {
-       PeerID  []byte
-       BlockID [32]byte
-       Height  uint64
-       Sign    []byte
-       Pubkey  []byte
-}
-
-//SendBlockSignEvent the signature event which needs to broadcast.
-type SendBlockSignEvent struct {
-       BlockID [32]byte
-       Height  uint64
-       Sign    []byte
-       Pubkey  []byte
+//NewBlockProposeEvent block propose event which needs to broadcast.
+type NewBlockProposeEvent struct{ Block types.Block }
+
+//ReceivedBlockSignatureEvent block signature event which received from net.
+type ReceivedBlockSignatureEvent struct {
+       PeerID    [32]byte
+       BlockID   [32]byte
+       Height    uint64
+       Signature []byte
 }
 
 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
index de7e2f9..0cdc60d 100644 (file)
@@ -345,34 +345,34 @@ func TestNextCheckpoint(t *testing.T) {
                },
                {
                        checkPoints: []consensus.Checkpoint{
-                               {10000, bc.Hash{V0: 1}},
+                               {Height: 10000, Hash: bc.Hash{V0: 1}},
                        },
                        bestHeight: 5000,
-                       want:       &consensus.Checkpoint{10000, bc.Hash{V0: 1}},
+                       want:       &consensus.Checkpoint{Height: 10000, Hash: bc.Hash{V0: 1}},
                },
                {
                        checkPoints: []consensus.Checkpoint{
-                               {10000, bc.Hash{V0: 1}},
-                               {20000, bc.Hash{V0: 2}},
-                               {30000, bc.Hash{V0: 3}},
+                               {Height: 10000, Hash: bc.Hash{V0: 1}},
+                               {Height: 20000, Hash: bc.Hash{V0: 2}},
+                               {Height: 30000, Hash: bc.Hash{V0: 3}},
                        },
                        bestHeight: 15000,
-                       want:       &consensus.Checkpoint{20000, bc.Hash{V0: 2}},
+                       want:       &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}},
                },
                {
                        checkPoints: []consensus.Checkpoint{
-                               {10000, bc.Hash{V0: 1}},
-                               {20000, bc.Hash{V0: 2}},
-                               {30000, bc.Hash{V0: 3}},
+                               {Height: 10000, Hash: bc.Hash{V0: 1}},
+                               {Height: 20000, Hash: bc.Hash{V0: 2}},
+                               {Height: 30000, Hash: bc.Hash{V0: 3}},
                        },
                        bestHeight: 10000,
-                       want:       &consensus.Checkpoint{20000, bc.Hash{V0: 2}},
+                       want:       &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}},
                },
                {
                        checkPoints: []consensus.Checkpoint{
-                               {10000, bc.Hash{V0: 1}},
-                               {20000, bc.Hash{V0: 2}},
-                               {30000, bc.Hash{V0: 3}},
+                               {Height: 10000, Hash: bc.Hash{V0: 1}},
+                               {Height: 20000, Hash: bc.Hash{V0: 2}},
+                               {Height: 30000, Hash: bc.Hash{V0: 3}},
                        },
                        bestHeight: 35000,
                        want:       nil,
index 8dfc7f4..1751afd 100644 (file)
@@ -25,7 +25,7 @@ type blockFetcher struct {
        msgSet     map[bc.Hash]*blockMsg
 }
 
-//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
+//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
 func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
        f := &blockFetcher{
                chain:      chain,
@@ -69,7 +69,7 @@ func (f *blockFetcher) add(msg *blockMsg) {
                        "module":       logModule,
                        "block height": msg.block.Height,
                        "block hash":   blockHash.String(),
-               }).Debug("blockFetcher receive mine block")
+               }).Debug("blockFetcher receive propose block")
        }
 }
 
index 834662f..e2d7c9e 100644 (file)
@@ -14,8 +14,8 @@ import (
 
 //Consensus msg byte
 const (
-       BlockSignByte    = byte(0x10)
-       BlockProposeByte = byte(0x11)
+       BlockSignatureByte = byte(0x10)
+       BlockProposeByte   = byte(0x11)
 )
 
 //BlockchainMessage is a generic message for this reactor.
@@ -25,8 +25,8 @@ type ConsensusMessage interface {
 
 var _ = wire.RegisterInterface(
        struct{ ConsensusMessage }{},
-       wire.ConcreteType{&BlockSignMessage{}, BlockSignByte},
-       wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte},
+       wire.ConcreteType{&BlockSignatureMsg{}, BlockSignatureByte},
+       wire.ConcreteType{&BlockProposeMsg{}, BlockProposeByte},
 )
 
 //decodeMessage decode msg
@@ -41,71 +41,70 @@ func decodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
        return
 }
 
-type BlockSignMessage struct {
-       BlockID [32]byte
-       Height  uint64
-       Sign    []byte
-       Pubkey  []byte
+type BlockSignatureMsg struct {
+       BlockID   [32]byte
+       Height    uint64
+       Signature []byte
+       PeerID    [32]byte
 }
 
-//NewBlockSignMessage construct new mined block msg
-func NewBlockSignMessage(blockID [32]byte, height uint64, sign []byte, pubkey []byte) *BlockSignMessage {
-       return &BlockSignMessage{BlockID: blockID, Height: height, Sign: sign, Pubkey: pubkey}
+//NewBlockSignatureMessage construct new mined block msg
+func NewBlockSignatureMsg(blockID [32]byte, height uint64, signature []byte, peerId [32]byte) *BlockSignatureMsg {
+       return &BlockSignatureMsg{BlockID: blockID, Height: height, Signature: signature, PeerID: peerId}
 }
 
-func (bs *BlockSignMessage) String() string {
-       return fmt.Sprintf("{block_hash: %s,block_height: %d,sign:%s,pubkey:%s}", hex.EncodeToString(bs.BlockID[:]), bs.Height, hex.EncodeToString(bs.Sign), hex.EncodeToString(bs.Pubkey))
+func (bs *BlockSignatureMsg) String() string {
+       return fmt.Sprintf("{block_hash: %s,block_height: %d,signature:%s,peerID:%s}", hex.EncodeToString(bs.BlockID[:]), bs.Height, hex.EncodeToString(bs.Signature), hex.EncodeToString(bs.PeerID[:]))
 }
 
-type BlockSignBroadcastMsg struct {
-       sign      []byte
-       msg       *BlockSignMessage
+type SignatureBroadcastMsg struct {
+       signature []byte
+       msg       *BlockSignatureMsg
        transChan byte
 }
 
-func NewBlockSignBroadcastMsg(blockID [32]byte, height uint64, sign []byte, pubkey []byte, transChan byte) *BlockSignBroadcastMsg {
-       msg := NewBlockSignMessage(blockID, height, sign, pubkey)
-       return &BlockSignBroadcastMsg{sign: sign, msg: msg, transChan: transChan}
+func NewSignatureBroadcastMsg(blockID [32]byte, height uint64, signature []byte, pubkey [32]byte, transChan byte) *SignatureBroadcastMsg {
+       msg := NewBlockSignatureMsg(blockID, height, signature, pubkey)
+       return &SignatureBroadcastMsg{signature: signature, msg: msg, transChan: transChan}
 }
 
-func (m *BlockSignBroadcastMsg) GetChan() byte {
-       return m.transChan
+func (s *SignatureBroadcastMsg) GetChan() byte {
+       return s.transChan
 }
 
-func (m *BlockSignBroadcastMsg) GetMsg() interface{} {
-       return struct{ ConsensusMessage }{m.msg}
+func (s *SignatureBroadcastMsg) GetMsg() interface{} {
+       return struct{ ConsensusMessage }{s.msg}
 }
 
-func (m *BlockSignBroadcastMsg) MsgString() string {
-       return m.msg.String()
+func (s *SignatureBroadcastMsg) MsgString() string {
+       return s.msg.String()
 }
 
-func (m *BlockSignBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
+func (s *SignatureBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
        for _, peer := range peers {
-               ps.MarkBlockSign(peer, m.sign)
+               ps.MarkBlockSignature(peer, s.signature)
        }
 }
 
-func (m *BlockSignBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
-       //TODO: SPV NODE FILTER
-       return ps.PeersWithoutSign(m.sign)
+func (m *SignatureBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
+       return ps.PeersWithoutSign(m.signature)
 }
 
-type BlockProposeMessage struct {
+type BlockProposeMsg struct {
        RawBlock []byte
 }
 
-//NewBlockProposeMessage construct new mined block msg
-func NewBlockProposeMessage(block *types.Block) (*BlockProposeMessage, error) {
+//NewBlockProposeMsg construct new block propose msg
+func NewBlockProposeMsg(block *types.Block) (*BlockProposeMsg, error) {
        rawBlock, err := block.MarshalText()
        if err != nil {
                return nil, err
        }
-       return &BlockProposeMessage{RawBlock: rawBlock}, nil
+       return &BlockProposeMsg{RawBlock: rawBlock}, nil
 }
 
-//GetMineBlock get mine block from msg
-func (m *BlockProposeMessage) GetProposeBlock() (*types.Block, error) {
+//GetProposeBlock get propose block from msg
+func (m *BlockProposeMsg) GetProposeBlock() (*types.Block, error) {
        block := &types.Block{}
        if err := block.UnmarshalText(m.RawBlock); err != nil {
                return nil, err
@@ -113,7 +112,7 @@ func (m *BlockProposeMessage) GetProposeBlock() (*types.Block, error) {
        return block, nil
 }
 
-func (bp *BlockProposeMessage) String() string {
+func (bp *BlockProposeMsg) String() string {
        block, err := bp.GetProposeBlock()
        if err != nil {
                return "{err: wrong message}"
@@ -122,42 +121,41 @@ func (bp *BlockProposeMessage) String() string {
        return fmt.Sprintf("{block_height: %d, block_hash: %s}", block.Height, blockHash.String())
 }
 
-type BlockProposeBroadcastMsg struct {
+type ProposeBroadcastMsg struct {
        block     *types.Block
-       msg       *BlockProposeMessage
+       msg       *BlockProposeMsg
        transChan byte
 }
 
-func NewBlockProposeBroadcastMsg(block *types.Block, transChan byte) (*BlockProposeBroadcastMsg, error) {
-       msg, err := NewBlockProposeMessage(block)
+func NewBlockProposeBroadcastMsg(block *types.Block, transChan byte) (*ProposeBroadcastMsg, error) {
+       msg, err := NewBlockProposeMsg(block)
        if err != nil {
                return nil, err
        }
-       return &BlockProposeBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil
+       return &ProposeBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil
 }
 
-func (m *BlockProposeBroadcastMsg) GetChan() byte {
-       return m.transChan
+func (p *ProposeBroadcastMsg) GetChan() byte {
+       return p.transChan
 }
 
-func (m *BlockProposeBroadcastMsg) GetMsg() interface{} {
-       return struct{ ConsensusMessage }{m.msg}
+func (p *ProposeBroadcastMsg) GetMsg() interface{} {
+       return struct{ ConsensusMessage }{p.msg}
 }
 
-func (m *BlockProposeBroadcastMsg) MsgString() string {
-       return m.msg.String()
+func (p *ProposeBroadcastMsg) MsgString() string {
+       return p.msg.String()
 }
 
-func (m *BlockProposeBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
-       hash := m.block.Hash()
-       height := m.block.Height
+func (p *ProposeBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
+       hash := p.block.Hash()
+       height := p.block.Height
        for _, peer := range peers {
                ps.MarkBlock(peer, &hash)
                ps.MarkStatus(peer, height)
        }
 }
 
-func (m *BlockProposeBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
-       //TODO: SPV NODE FILTER
-       return ps.PeersWithoutBlock(m.block.Hash())
+func (p *ProposeBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
+       return ps.PeersWithoutBlock(p.block.Hash())
 }
index 60cd89a..6026e62 100644 (file)
@@ -6,14 +6,15 @@ import (
 
        "github.com/davecgh/go-spew/spew"
        "github.com/tendermint/go-wire"
+
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
 
 var _ = wire.RegisterInterface(
        struct{ ConsensusMessage }{},
-       wire.ConcreteType{&BlockSignMessage{}, BlockSignByte},
-       wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte},
+       wire.ConcreteType{&BlockSignatureMsg{}, BlockSignatureByte},
+       wire.ConcreteType{&BlockProposeMsg{}, BlockProposeByte},
 )
 
 func TestDecodeMessage(t *testing.T) {
@@ -22,16 +23,16 @@ func TestDecodeMessage(t *testing.T) {
                msgType byte
        }{
                {
-                       msg: &BlockSignMessage{
-                               BlockID: [32]byte{0x01},
-                               Height:  uint64(100),
-                               Sign:    []byte{0x00},
-                               Pubkey:  []byte{0x01},
+                       msg: &BlockSignatureMsg{
+                               BlockID:   [32]byte{0x01},
+                               Height:    uint64(100),
+                               Signature: []byte{0x00},
+                               PeerID:    [32]byte{0x01},
                        },
-                       msgType: BlockSignByte,
+                       msgType: BlockSignatureByte,
                },
                {
-                       msg: &BlockProposeMessage{
+                       msg: &BlockProposeMsg{
                                RawBlock: []byte{0x01, 0x02},
                        },
                        msgType: BlockProposeByte,
@@ -53,21 +54,21 @@ func TestDecodeMessage(t *testing.T) {
 }
 
 func TestBlockSignBroadcastMsg(t *testing.T) {
-       blockSignMsg := &BlockSignMessage{
-               BlockID: [32]byte{0x01},
-               Height:  uint64(100),
-               Sign:    []byte{0x00},
-               Pubkey:  []byte{0x01},
+       blockSignMsg := &BlockSignatureMsg{
+               BlockID:   [32]byte{0x01},
+               Height:    uint64(100),
+               Signature: []byte{0x00},
+               PeerID:    [32]byte{0x01},
        }
-       blockSignBroadcastMsg := NewBlockSignBroadcastMsg(blockSignMsg.BlockID, blockSignMsg.Height, blockSignMsg.Sign, blockSignMsg.Pubkey, ConsensusChannel)
+       blockSignBroadcastMsg := NewSignatureBroadcastMsg(blockSignMsg.BlockID, blockSignMsg.Height, blockSignMsg.Signature, blockSignMsg.PeerID, ConsensusChannel)
 
        binMsg := wire.BinaryBytes(blockSignBroadcastMsg.GetMsg())
        gotMsgType, gotMsg, err := decodeMessage(binMsg)
        if err != nil {
                t.Fatalf("decode Message err %s", err)
        }
-       if gotMsgType != BlockSignByte {
-               t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, BlockSignByte)
+       if gotMsgType != BlockSignatureByte {
+               t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, BlockSignatureByte)
        }
        if !reflect.DeepEqual(gotMsg, blockSignMsg) {
                t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockSignMsg))
@@ -75,7 +76,7 @@ func TestBlockSignBroadcastMsg(t *testing.T) {
 }
 
 func TestBlockProposeBroadcastMsg(t *testing.T) {
-       blockProposedmsg, _ := NewBlockProposeMessage(testBlock)
+       blockProposedmsg, _ := NewBlockProposeMsg(testBlock)
 
        BlockProposeBroadcastMsg, _ := NewBlockProposeBroadcastMsg(testBlock, ConsensusChannel)
 
@@ -104,8 +105,8 @@ var testBlock = &types.Block{
        },
 }
 
-func TestBlockProposeMessage(t *testing.T) {
-       blockMsg, err := NewBlockProposeMessage(testBlock)
+func TestBlockProposeMsg(t *testing.T) {
+       blockMsg, err := NewBlockProposeMsg(testBlock)
        if err != nil {
                t.Fatalf("create new mine block msg err:%s", err)
        }
@@ -136,21 +137,21 @@ func TestBlockProposeMessage(t *testing.T) {
        }
 }
 
-func TestBlockSignMessage(t *testing.T) {
-       msg := &BlockSignMessage{
-               BlockID: [32]byte{0x01},
-               Height:  uint64(100),
-               Sign:    []byte{0x00},
-               Pubkey:  []byte{0x01},
+func TestBlockSignatureMsg(t *testing.T) {
+       msg := &BlockSignatureMsg{
+               BlockID:   [32]byte{0x01},
+               Height:    uint64(100),
+               Signature: []byte{0x00},
+               PeerID:    [32]byte{0x01},
        }
 
-       gotMsg := NewBlockSignMessage(msg.BlockID, msg.Height, msg.Sign, msg.Pubkey)
+       gotMsg := NewBlockSignatureMsg(msg.BlockID, msg.Height, msg.Signature, msg.PeerID)
 
        if !reflect.DeepEqual(gotMsg, msg) {
-               t.Fatalf("test block sign message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg))
+               t.Fatalf("test block signature message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg))
        }
-       wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height: 100,sign:00,pubkey:01}"
+       wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height: 100,signature:00,peerID:0100000000000000000000000000000000000000000000000000000000000000}"
        if gotMsg.String() != wantString {
-               t.Fatalf("test block sign message err. got string:%s\n want string:%s", gotMsg.String(), wantString)
+               t.Fatalf("test block signature message err. got string:%s\n want string:%s", gotMsg.String(), wantString)
        }
 }
index 2df8958..9bcce9a 100644 (file)
@@ -8,6 +8,7 @@ import (
        "github.com/vapor/event"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
+       "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
 
@@ -17,21 +18,23 @@ type Manager struct {
        blockFetcher *blockFetcher
        chain        Chain
 
-       quit chan struct{}
+       eventDispatcher      *event.Dispatcher
+       blockProposeMsgSub   *event.Subscription
+       BlockSignatureMsgSub *event.Subscription
 
-       eventDispatcher  *event.Dispatcher
-       proposedBlockSub *event.Subscription
-       sendBlockSignSub *event.Subscription
+       quit chan struct{}
 }
 
 type Switch interface {
        AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
        AddBannedPeer(string) error
+       ID() [32]byte
 }
 
 // Chain is the interface for Bytom core
 type Chain interface {
        BestBlockHeight() uint64
+       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
        ProcessBlock(*types.Block) (bool, error)
 }
 
@@ -45,8 +48,8 @@ func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *pee
                sw:              sw,
                peers:           peers,
                blockFetcher:    newBlockFetcher(chain, peers),
-               quit:            make(chan struct{}),
                eventDispatcher: dispatcher,
+               quit:            make(chan struct{}),
        }
        protocolReactor := NewConsensusReactor(manager)
        manager.sw.AddReactor("CONSENSUS", protocolReactor)
@@ -66,21 +69,21 @@ func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage)
        log.WithFields(log.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
 
        switch msg := msg.(type) {
-       case *BlockProposeMessage:
+       case *BlockProposeMsg:
                m.handleBlockProposeMsg(peerID, msg)
 
-       case *BlockSignMessage:
-               m.handleBlockSigMsg(peerID, msg)
+       case *BlockSignatureMsg:
+               m.handleBlockSignatureMsg(peerID, msg)
 
        default:
                log.WithFields(log.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
        }
 }
 
-func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage) {
+func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
        block, err := msg.GetProposeBlock()
        if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
                return
        }
 
@@ -89,34 +92,36 @@ func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage)
        m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
 }
 
-func (m *Manager) handleBlockSigMsg(peerID string, msg *BlockSignMessage) {
-       if err := m.eventDispatcher.Post(event.BlockSignEvent{PeerID: []byte(peerID), BlockID: msg.BlockID, Height: msg.Height, Sign: msg.Sign, Pubkey: msg.Pubkey}); err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed post block sign event")
+func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
+       var id [32]byte
+       copy(id[:], peerID)
+       if err := m.eventDispatcher.Post(event.ReceivedBlockSignatureEvent{BlockID: msg.BlockID, Height: msg.Height, Signature: msg.Signature, PeerID: id}); err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on post block signature event")
        }
 }
 
-func (m *Manager) proposedBlockBroadcastLoop() {
+func (m *Manager) blockProposeMsgBroadcastLoop() {
        for {
                select {
-               case obj, ok := <-m.proposedBlockSub.Chan():
+               case obj, ok := <-m.blockProposeMsgSub.Chan():
                        if !ok {
-                               log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
+                               log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
                                return
                        }
 
-                       ev, ok := obj.Data.(event.NewProposedBlockEvent)
+                       ev, ok := obj.Data.(event.NewBlockProposeEvent)
                        if !ok {
                                log.WithFields(log.Fields{"module": logModule}).Error("event type error")
                                continue
                        }
 
-                       proposedMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
+                       proposeMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
                        if err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on create new propose block msg")
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeBroadcastMsg")
                                return
                        }
-                       if err := m.peers.BroadcastMsg(proposedMsg); err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
+                       if err := m.peers.BroadcastMsg(proposeMsg); err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
                                continue
                        }
 
@@ -126,24 +131,28 @@ func (m *Manager) proposedBlockBroadcastLoop() {
        }
 }
 
-func (m *Manager) blockSignBroadcastLoop() {
+func (m *Manager) blockSignatureMsgBroadcastLoop() {
        for {
                select {
-               case obj, ok := <-m.sendBlockSignSub.Chan():
+               case obj, ok := <-m.blockProposeMsgSub.Chan():
                        if !ok {
-                               log.WithFields(log.Fields{"module": logModule}).Warning("send block sign subscription channel closed")
+                               log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
                                return
                        }
 
-                       ev, ok := obj.Data.(event.SendBlockSignEvent)
+                       ev, ok := obj.Data.(event.BlockSignatureEvent)
                        if !ok {
                                log.WithFields(log.Fields{"module": logModule}).Error("event type error")
                                continue
                        }
-
-                       blockSignMsg := NewBlockSignBroadcastMsg(ev.BlockID, ev.Height, ev.Sign, ev.Pubkey, ConsensusChannel)
+                       blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
+                               return
+                       }
+                       blockSignMsg := NewSignatureBroadcastMsg(ev.BlockHash.Byte32(), blockHeader.Height, ev.Signature, m.sw.ID(), ConsensusChannel)
                        if err := m.peers.BroadcastMsg(blockSignMsg); err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed to broadcast block sign message.")
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
                                return
                        }
 
@@ -159,24 +168,24 @@ func (m *Manager) RemovePeer(peerID string) {
 
 func (m *Manager) Start() error {
        var err error
-       m.proposedBlockSub, err = m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
+       m.blockProposeMsgSub, err = m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
        if err != nil {
                return err
        }
 
-       m.sendBlockSignSub, err = m.eventDispatcher.Subscribe(event.SendBlockSignEvent{})
+       m.BlockSignatureMsgSub, err = m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
        if err != nil {
                return err
        }
 
-       go m.proposedBlockBroadcastLoop()
-       go m.blockSignBroadcastLoop()
+       go m.blockProposeMsgBroadcastLoop()
+       go m.blockSignatureMsgBroadcastLoop()
        return nil
 }
 
-//Stop stop sync manager
+//Stop consensus manager
 func (m *Manager) Stop() {
        close(m.quit)
-       m.proposedBlockSub.Unsubscribe()
-       m.sendBlockSignSub.Unsubscribe()
+       m.blockProposeMsgSub.Unsubscribe()
+       m.BlockSignatureMsgSub.Unsubscribe()
 }
index 464598e..8dbfb61 100644 (file)
@@ -59,7 +59,7 @@ func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
        cr.mgr.RemovePeer(peer.Key)
 }
 
-// Receive implements Reactor by handling 4 types of messages (look below).
+// Receive implements Reactor by handling messages.
 func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
        msgType, msg, err := decodeMessage(msgBytes)
        if err != nil {
index d1bb41e..f5e3c1c 100644 (file)
@@ -49,7 +49,7 @@ type Switch interface {
        Peers() *p2p.PeerSet
 }
 
-//SyncManager Sync Manager is responsible for the business layer information synchronization
+//ChainManager is responsible for the business layer information synchronization
 type ChainManager struct {
        sw          Switch
        chain       Chain
@@ -65,7 +65,7 @@ type ChainManager struct {
        txMsgSub        *event.Subscription
 }
 
-//NewSyncManager create a sync manager
+//NewChainManager create a chain sync manager.
 func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
        manager := &ChainManager{
                sw:              sw,
index 174c597..ee5068a 100644 (file)
@@ -20,7 +20,7 @@ import (
 
 const (
        maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
-       maxKnownSigns         = 1024  // Maximum block signs to keep in the known list (prevent DOS)
+       maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
        maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
        defaultBanThreshold   = uint32(100)
        maxFilterAddressSize  = 50
@@ -75,26 +75,26 @@ type PeerInfo struct {
 
 type Peer struct {
        BasePeer
-       mtx         sync.RWMutex
-       services    consensus.ServiceFlag
-       height      uint64
-       hash        *bc.Hash
-       banScore    trust.DynamicBanScore
-       knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
-       knownBlocks *set.Set // Set of block hashes known to be known by this peer
-       knownSigns  *set.Set // Set of block signs known to be known by this peer
-       knownStatus uint64   // Set of chain status known to be known by this peer
-       filterAdds  *set.Set // Set of addresses that the spv node cares about.
+       mtx             sync.RWMutex
+       services        consensus.ServiceFlag
+       height          uint64
+       hash            *bc.Hash
+       banScore        trust.DynamicBanScore
+       knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks     *set.Set // Set of block hashes known to be known by this peer
+       knownSignatures *set.Set // Set of block signatures known to be known by this peer
+       knownStatus     uint64   // Set of chain status known to be known by this peer
+       filterAdds      *set.Set // Set of addresses that the spv node cares about.
 }
 
 func newPeer(basePeer BasePeer) *Peer {
        return &Peer{
-               BasePeer:    basePeer,
-               services:    basePeer.ServiceFlag(),
-               knownTxs:    set.New(),
-               knownBlocks: set.New(),
-               knownSigns:  set.New(),
-               filterAdds:  set.New(),
+               BasePeer:        basePeer,
+               services:        basePeer.ServiceFlag(),
+               knownTxs:        set.New(),
+               knownBlocks:     set.New(),
+               knownSignatures: set.New(),
+               filterAdds:      set.New(),
        }
 }
 
@@ -247,14 +247,14 @@ func (p *Peer) markNewStatus(height uint64) {
        p.knownStatus = height
 }
 
-func (p *Peer) markSign(sign []byte) {
+func (p *Peer) markSign(signature []byte) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
 
-       for p.knownSigns.Size() >= maxKnownSigns {
-               p.knownSigns.Pop()
+       for p.knownSignatures.Size() >= maxKnownSignatures {
+               p.knownSignatures.Pop()
        }
-       p.knownSigns.Add(sign)
+       p.knownSignatures.Add(signature)
 }
 
 func (p *Peer) markTransaction(hash *bc.Hash) {
@@ -280,13 +280,13 @@ func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
        return peers
 }
 
-func (ps *PeerSet) PeersWithoutSign(sign []byte) []string {
+func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
 
        var peers []string
        for _, peer := range ps.peers {
-               if !peer.knownSigns.Has(sign) {
+               if !peer.knownSignatures.Has(signature) {
                        peers = append(peers, peer.ID())
                }
        }
@@ -570,12 +570,12 @@ func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
        peer.MarkBlock(hash)
 }
 
-func (ps *PeerSet) MarkBlockSign(peerID string, sign []byte) {
+func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
        peer := ps.GetPeer(peerID)
        if peer == nil {
                return
        }
-       peer.markSign(sign)
+       peer.markSign(signature)
 }
 
 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
index d5aed27..f0de238 100644 (file)
@@ -12,11 +12,6 @@ import (
        "github.com/vapor/p2p/connection"
 )
 
-var (
-       errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
-       errStatusRequest            = errors.New("Status request error")
-)
-
 //ProtocolReactor handles new coming protocol message.
 type ProtocolReactor struct {
        p2p.BaseReactor
index d2116b2..03c02de 100644 (file)
@@ -34,7 +34,7 @@ type SyncManager struct {
        peers        *peers.PeerSet
 }
 
-// CreateSyncManager create sync manager and set switch.
+// NewSyncManager create sync manager and set switch.
 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
        sw, err := p2p.NewSwitch(config)
        if err != nil {
index d70fa75..57b16cf 100644 (file)
@@ -293,6 +293,10 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
        return nil
 }
 
+func (sw *Switch) ID() [32]byte {
+       return sw.nodeInfo.PubKey
+}
+
 //IsDialing prevent duplicate dialing
 func (sw *Switch) IsDialing(addr *NetAddress) bool {
        return sw.dialing.Has(addr.IP.String())