OSDN Git Service

Fix fast sync pending when all request blocks timeout (#347)
[bytom/vapor.git] / netsync / chainmgr / protocol_reactor.go
1 package chainmgr
2
3 import (
4         "bytes"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/go-wire"
8
9         "github.com/vapor/errors"
10         msgs "github.com/vapor/netsync/messages"
11         "github.com/vapor/p2p"
12         "github.com/vapor/p2p/connection"
13 )
14
15 //ProtocolReactor handles new coming protocol message.
16 type ProtocolReactor struct {
17         p2p.BaseReactor
18
19         manager *Manager
20 }
21
22 // NewProtocolReactor returns the reactor of whole blockchain.
23 func NewProtocolReactor(manager *Manager) *ProtocolReactor {
24         pr := &ProtocolReactor{
25                 manager: manager,
26         }
27         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
28         return pr
29 }
30
31 // GetChannels implements Reactor
32 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
33         return []*connection.ChannelDescriptor{
34                 {
35                         ID:                msgs.BlockchainChannel,
36                         Priority:          5,
37                         SendQueueCapacity: 100,
38                 },
39         }
40 }
41
42 // OnStart implements BaseService
43 func (pr *ProtocolReactor) OnStart() error {
44         pr.BaseReactor.OnStart()
45         return nil
46 }
47
48 // OnStop implements BaseService
49 func (pr *ProtocolReactor) OnStop() {
50         pr.BaseReactor.OnStop()
51 }
52
53 // AddPeer implements Reactor by sending our state to peer.
54 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
55         pr.manager.AddPeer(peer)
56         if err := pr.manager.SendStatus(peer); err != nil {
57                 return err
58         }
59
60         pr.manager.syncMempool(peer.Key)
61         return nil
62 }
63
64 // RemovePeer implements Reactor by removing peer from the pool.
65 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
66         pr.manager.RemovePeer(peer.Key)
67 }
68
69 //decodeMessage decode msg
70 func decodeMessage(bz []byte) (msgType byte, msg msgs.BlockchainMessage, err error) {
71         msgType = bz[0]
72         n := int(0)
73         r := bytes.NewReader(bz)
74         msg = wire.ReadBinary(struct{ msgs.BlockchainMessage }{}, r, msgs.MaxBlockchainResponseSize, &n, &err).(struct{ msgs.BlockchainMessage }).BlockchainMessage
75         if err != nil && n != len(bz) {
76                 err = errors.New("DecodeMessage() had bytes left over")
77         }
78         return
79 }
80
81 // Receive implements Reactor by handling 4 types of messages (look below).
82 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
83         msgType, msg, err := decodeMessage(msgBytes)
84         if err != nil {
85                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
86                 return
87         }
88
89         pr.manager.processMsg(src, msgType, msg)
90 }