OSDN Git Service

65fb9f29fa5b752467253c2205c418f20758f6bf
[bytom/vapor.git] / netsync / consensusmgr / handle.go
1 package consensusmgr
2
3 import (
4         "reflect"
5
6         "github.com/sirupsen/logrus"
7
8         "github.com/vapor/event"
9         "github.com/vapor/netsync/peers"
10         "github.com/vapor/p2p"
11         "github.com/vapor/p2p/security"
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 // Switch is the interface for p2p switch.
17 type Switch interface {
18         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
19 }
20
21 // Chain is the interface for Bytom core.
22 type Chain interface {
23         BestBlockHeight() uint64
24         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
25         ProcessBlock(*types.Block) (bool, error)
26         ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error
27 }
28
29 type blockMsg struct {
30         block  *types.Block
31         peerID string
32 }
33
34 // Manager is the consensus message network synchronization manager.
35 type Manager struct {
36         sw              Switch
37         chain           Chain
38         peers           *peers.PeerSet
39         blockFetcher    *blockFetcher
40         eventDispatcher *event.Dispatcher
41
42         quit chan struct{}
43 }
44
45 // NewManager create new manager.
46 func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
47         manager := &Manager{
48                 sw:              sw,
49                 chain:           chain,
50                 peers:           peers,
51                 blockFetcher:    newBlockFetcher(chain, peers),
52                 eventDispatcher: dispatcher,
53                 quit:            make(chan struct{}),
54         }
55         protocolReactor := NewConsensusReactor(manager)
56         manager.sw.AddReactor("CONSENSUS", protocolReactor)
57         return manager
58 }
59
60 func (m *Manager) addPeer(peer peers.BasePeer) {
61         m.peers.AddPeer(peer)
62 }
63
64 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
65         peer := m.peers.GetPeer(peerID)
66         if peer == nil {
67                 return
68         }
69
70         logrus.WithFields(logrus.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Debug("receive message from peer")
71
72         switch msg := msg.(type) {
73         case *BlockProposeMsg:
74                 m.handleBlockProposeMsg(peerID, msg)
75
76         case *BlockSignatureMsg:
77                 m.handleBlockSignatureMsg(peerID, msg)
78
79         default:
80                 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
81         }
82 }
83
84 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
85         block, err := msg.GetProposeBlock()
86         if err != nil {
87                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
88                 return
89         }
90
91         hash := block.Hash()
92         m.peers.MarkBlock(peerID, &hash)
93         m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
94         m.peers.SetStatus(peerID, block.Height, &hash)
95 }
96
97 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
98         m.peers.MarkBlockSignature(peerID, msg.Signature)
99         blockHash := bc.NewHash(msg.BlockHash)
100         if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey, &blockHash); err != nil {
101                 m.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, err.Error())
102                 return
103         }
104 }
105
106 func (m *Manager) blockProposeMsgBroadcastLoop() {
107         blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
108         if err != nil {
109                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
110                 return
111         }
112         defer blockProposeMsgSub.Unsubscribe()
113
114         for {
115                 select {
116                 case obj, ok := <-blockProposeMsgSub.Chan():
117                         if !ok {
118                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
119                                 return
120                         }
121
122                         ev, ok := obj.Data.(event.NewProposedBlockEvent)
123                         if !ok {
124                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
125                                 continue
126                         }
127                         proposeMsg, err := NewBlockProposeMsg(&ev.Block)
128                         if err != nil {
129                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
130                                 return
131                         }
132
133                         if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
134                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
135                                 continue
136                         }
137
138                 case <-m.quit:
139                         return
140                 }
141         }
142 }
143
144 func (m *Manager) blockSignatureMsgBroadcastLoop() {
145         blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
146         if err != nil {
147                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
148                 return
149         }
150         defer blockSignatureMsgSub.Unsubscribe()
151         for {
152                 select {
153                 case obj, ok := <-blockSignatureMsgSub.Chan():
154                         if !ok {
155                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
156                                 return
157                         }
158
159                         ev, ok := obj.Data.(event.BlockSignatureEvent)
160                         if !ok {
161                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
162                                 continue
163                         }
164
165                         blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, ev.Signature, ev.XPub), consensusChannel)
166                         if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
167                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
168                                 continue
169                         }
170
171                 case <-m.quit:
172                         return
173                 }
174         }
175 }
176
177 func (m *Manager) removePeer(peerID string) {
178         m.peers.RemovePeer(peerID)
179 }
180
181 //Start consensus manager service.
182 func (m *Manager) Start() error {
183         go m.blockProposeMsgBroadcastLoop()
184         go m.blockSignatureMsgBroadcastLoop()
185         return nil
186 }
187
188 //Stop consensus manager service.
189 func (m *Manager) Stop() {
190         close(m.quit)
191 }