OSDN Git Service

feat: add cross-chain output (#56)
[bytom/vapor.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/errors"
9         "github.com/vapor/p2p"
10         "github.com/vapor/p2p/connection"
11 )
12
// Timing parameters for the protocol handshake performed in AddPeer.
const (
	// handshakeCheckPerid is the interval between successive checks of the
	// peer set while AddPeer waits for the new peer to be registered.
	handshakeCheckPerid = 500 * time.Millisecond

	// handshakeTimeout is the longest AddPeer waits for the new peer to be
	// registered before it gives up with errProtocolHandshakeTimeout.
	handshakeTimeout = 10 * time.Second
)
17
// Errors returned by AddPeer.
var (
	// errStatusRequest is returned when TrySend of the status request to the
	// new peer reports failure.
	errStatusRequest = errors.New("Status request error")

	// errProtocolHandshakeTimeout is returned when the new peer has not
	// appeared in the peer set within handshakeTimeout.
	errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
)
22
23 //ProtocolReactor handles new coming protocol message.
24 type ProtocolReactor struct {
25         p2p.BaseReactor
26
27         sm    *SyncManager
28         peers *peerSet
29 }
30
31 // NewProtocolReactor returns the reactor of whole blockchain.
32 func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
33         pr := &ProtocolReactor{
34                 sm:    sm,
35                 peers: peers,
36         }
37         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
38         return pr
39 }
40
41 // GetChannels implements Reactor
42 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
43         return []*connection.ChannelDescriptor{
44                 {
45                         ID:                BlockchainChannel,
46                         Priority:          5,
47                         SendQueueCapacity: 100,
48                 },
49         }
50 }
51
52 // OnStart implements BaseService
53 func (pr *ProtocolReactor) OnStart() error {
54         pr.BaseReactor.OnStart()
55         return nil
56 }
57
58 // OnStop implements BaseService
59 func (pr *ProtocolReactor) OnStop() {
60         pr.BaseReactor.OnStop()
61 }
62
63 // AddPeer implements Reactor by sending our state to peer.
64 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
65         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
66                 return errStatusRequest
67         }
68
69         checkTicker := time.NewTicker(handshakeCheckPerid)
70         defer checkTicker.Stop()
71         timeout := time.NewTimer(handshakeTimeout)
72         defer timeout.Stop()
73         for {
74                 select {
75                 case <-checkTicker.C:
76                         if exist := pr.peers.getPeer(peer.Key); exist != nil {
77                                 pr.sm.syncTransactions(peer.Key)
78                                 return nil
79                         }
80
81                 case <-timeout.C:
82                         return errProtocolHandshakeTimeout
83                 }
84         }
85 }
86
87 // RemovePeer implements Reactor by removing peer from the pool.
88 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
89         pr.peers.removePeer(peer.Key)
90 }
91
92 // Receive implements Reactor by handling 4 types of messages (look below).
93 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
94         msgType, msg, err := DecodeMessage(msgBytes)
95         if err != nil {
96                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
97                 return
98         }
99
100         pr.sm.processMsg(src, msgType, msg)
101 }