OSDN Git Service

d5aed27368ced368e8336208fa1f661bfdf46372
[bytom/vapor.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "bytes"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/go-wire"
8
9         "github.com/vapor/errors"
10         msgs "github.com/vapor/netsync/messages"
11         "github.com/vapor/p2p"
12         "github.com/vapor/p2p/connection"
13 )
14
15 var (
16         errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
17         errStatusRequest            = errors.New("Status request error")
18 )
19
20 //ProtocolReactor handles new coming protocol message.
21 type ProtocolReactor struct {
22         p2p.BaseReactor
23
24         cm *ChainManager
25 }
26
27 // NewProtocolReactor returns the reactor of whole blockchain.
28 func NewProtocolReactor(cm *ChainManager) *ProtocolReactor {
29         pr := &ProtocolReactor{
30                 cm: cm,
31         }
32         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
33         return pr
34 }
35
36 // GetChannels implements Reactor
37 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
38         return []*connection.ChannelDescriptor{
39                 {
40                         ID:                msgs.BlockchainChannel,
41                         Priority:          5,
42                         SendQueueCapacity: 100,
43                 },
44         }
45 }
46
47 // OnStart implements BaseService
48 func (pr *ProtocolReactor) OnStart() error {
49         pr.BaseReactor.OnStart()
50         return nil
51 }
52
53 // OnStop implements BaseService
54 func (pr *ProtocolReactor) OnStop() {
55         pr.BaseReactor.OnStop()
56 }
57
58 // AddPeer implements Reactor by sending our state to peer.
59 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
60         pr.cm.AddPeer(peer)
61         if err := pr.cm.SendStatus(peer); err != nil {
62                 return err
63         }
64         pr.cm.syncTransactions(peer.Key)
65         return nil
66 }
67
68 // RemovePeer implements Reactor by removing peer from the pool.
69 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
70         pr.cm.RemovePeer(peer.Key)
71 }
72
73 //decodeMessage decode msg
74 func decodeMessage(bz []byte) (msgType byte, msg msgs.BlockchainMessage, err error) {
75         msgType = bz[0]
76         n := int(0)
77         r := bytes.NewReader(bz)
78         msg = wire.ReadBinary(struct{ msgs.BlockchainMessage }{}, r, msgs.MaxBlockchainResponseSize, &n, &err).(struct{ msgs.BlockchainMessage }).BlockchainMessage
79         if err != nil && n != len(bz) {
80                 err = errors.New("DecodeMessage() had bytes left over")
81         }
82         return
83 }
84
85 // Receive implements Reactor by handling 4 types of messages (look below).
86 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
87         msgType, msg, err := decodeMessage(msgBytes)
88         if err != nil {
89                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
90                 return
91         }
92
93         pr.cm.processMsg(src, msgType, msg)
94 }