OSDN Git Service

improve net sync test cases (#375)
[bytom/vapor.git] / netsync / consensusmgr / handle.go
1 package consensusmgr
2
3 import (
4         "reflect"
5
6         "github.com/sirupsen/logrus"
7
8         "github.com/vapor/event"
9         "github.com/vapor/netsync/peers"
10         "github.com/vapor/p2p"
11         "github.com/vapor/p2p/security"
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 // Switch is the interface for p2p switch.
17 type Switch interface {
18         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
19 }
20
21 // Chain is the interface for Bytom core.
22 type Chain interface {
23         BestBlockHeight() uint64
24         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
25         ProcessBlock(*types.Block) (bool, error)
26         ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error
27 }
28
29 type Peers interface {
30         AddPeer(peer peers.BasePeer)
31         BroadcastMsg(bm peers.BroadcastMsg) error
32         GetPeer(id string) *peers.Peer
33         MarkBlock(peerID string, hash *bc.Hash)
34         MarkBlockSignature(peerID string, signature []byte)
35         ProcessIllegal(peerID string, level byte, reason string)
36         RemovePeer(peerID string)
37         SetStatus(peerID string, height uint64, hash *bc.Hash)
38 }
39
40 type blockMsg struct {
41         block  *types.Block
42         peerID string
43 }
44
45 // Manager is the consensus message network synchronization manager.
46 type Manager struct {
47         sw              Switch
48         chain           Chain
49         peers           Peers
50         blockFetcher    *blockFetcher
51         eventDispatcher *event.Dispatcher
52
53         quit chan struct{}
54 }
55
56 // NewManager create new manager.
57 func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager {
58         manager := &Manager{
59                 sw:              sw,
60                 chain:           chain,
61                 peers:           peers,
62                 blockFetcher:    newBlockFetcher(chain, peers),
63                 eventDispatcher: dispatcher,
64                 quit:            make(chan struct{}),
65         }
66         protocolReactor := NewConsensusReactor(manager)
67         manager.sw.AddReactor("CONSENSUS", protocolReactor)
68         return manager
69 }
70
71 func (m *Manager) addPeer(peer peers.BasePeer) {
72         m.peers.AddPeer(peer)
73 }
74
75 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
76         peer := m.peers.GetPeer(peerID)
77         if peer == nil {
78                 return
79         }
80
81         logrus.WithFields(logrus.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Debug("receive message from peer")
82
83         switch msg := msg.(type) {
84         case *BlockProposeMsg:
85                 m.handleBlockProposeMsg(peerID, msg)
86
87         case *BlockSignatureMsg:
88                 m.handleBlockSignatureMsg(peerID, msg)
89
90         default:
91                 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
92         }
93 }
94
95 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
96         block, err := msg.GetProposeBlock()
97         if err != nil {
98                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
99                 return
100         }
101
102         hash := block.Hash()
103         m.peers.MarkBlock(peerID, &hash)
104         m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
105         m.peers.SetStatus(peerID, block.Height, &hash)
106 }
107
108 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
109         m.peers.MarkBlockSignature(peerID, msg.Signature)
110         blockHash := bc.NewHash(msg.BlockHash)
111         if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey, &blockHash); err != nil {
112                 m.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, err.Error())
113                 return
114         }
115 }
116
117 func (m *Manager) blockProposeMsgBroadcastLoop() {
118         blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
119         if err != nil {
120                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
121                 return
122         }
123         defer blockProposeMsgSub.Unsubscribe()
124
125         for {
126                 select {
127                 case obj, ok := <-blockProposeMsgSub.Chan():
128                         if !ok {
129                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
130                                 return
131                         }
132
133                         ev, ok := obj.Data.(event.NewProposedBlockEvent)
134                         if !ok {
135                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
136                                 continue
137                         }
138                         proposeMsg, err := NewBlockProposeMsg(&ev.Block)
139                         if err != nil {
140                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
141                                 return
142                         }
143
144                         if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
145                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
146                                 continue
147                         }
148
149                 case <-m.quit:
150                         return
151                 }
152         }
153 }
154
155 func (m *Manager) blockSignatureMsgBroadcastLoop() {
156         blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
157         if err != nil {
158                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
159                 return
160         }
161         defer blockSignatureMsgSub.Unsubscribe()
162         for {
163                 select {
164                 case obj, ok := <-blockSignatureMsgSub.Chan():
165                         if !ok {
166                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
167                                 return
168                         }
169
170                         ev, ok := obj.Data.(event.BlockSignatureEvent)
171                         if !ok {
172                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
173                                 continue
174                         }
175
176                         blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, ev.Signature, ev.XPub), consensusChannel)
177                         if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
178                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
179                                 continue
180                         }
181
182                 case <-m.quit:
183                         return
184                 }
185         }
186 }
187
188 func (m *Manager) removePeer(peerID string) {
189         m.peers.RemovePeer(peerID)
190 }
191
192 //Start consensus manager service.
193 func (m *Manager) Start() error {
194         go m.blockProposeMsgBroadcastLoop()
195         go m.blockSignatureMsgBroadcastLoop()
196         return nil
197 }
198
199 //Stop consensus manager service.
200 func (m *Manager) Stop() {
201         close(m.quit)
202 }