6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/errors"
10 "github.com/bytom/p2p/connection"
14 handshakeTimeout = 10 * time.Second
15 handshakeCheckPerid = 500 * time.Millisecond
19 errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
20 errStatusRequest = errors.New("Status request error")
23 //ProtocolReactor handles new coming protocol message.
24 type ProtocolReactor struct {
31 // NewProtocolReactor returns the reactor of whole blockchain.
32 func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
33 pr := &ProtocolReactor{
37 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
41 // GetChannels implements Reactor
42 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
43 return []*connection.ChannelDescriptor{
44 &connection.ChannelDescriptor{
45 ID: BlockchainChannel,
47 SendQueueCapacity: 100,
52 // OnStart implements BaseService
53 func (pr *ProtocolReactor) OnStart() error {
54 pr.BaseReactor.OnStart()
58 // OnStop implements BaseService
59 func (pr *ProtocolReactor) OnStop() {
60 pr.BaseReactor.OnStop()
63 // AddPeer implements Reactor by sending our state to peer.
64 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
65 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
66 return errStatusRequest
69 checkTicker := time.NewTimer(handshakeCheckPerid)
70 timeoutTicker := time.NewTimer(handshakeTimeout)
74 if exist := pr.peers.getPeer(peer.Key); exist != nil {
75 pr.sm.syncTransactions(peer.Key)
79 case <-timeoutTicker.C:
80 return errProtocolHandshakeTimeout
85 // RemovePeer implements Reactor by removing peer from the pool.
86 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
87 pr.peers.removePeer(peer.Key)
90 // Receive implements Reactor by handling 4 types of messages (look below).
91 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
92 msgType, msg, err := DecodeMessage(msgBytes)
94 log.WithField("err", err).Errorf("fail on reactor decoding message")
98 pr.sm.processMsg(src, msgType, msg)