OSDN Git Service

fix next leader time (#98)
[bytom/vapor.git] / netsync / consensusmgr / handle.go
1 package consensusmgr
2
3 import (
4         "reflect"
5
6         "github.com/sirupsen/logrus"
7
8         "github.com/vapor/event"
9         "github.com/vapor/netsync/peers"
10         "github.com/vapor/p2p"
11         "github.com/vapor/protocol/bc"
12         "github.com/vapor/protocol/bc/types"
13 )
14
15 // Switch is the interface for p2p switch.
16 type Switch interface {
17         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
18         AddBannedPeer(string) error
19         ID() [32]byte
20 }
21
22 // Chain is the interface for Bytom core.
23 type Chain interface {
24         BestBlockHeight() uint64
25         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
26         ProcessBlock(*types.Block) (bool, error)
27         ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error
28 }
29
30 type blockMsg struct {
31         block  *types.Block
32         peerID string
33 }
34
35 // Manager is the consensus message network synchronization manager.
36 type Manager struct {
37         sw              Switch
38         chain           Chain
39         peers           *peers.PeerSet
40         blockFetcher    *blockFetcher
41         eventDispatcher *event.Dispatcher
42
43         quit chan struct{}
44 }
45
46 // NewManager create new manager.
47 func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
48         manager := &Manager{
49                 sw:              sw,
50                 chain:           chain,
51                 peers:           peers,
52                 blockFetcher:    newBlockFetcher(chain, peers),
53                 eventDispatcher: dispatcher,
54                 quit:            make(chan struct{}),
55         }
56         protocolReactor := NewConsensusReactor(manager)
57         manager.sw.AddReactor("CONSENSUS", protocolReactor)
58         return manager
59 }
60
61 func (m *Manager) addPeer(peer peers.BasePeer) {
62         m.peers.AddPeer(peer)
63 }
64
65 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
66         peer := m.peers.GetPeer(peerID)
67         if peer == nil {
68                 return
69         }
70
71         logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
72
73         switch msg := msg.(type) {
74         case *BlockProposeMsg:
75                 m.handleBlockProposeMsg(peerID, msg)
76
77         case *BlockSignatureMsg:
78                 m.handleBlockSignatureMsg(peerID, msg)
79
80         default:
81                 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
82         }
83 }
84
85 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
86         block, err := msg.GetProposeBlock()
87         if err != nil {
88                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
89                 return
90         }
91
92         hash := block.Hash()
93         m.peers.MarkBlock(peerID, &hash)
94         m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
95 }
96
97 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
98         blockHash := bc.NewHash(msg.BlockHash)
99         if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey[:], msg.Height, &blockHash); err != nil {
100                 m.peers.AddBanScore(peerID, 20, 0, err.Error())
101                 return
102         }
103 }
104
105 func (m *Manager) blockProposeMsgBroadcastLoop() {
106         blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
107         if err != nil {
108                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
109                 return
110         }
111         defer blockProposeMsgSub.Unsubscribe()
112
113         for {
114                 select {
115                 case obj, ok := <-blockProposeMsgSub.Chan():
116                         if !ok {
117                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
118                                 return
119                         }
120
121                         ev, ok := obj.Data.(event.NewBlockProposeEvent)
122                         if !ok {
123                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
124                                 continue
125                         }
126                         proposeMsg, err := NewBlockProposeMsg(&ev.Block)
127                         if err != nil {
128                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
129                                 return
130                         }
131
132                         if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
133                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
134                                 continue
135                         }
136
137                 case <-m.quit:
138                         return
139                 }
140         }
141 }
142
143 func (m *Manager) blockSignatureMsgBroadcastLoop() {
144         blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
145         if err != nil {
146                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
147                 return
148         }
149         defer blockSignatureMsgSub.Unsubscribe()
150         for {
151                 select {
152                 case obj, ok := <-blockSignatureMsgSub.Chan():
153                         if !ok {
154                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
155                                 return
156                         }
157
158                         ev, ok := obj.Data.(event.BlockSignatureEvent)
159                         if !ok {
160                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
161                                 continue
162                         }
163
164                         blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
165                         if err != nil {
166                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
167                                 return
168                         }
169
170                         blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, m.sw.ID()), consensusChannel)
171                         if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
172                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
173                                 return
174                         }
175
176                 case <-m.quit:
177                         return
178                 }
179         }
180 }
181
182 func (m *Manager) removePeer(peerID string) {
183         m.peers.RemovePeer(peerID)
184 }
185
186 //Start consensus manager service.
187 func (m *Manager) Start() error {
188         go m.blockProposeMsgBroadcastLoop()
189         go m.blockSignatureMsgBroadcastLoop()
190         return nil
191 }
192
193 //Stop consensus manager service.
194 func (m *Manager) Stop() {
195         close(m.quit)
196 }