6 log "github.com/sirupsen/logrus"
7 "github.com/tendermint/go-wire"
9 "github.com/vapor/errors"
10 msgs "github.com/vapor/netsync/messages"
11 "github.com/vapor/p2p"
12 "github.com/vapor/p2p/connection"
15 // ProtocolReactor handles incoming protocol messages.
16 type ProtocolReactor struct {
22 // NewProtocolReactor returns a new ProtocolReactor. Its BaseReactor is
// produced by p2p.NewBaseReactor, called with "ProtocolReactor" and the new
// reactor itself.
23 func NewProtocolReactor(manager *Manager) *ProtocolReactor {
24 pr := &ProtocolReactor{
27 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
31 // GetChannels implements Reactor. The returned slice of channel descriptors
// declares msgs.BlockchainChannel; a SendQueueCapacity of 100 is set in the
// literal.
32 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
33 return []*connection.ChannelDescriptor{
35 ID: msgs.BlockchainChannel,
37 SendQueueCapacity: 100,
42 // OnStart implements BaseService by calling BaseReactor.OnStart.
// NOTE(review): the result of that call, if it has one, is not used here —
// confirm that ignoring it is intentional.
43 func (pr *ProtocolReactor) OnStart() error {
44 pr.BaseReactor.OnStart()
48 // OnStop implements BaseService by calling BaseReactor.OnStop.
49 func (pr *ProtocolReactor) OnStop() {
50 pr.BaseReactor.OnStop()
53 // AddPeer implements Reactor by sending our state to peer. It hands the
// peer to manager.AddPeer, sends it our status via manager.SendStatus (whose
// error is checked), and calls manager.syncMempool with the peer's key.
54 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
55 pr.manager.AddPeer(peer)
56 if err := pr.manager.SendStatus(peer); err != nil {
59 pr.manager.syncMempool(peer.Key)
63 // RemovePeer implements Reactor by passing the peer's key to
// manager.RemovePeer. The reason argument is not used by the call below.
64 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
65 pr.manager.RemovePeer(peer.Key)
68 // decodeMessage decodes a msgs.BlockchainMessage from bz with
// wire.ReadBinary, passing msgs.MaxBlockchainResponseSize to it, and returns
// the message together with msgType and any error.
69 func decodeMessage(bz []byte) (msgType byte, msg msgs.BlockchainMessage, err error) {
72 r := bytes.NewReader(bz)
73 msg = wire.ReadBinary(struct{ msgs.BlockchainMessage }{}, r, msgs.MaxBlockchainResponseSize, &n, &err).(struct{ msgs.BlockchainMessage }).BlockchainMessage
// NOTE(review): as written, err is overwritten only when decoding has already
// failed AND n != len(bz); a clean decode with n != len(bz) is not reported,
// despite the "bytes left over" message. The condition may have been meant to
// be err == nil — confirm before changing.
74 if err != nil && n != len(bz) {
75 err = errors.New("DecodeMessage() had bytes left over")
80 // Receive implements Reactor. It decodes msgBytes with decodeMessage, logs
// an error when decoding fails, and passes the source peer, message type and
// decoded message to manager.processMsg.
81 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
82 msgType, msg, err := decodeMessage(msgBytes)
84 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
88 pr.manager.processMsg(src, msgType, msg)