Signature []byte
}
-//NewProposedBlockEvent the proposed block event which needs to broadcast.
-type NewProposedBlockEvent struct{ Block types.Block }
-
-//BlockSignEvent the signature which got from net.
-type BlockSignEvent struct {
- PeerID []byte
- BlockID [32]byte
- Height uint64
- Sign []byte
- Pubkey []byte
-}
-
-//SendBlockSignEvent the signature event which needs to broadcast.
-type SendBlockSignEvent struct {
- BlockID [32]byte
- Height uint64
- Sign []byte
- Pubkey []byte
+//NewBlockProposeEvent block propose event which needs to broadcast.
+type NewBlockProposeEvent struct{ Block types.Block }
+
+//ReceivedBlockSignatureEvent block signature event which received from net.
+type ReceivedBlockSignatureEvent struct {
+ PeerID [32]byte
+ BlockID [32]byte
+ Height uint64
+ Signature []byte
}
// TypeMuxEvent is a time-tagged notification pushed to subscribers.
},
{
checkPoints: []consensus.Checkpoint{
- {10000, bc.Hash{V0: 1}},
+ {Height: 10000, Hash: bc.Hash{V0: 1}},
},
bestHeight: 5000,
- want: &consensus.Checkpoint{10000, bc.Hash{V0: 1}},
+ want: &consensus.Checkpoint{Height: 10000, Hash: bc.Hash{V0: 1}},
},
{
checkPoints: []consensus.Checkpoint{
- {10000, bc.Hash{V0: 1}},
- {20000, bc.Hash{V0: 2}},
- {30000, bc.Hash{V0: 3}},
+ {Height: 10000, Hash: bc.Hash{V0: 1}},
+ {Height: 20000, Hash: bc.Hash{V0: 2}},
+ {Height: 30000, Hash: bc.Hash{V0: 3}},
},
bestHeight: 15000,
- want: &consensus.Checkpoint{20000, bc.Hash{V0: 2}},
+ want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}},
},
{
checkPoints: []consensus.Checkpoint{
- {10000, bc.Hash{V0: 1}},
- {20000, bc.Hash{V0: 2}},
- {30000, bc.Hash{V0: 3}},
+ {Height: 10000, Hash: bc.Hash{V0: 1}},
+ {Height: 20000, Hash: bc.Hash{V0: 2}},
+ {Height: 30000, Hash: bc.Hash{V0: 3}},
},
bestHeight: 10000,
- want: &consensus.Checkpoint{20000, bc.Hash{V0: 2}},
+ want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}},
},
{
checkPoints: []consensus.Checkpoint{
- {10000, bc.Hash{V0: 1}},
- {20000, bc.Hash{V0: 2}},
- {30000, bc.Hash{V0: 3}},
+ {Height: 10000, Hash: bc.Hash{V0: 1}},
+ {Height: 20000, Hash: bc.Hash{V0: 2}},
+ {Height: 30000, Hash: bc.Hash{V0: 3}},
},
bestHeight: 35000,
want: nil,
msgSet map[bc.Hash]*blockMsg
}
-//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
+//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
f := &blockFetcher{
chain: chain,
"module": logModule,
"block height": msg.block.Height,
"block hash": blockHash.String(),
- }).Debug("blockFetcher receive mine block")
+ }).Debug("blockFetcher receive propose block")
}
}
//Consensus msg byte
const (
- BlockSignByte = byte(0x10)
- BlockProposeByte = byte(0x11)
+ BlockSignatureByte = byte(0x10)
+ BlockProposeByte = byte(0x11)
)
//BlockchainMessage is a generic message for this reactor.
var _ = wire.RegisterInterface(
struct{ ConsensusMessage }{},
- wire.ConcreteType{&BlockSignMessage{}, BlockSignByte},
- wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte},
+ wire.ConcreteType{&BlockSignatureMsg{}, BlockSignatureByte},
+ wire.ConcreteType{&BlockProposeMsg{}, BlockProposeByte},
)
//decodeMessage decode msg
return
}
-type BlockSignMessage struct {
- BlockID [32]byte
- Height uint64
- Sign []byte
- Pubkey []byte
+type BlockSignatureMsg struct {
+ BlockID [32]byte
+ Height uint64
+ Signature []byte
+ PeerID [32]byte
}
-//NewBlockSignMessage construct new mined block msg
-func NewBlockSignMessage(blockID [32]byte, height uint64, sign []byte, pubkey []byte) *BlockSignMessage {
- return &BlockSignMessage{BlockID: blockID, Height: height, Sign: sign, Pubkey: pubkey}
+//NewBlockSignatureMessage construct new mined block msg
+func NewBlockSignatureMsg(blockID [32]byte, height uint64, signature []byte, peerId [32]byte) *BlockSignatureMsg {
+ return &BlockSignatureMsg{BlockID: blockID, Height: height, Signature: signature, PeerID: peerId}
}
-func (bs *BlockSignMessage) String() string {
- return fmt.Sprintf("{block_hash: %s,block_height: %d,sign:%s,pubkey:%s}", hex.EncodeToString(bs.BlockID[:]), bs.Height, hex.EncodeToString(bs.Sign), hex.EncodeToString(bs.Pubkey))
+func (bs *BlockSignatureMsg) String() string {
+ return fmt.Sprintf("{block_hash: %s,block_height: %d,signature:%s,peerID:%s}", hex.EncodeToString(bs.BlockID[:]), bs.Height, hex.EncodeToString(bs.Signature), hex.EncodeToString(bs.PeerID[:]))
}
-type BlockSignBroadcastMsg struct {
- sign []byte
- msg *BlockSignMessage
+type SignatureBroadcastMsg struct {
+ signature []byte
+ msg *BlockSignatureMsg
transChan byte
}
-func NewBlockSignBroadcastMsg(blockID [32]byte, height uint64, sign []byte, pubkey []byte, transChan byte) *BlockSignBroadcastMsg {
- msg := NewBlockSignMessage(blockID, height, sign, pubkey)
- return &BlockSignBroadcastMsg{sign: sign, msg: msg, transChan: transChan}
+func NewSignatureBroadcastMsg(blockID [32]byte, height uint64, signature []byte, pubkey [32]byte, transChan byte) *SignatureBroadcastMsg {
+ msg := NewBlockSignatureMsg(blockID, height, signature, pubkey)
+ return &SignatureBroadcastMsg{signature: signature, msg: msg, transChan: transChan}
}
-func (m *BlockSignBroadcastMsg) GetChan() byte {
- return m.transChan
+func (s *SignatureBroadcastMsg) GetChan() byte {
+ return s.transChan
}
-func (m *BlockSignBroadcastMsg) GetMsg() interface{} {
- return struct{ ConsensusMessage }{m.msg}
+func (s *SignatureBroadcastMsg) GetMsg() interface{} {
+ return struct{ ConsensusMessage }{s.msg}
}
-func (m *BlockSignBroadcastMsg) MsgString() string {
- return m.msg.String()
+func (s *SignatureBroadcastMsg) MsgString() string {
+ return s.msg.String()
}
-func (m *BlockSignBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
+func (s *SignatureBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
for _, peer := range peers {
- ps.MarkBlockSign(peer, m.sign)
+ ps.MarkBlockSignature(peer, s.signature)
}
}
-func (m *BlockSignBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
- //TODO: SPV NODE FILTER
- return ps.PeersWithoutSign(m.sign)
+func (m *SignatureBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
+ return ps.PeersWithoutSign(m.signature)
}
-type BlockProposeMessage struct {
+type BlockProposeMsg struct {
RawBlock []byte
}
-//NewBlockProposeMessage construct new mined block msg
-func NewBlockProposeMessage(block *types.Block) (*BlockProposeMessage, error) {
+//NewBlockProposeMsg construct new block propose msg
+func NewBlockProposeMsg(block *types.Block) (*BlockProposeMsg, error) {
rawBlock, err := block.MarshalText()
if err != nil {
return nil, err
}
- return &BlockProposeMessage{RawBlock: rawBlock}, nil
+ return &BlockProposeMsg{RawBlock: rawBlock}, nil
}
-//GetMineBlock get mine block from msg
-func (m *BlockProposeMessage) GetProposeBlock() (*types.Block, error) {
+//GetProposeBlock get propose block from msg
+func (m *BlockProposeMsg) GetProposeBlock() (*types.Block, error) {
block := &types.Block{}
if err := block.UnmarshalText(m.RawBlock); err != nil {
return nil, err
return block, nil
}
-func (bp *BlockProposeMessage) String() string {
+func (bp *BlockProposeMsg) String() string {
block, err := bp.GetProposeBlock()
if err != nil {
return "{err: wrong message}"
return fmt.Sprintf("{block_height: %d, block_hash: %s}", block.Height, blockHash.String())
}
-type BlockProposeBroadcastMsg struct {
+type ProposeBroadcastMsg struct {
block *types.Block
- msg *BlockProposeMessage
+ msg *BlockProposeMsg
transChan byte
}
-func NewBlockProposeBroadcastMsg(block *types.Block, transChan byte) (*BlockProposeBroadcastMsg, error) {
- msg, err := NewBlockProposeMessage(block)
+func NewBlockProposeBroadcastMsg(block *types.Block, transChan byte) (*ProposeBroadcastMsg, error) {
+ msg, err := NewBlockProposeMsg(block)
if err != nil {
return nil, err
}
- return &BlockProposeBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil
+ return &ProposeBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil
}
-func (m *BlockProposeBroadcastMsg) GetChan() byte {
- return m.transChan
+func (p *ProposeBroadcastMsg) GetChan() byte {
+ return p.transChan
}
-func (m *BlockProposeBroadcastMsg) GetMsg() interface{} {
- return struct{ ConsensusMessage }{m.msg}
+func (p *ProposeBroadcastMsg) GetMsg() interface{} {
+ return struct{ ConsensusMessage }{p.msg}
}
-func (m *BlockProposeBroadcastMsg) MsgString() string {
- return m.msg.String()
+func (p *ProposeBroadcastMsg) MsgString() string {
+ return p.msg.String()
}
-func (m *BlockProposeBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
- hash := m.block.Hash()
- height := m.block.Height
+func (p *ProposeBroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
+ hash := p.block.Hash()
+ height := p.block.Height
for _, peer := range peers {
ps.MarkBlock(peer, &hash)
ps.MarkStatus(peer, height)
}
}
-func (m *BlockProposeBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
- //TODO: SPV NODE FILTER
- return ps.PeersWithoutBlock(m.block.Hash())
+func (p *ProposeBroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
+ return ps.PeersWithoutBlock(p.block.Hash())
}
"github.com/davecgh/go-spew/spew"
"github.com/tendermint/go-wire"
+
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
)
var _ = wire.RegisterInterface(
struct{ ConsensusMessage }{},
- wire.ConcreteType{&BlockSignMessage{}, BlockSignByte},
- wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte},
+ wire.ConcreteType{&BlockSignatureMsg{}, BlockSignatureByte},
+ wire.ConcreteType{&BlockProposeMsg{}, BlockProposeByte},
)
func TestDecodeMessage(t *testing.T) {
msgType byte
}{
{
- msg: &BlockSignMessage{
- BlockID: [32]byte{0x01},
- Height: uint64(100),
- Sign: []byte{0x00},
- Pubkey: []byte{0x01},
+ msg: &BlockSignatureMsg{
+ BlockID: [32]byte{0x01},
+ Height: uint64(100),
+ Signature: []byte{0x00},
+ PeerID: [32]byte{0x01},
},
- msgType: BlockSignByte,
+ msgType: BlockSignatureByte,
},
{
- msg: &BlockProposeMessage{
+ msg: &BlockProposeMsg{
RawBlock: []byte{0x01, 0x02},
},
msgType: BlockProposeByte,
}
func TestBlockSignBroadcastMsg(t *testing.T) {
- blockSignMsg := &BlockSignMessage{
- BlockID: [32]byte{0x01},
- Height: uint64(100),
- Sign: []byte{0x00},
- Pubkey: []byte{0x01},
+ blockSignMsg := &BlockSignatureMsg{
+ BlockID: [32]byte{0x01},
+ Height: uint64(100),
+ Signature: []byte{0x00},
+ PeerID: [32]byte{0x01},
}
- blockSignBroadcastMsg := NewBlockSignBroadcastMsg(blockSignMsg.BlockID, blockSignMsg.Height, blockSignMsg.Sign, blockSignMsg.Pubkey, ConsensusChannel)
+ blockSignBroadcastMsg := NewSignatureBroadcastMsg(blockSignMsg.BlockID, blockSignMsg.Height, blockSignMsg.Signature, blockSignMsg.PeerID, ConsensusChannel)
binMsg := wire.BinaryBytes(blockSignBroadcastMsg.GetMsg())
gotMsgType, gotMsg, err := decodeMessage(binMsg)
if err != nil {
t.Fatalf("decode Message err %s", err)
}
- if gotMsgType != BlockSignByte {
- t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, BlockSignByte)
+ if gotMsgType != BlockSignatureByte {
+ t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, BlockSignatureByte)
}
if !reflect.DeepEqual(gotMsg, blockSignMsg) {
t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockSignMsg))
}
func TestBlockProposeBroadcastMsg(t *testing.T) {
- blockProposedmsg, _ := NewBlockProposeMessage(testBlock)
+ blockProposedmsg, _ := NewBlockProposeMsg(testBlock)
BlockProposeBroadcastMsg, _ := NewBlockProposeBroadcastMsg(testBlock, ConsensusChannel)
},
}
-func TestBlockProposeMessage(t *testing.T) {
- blockMsg, err := NewBlockProposeMessage(testBlock)
+func TestBlockProposeMsg(t *testing.T) {
+ blockMsg, err := NewBlockProposeMsg(testBlock)
if err != nil {
t.Fatalf("create new mine block msg err:%s", err)
}
}
}
-func TestBlockSignMessage(t *testing.T) {
- msg := &BlockSignMessage{
- BlockID: [32]byte{0x01},
- Height: uint64(100),
- Sign: []byte{0x00},
- Pubkey: []byte{0x01},
+func TestBlockSignatureMsg(t *testing.T) {
+ msg := &BlockSignatureMsg{
+ BlockID: [32]byte{0x01},
+ Height: uint64(100),
+ Signature: []byte{0x00},
+ PeerID: [32]byte{0x01},
}
- gotMsg := NewBlockSignMessage(msg.BlockID, msg.Height, msg.Sign, msg.Pubkey)
+ gotMsg := NewBlockSignatureMsg(msg.BlockID, msg.Height, msg.Signature, msg.PeerID)
if !reflect.DeepEqual(gotMsg, msg) {
- t.Fatalf("test block sign message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg))
+ t.Fatalf("test block signature message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg))
}
- wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height: 100,sign:00,pubkey:01}"
+ wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height: 100,signature:00,peerID:0100000000000000000000000000000000000000000000000000000000000000}"
if gotMsg.String() != wantString {
- t.Fatalf("test block sign message err. got string:%s\n want string:%s", gotMsg.String(), wantString)
+ t.Fatalf("test block signature message err. got string:%s\n want string:%s", gotMsg.String(), wantString)
}
}
"github.com/vapor/event"
"github.com/vapor/netsync/peers"
"github.com/vapor/p2p"
+ "github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
)
blockFetcher *blockFetcher
chain Chain
- quit chan struct{}
+ eventDispatcher *event.Dispatcher
+ blockProposeMsgSub *event.Subscription
+ BlockSignatureMsgSub *event.Subscription
- eventDispatcher *event.Dispatcher
- proposedBlockSub *event.Subscription
- sendBlockSignSub *event.Subscription
+ quit chan struct{}
}
type Switch interface {
AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
AddBannedPeer(string) error
+ ID() [32]byte
}
// Chain is the interface for Bytom core
type Chain interface {
BestBlockHeight() uint64
+ GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
ProcessBlock(*types.Block) (bool, error)
}
sw: sw,
peers: peers,
blockFetcher: newBlockFetcher(chain, peers),
- quit: make(chan struct{}),
eventDispatcher: dispatcher,
+ quit: make(chan struct{}),
}
protocolReactor := NewConsensusReactor(manager)
manager.sw.AddReactor("CONSENSUS", protocolReactor)
log.WithFields(log.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
switch msg := msg.(type) {
- case *BlockProposeMessage:
+ case *BlockProposeMsg:
m.handleBlockProposeMsg(peerID, msg)
- case *BlockSignMessage:
- m.handleBlockSigMsg(peerID, msg)
+ case *BlockSignatureMsg:
+ m.handleBlockSignatureMsg(peerID, msg)
default:
log.WithFields(log.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
}
}
-func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMessage) {
+func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
block, err := msg.GetProposeBlock()
if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
return
}
m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
}
-func (m *Manager) handleBlockSigMsg(peerID string, msg *BlockSignMessage) {
- if err := m.eventDispatcher.Post(event.BlockSignEvent{PeerID: []byte(peerID), BlockID: msg.BlockID, Height: msg.Height, Sign: msg.Sign, Pubkey: msg.Pubkey}); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed post block sign event")
+func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
+ var id [32]byte
+ copy(id[:], peerID)
+ if err := m.eventDispatcher.Post(event.ReceivedBlockSignatureEvent{BlockID: msg.BlockID, Height: msg.Height, Signature: msg.Signature, PeerID: id}); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on post block signature event")
}
}
-func (m *Manager) proposedBlockBroadcastLoop() {
+func (m *Manager) blockProposeMsgBroadcastLoop() {
for {
select {
- case obj, ok := <-m.proposedBlockSub.Chan():
+ case obj, ok := <-m.blockProposeMsgSub.Chan():
if !ok {
- log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
+ log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
return
}
- ev, ok := obj.Data.(event.NewProposedBlockEvent)
+ ev, ok := obj.Data.(event.NewBlockProposeEvent)
if !ok {
log.WithFields(log.Fields{"module": logModule}).Error("event type error")
continue
}
- proposedMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
+ proposeMsg, err := NewBlockProposeBroadcastMsg(&ev.Block, ConsensusChannel)
if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on create new propose block msg")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeBroadcastMsg")
return
}
- if err := m.peers.BroadcastMsg(proposedMsg); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
+ if err := m.peers.BroadcastMsg(proposeMsg); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
continue
}
}
}
-func (m *Manager) blockSignBroadcastLoop() {
+func (m *Manager) blockSignatureMsgBroadcastLoop() {
for {
select {
- case obj, ok := <-m.sendBlockSignSub.Chan():
+ case obj, ok := <-m.blockProposeMsgSub.Chan():
if !ok {
- log.WithFields(log.Fields{"module": logModule}).Warning("send block sign subscription channel closed")
+ log.WithFields(log.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
return
}
- ev, ok := obj.Data.(event.SendBlockSignEvent)
+ ev, ok := obj.Data.(event.BlockSignatureEvent)
if !ok {
log.WithFields(log.Fields{"module": logModule}).Error("event type error")
continue
}
-
- blockSignMsg := NewBlockSignBroadcastMsg(ev.BlockID, ev.Height, ev.Sign, ev.Pubkey, ConsensusChannel)
+ blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
+ return
+ }
+ blockSignMsg := NewSignatureBroadcastMsg(ev.BlockHash.Byte32(), blockHeader.Height, ev.Signature, m.sw.ID(), ConsensusChannel)
if err := m.peers.BroadcastMsg(blockSignMsg); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed to broadcast block sign message.")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
return
}
func (m *Manager) Start() error {
var err error
- m.proposedBlockSub, err = m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
+ m.blockProposeMsgSub, err = m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
if err != nil {
return err
}
- m.sendBlockSignSub, err = m.eventDispatcher.Subscribe(event.SendBlockSignEvent{})
+ m.BlockSignatureMsgSub, err = m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
if err != nil {
return err
}
- go m.proposedBlockBroadcastLoop()
- go m.blockSignBroadcastLoop()
+ go m.blockProposeMsgBroadcastLoop()
+ go m.blockSignatureMsgBroadcastLoop()
return nil
}
-//Stop stop sync manager
+//Stop consensus manager
func (m *Manager) Stop() {
close(m.quit)
- m.proposedBlockSub.Unsubscribe()
- m.sendBlockSignSub.Unsubscribe()
+ m.blockProposeMsgSub.Unsubscribe()
+ m.BlockSignatureMsgSub.Unsubscribe()
}
cr.mgr.RemovePeer(peer.Key)
}
-// Receive implements Reactor by handling 4 types of messages (look below).
+// Receive implements Reactor by handling messages.
func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
msgType, msg, err := decodeMessage(msgBytes)
if err != nil {
Peers() *p2p.PeerSet
}
-//SyncManager Sync Manager is responsible for the business layer information synchronization
+//ChainManager is responsible for the business layer information synchronization
type ChainManager struct {
sw Switch
chain Chain
txMsgSub *event.Subscription
}
-//NewSyncManager create a sync manager
+//NewChainManager create a chain sync manager.
func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
manager := &ChainManager{
sw: sw,
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
- maxKnownSigns = 1024 // Maximum block signs to keep in the known list (prevent DOS)
+ maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
defaultBanThreshold = uint32(100)
maxFilterAddressSize = 50
type Peer struct {
BasePeer
- mtx sync.RWMutex
- services consensus.ServiceFlag
- height uint64
- hash *bc.Hash
- banScore trust.DynamicBanScore
- knownTxs *set.Set // Set of transaction hashes known to be known by this peer
- knownBlocks *set.Set // Set of block hashes known to be known by this peer
- knownSigns *set.Set // Set of block signs known to be known by this peer
- knownStatus uint64 // Set of chain status known to be known by this peer
- filterAdds *set.Set // Set of addresses that the spv node cares about.
+ mtx sync.RWMutex
+ services consensus.ServiceFlag
+ height uint64
+ hash *bc.Hash
+ banScore trust.DynamicBanScore
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ knownSignatures *set.Set // Set of block signatures known to be known by this peer
+ knownStatus uint64 // Set of chain status known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
}
func newPeer(basePeer BasePeer) *Peer {
return &Peer{
- BasePeer: basePeer,
- services: basePeer.ServiceFlag(),
- knownTxs: set.New(),
- knownBlocks: set.New(),
- knownSigns: set.New(),
- filterAdds: set.New(),
+ BasePeer: basePeer,
+ services: basePeer.ServiceFlag(),
+ knownTxs: set.New(),
+ knownBlocks: set.New(),
+ knownSignatures: set.New(),
+ filterAdds: set.New(),
}
}
p.knownStatus = height
}
-func (p *Peer) markSign(sign []byte) {
+func (p *Peer) markSign(signature []byte) {
p.mtx.Lock()
defer p.mtx.Unlock()
- for p.knownSigns.Size() >= maxKnownSigns {
- p.knownSigns.Pop()
+ for p.knownSignatures.Size() >= maxKnownSignatures {
+ p.knownSignatures.Pop()
}
- p.knownSigns.Add(sign)
+ p.knownSignatures.Add(signature)
}
func (p *Peer) markTransaction(hash *bc.Hash) {
return peers
}
-func (ps *PeerSet) PeersWithoutSign(sign []byte) []string {
+func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
var peers []string
for _, peer := range ps.peers {
- if !peer.knownSigns.Has(sign) {
+ if !peer.knownSignatures.Has(signature) {
peers = append(peers, peer.ID())
}
}
peer.MarkBlock(hash)
}
-func (ps *PeerSet) MarkBlockSign(peerID string, sign []byte) {
+func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
peer := ps.GetPeer(peerID)
if peer == nil {
return
}
- peer.markSign(sign)
+ peer.markSign(signature)
}
func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
"github.com/vapor/p2p/connection"
)
-var (
- errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
- errStatusRequest = errors.New("Status request error")
-)
-
//ProtocolReactor handles new coming protocol message.
type ProtocolReactor struct {
p2p.BaseReactor
peers *peers.PeerSet
}
-// CreateSyncManager create sync manager and set switch.
+// NewSyncManager create sync manager and set switch.
func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
sw, err := p2p.NewSwitch(config)
if err != nil {
return nil
}
+func (sw *Switch) ID() [32]byte {
+ return sw.nodeInfo.PubKey
+}
+
//IsDialing prevent duplicate dialing
func (sw *Switch) IsDialing(addr *NetAddress) bool {
return sw.dialing.Has(addr.IP.String())