OSDN Git Service

Add consensus messages transfer (#90)
[bytom/vapor.git] / netsync / consensusmgr / handle.go
1 package consensusmgr
2
3 import (
4         "reflect"
5
6         "github.com/sirupsen/logrus"
7
8         "github.com/vapor/event"
9         "github.com/vapor/netsync/peers"
10         "github.com/vapor/p2p"
11         "github.com/vapor/protocol/bc"
12         "github.com/vapor/protocol/bc/types"
13 )
14
15 // Switch is the interface for p2p switch.
16 type Switch interface {
17         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
18         AddBannedPeer(string) error
19         ID() [32]byte
20 }
21
22 // Chain is the interface for Bytom core.
23 type Chain interface {
24         BestBlockHeight() uint64
25         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
26         ProcessBlock(*types.Block) (bool, error)
27         ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error
28 }
29
30 type blockMsg struct {
31         block  *types.Block
32         peerID string
33 }
34
35 // Manager is the consensus message network synchronization manager.
36 type Manager struct {
37         sw              Switch
38         chain           Chain
39         peers           *peers.PeerSet
40         blockFetcher    *blockFetcher
41         eventDispatcher *event.Dispatcher
42
43         quit chan struct{}
44 }
45
46 // NewManager create new manager.
47 func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
48         manager := &Manager{
49                 sw:              sw,
50                 peers:           peers,
51                 blockFetcher:    newBlockFetcher(chain, peers),
52                 eventDispatcher: dispatcher,
53                 quit:            make(chan struct{}),
54         }
55         protocolReactor := NewConsensusReactor(manager)
56         manager.sw.AddReactor("CONSENSUS", protocolReactor)
57         return manager
58 }
59
60 func (m *Manager) addPeer(peer peers.BasePeer) {
61         m.peers.AddPeer(peer)
62 }
63
64 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
65         peer := m.peers.GetPeer(peerID)
66         if peer == nil {
67                 return
68         }
69
70         logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
71
72         switch msg := msg.(type) {
73         case *BlockProposeMsg:
74                 m.handleBlockProposeMsg(peerID, msg)
75
76         case *BlockSignatureMsg:
77                 m.handleBlockSignatureMsg(peerID, msg)
78
79         default:
80                 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
81         }
82 }
83
84 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
85         block, err := msg.GetProposeBlock()
86         if err != nil {
87                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
88                 return
89         }
90
91         hash := block.Hash()
92         m.peers.MarkBlock(peerID, &hash)
93         m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
94 }
95
96 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
97         blockHash := bc.NewHash(msg.BlockHash)
98         if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey[:], msg.Height, &blockHash); err != nil {
99                 m.peers.AddBanScore(peerID, 20, 0, err.Error())
100                 return
101         }
102 }
103
104 func (m *Manager) blockProposeMsgBroadcastLoop() {
105         blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
106         if err != nil {
107                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
108                 return
109         }
110         defer blockProposeMsgSub.Unsubscribe()
111
112         for {
113                 select {
114                 case obj, ok := <-blockProposeMsgSub.Chan():
115                         if !ok {
116                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
117                                 return
118                         }
119
120                         ev, ok := obj.Data.(event.NewBlockProposeEvent)
121                         if !ok {
122                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
123                                 continue
124                         }
125                         proposeMsg, err := NewBlockProposeMsg(&ev.Block)
126                         if err != nil {
127                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
128                                 return
129                         }
130
131                         if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
132                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
133                                 continue
134                         }
135
136                 case <-m.quit:
137                         return
138                 }
139         }
140 }
141
142 func (m *Manager) blockSignatureMsgBroadcastLoop() {
143         blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
144         if err != nil {
145                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
146                 return
147         }
148         defer blockSignatureMsgSub.Unsubscribe()
149         for {
150                 select {
151                 case obj, ok := <-blockSignatureMsgSub.Chan():
152                         if !ok {
153                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
154                                 return
155                         }
156
157                         ev, ok := obj.Data.(event.BlockSignatureEvent)
158                         if !ok {
159                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
160                                 continue
161                         }
162
163                         blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
164                         if err != nil {
165                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
166                                 return
167                         }
168
169                         blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, m.sw.ID()), consensusChannel)
170                         if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
171                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
172                                 return
173                         }
174
175                 case <-m.quit:
176                         return
177                 }
178         }
179 }
180
181 func (m *Manager) removePeer(peerID string) {
182         m.peers.RemovePeer(peerID)
183 }
184
185 //Start consensus manager service.
186 func (m *Manager) Start() error {
187         go m.blockProposeMsgBroadcastLoop()
188         go m.blockSignatureMsgBroadcastLoop()
189         return nil
190 }
191
192 //Stop consensus manager service.
193 func (m *Manager) Stop() {
194         close(m.quit)
195 }