OSDN Git Service

don't ban double sign
[bytom/vapor.git] / netsync / consensusmgr / handle.go
1 package consensusmgr
2
3 import (
4         "reflect"
5
6         "github.com/sirupsen/logrus"
7
8         "github.com/bytom/vapor/errors"
9         "github.com/bytom/vapor/event"
10         "github.com/bytom/vapor/netsync/peers"
11         "github.com/bytom/vapor/p2p"
12         "github.com/bytom/vapor/p2p/security"
13         "github.com/bytom/vapor/protocol"
14         "github.com/bytom/vapor/protocol/bc"
15         "github.com/bytom/vapor/protocol/bc/types"
16 )
17
18 // Switch is the interface for p2p switch.
19 type Switch interface {
20         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
21 }
22
23 // Chain is the interface for Bytom core.
24 type Chain interface {
25         BestBlockHeight() uint64
26         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
27         ProcessBlock(*types.Block) (bool, error)
28         ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error
29 }
30
31 type Peers interface {
32         AddPeer(peer peers.BasePeer)
33         BroadcastMsg(bm peers.BroadcastMsg) error
34         GetPeer(id string) *peers.Peer
35         MarkBlock(peerID string, hash *bc.Hash)
36         MarkBlockSignature(peerID string, signature []byte)
37         ProcessIllegal(peerID string, level byte, reason string)
38         RemovePeer(peerID string)
39         SetStatus(peerID string, height uint64, hash *bc.Hash)
40 }
41
42 type blockMsg struct {
43         block  *types.Block
44         peerID string
45 }
46
47 // Manager is the consensus message network synchronization manager.
48 type Manager struct {
49         sw              Switch
50         chain           Chain
51         peers           Peers
52         blockFetcher    *blockFetcher
53         eventDispatcher *event.Dispatcher
54
55         quit chan struct{}
56 }
57
58 // NewManager create new manager.
59 func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager {
60         manager := &Manager{
61                 sw:              sw,
62                 chain:           chain,
63                 peers:           peers,
64                 blockFetcher:    newBlockFetcher(chain, peers),
65                 eventDispatcher: dispatcher,
66                 quit:            make(chan struct{}),
67         }
68         protocolReactor := NewConsensusReactor(manager)
69         manager.sw.AddReactor("CONSENSUS", protocolReactor)
70         return manager
71 }
72
73 func (m *Manager) addPeer(peer peers.BasePeer) {
74         m.peers.AddPeer(peer)
75 }
76
77 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
78         peer := m.peers.GetPeer(peerID)
79         if peer == nil {
80                 return
81         }
82
83         logrus.WithFields(logrus.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Debug("receive message from peer")
84
85         switch msg := msg.(type) {
86         case *BlockProposeMsg:
87                 m.handleBlockProposeMsg(peerID, msg)
88
89         case *BlockSignatureMsg:
90                 m.handleBlockSignatureMsg(peerID, msg)
91
92         default:
93                 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
94         }
95 }
96
97 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
98         block, err := msg.GetProposeBlock()
99         if err != nil {
100                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
101                 return
102         }
103
104         hash := block.Hash()
105         m.peers.MarkBlock(peerID, &hash)
106         m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
107         m.peers.SetStatus(peerID, block.Height, &hash)
108 }
109
110 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
111         m.peers.MarkBlockSignature(peerID, msg.Signature)
112         blockHash := bc.NewHash(msg.BlockHash)
113         if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey, &blockHash); err != nil {
114                 if errors.Root(err) != protocol.ErrDoubleSignBlock {
115                         m.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, err.Error())
116                 }
117         }
118 }
119
120 func (m *Manager) blockProposeMsgBroadcastLoop() {
121         blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
122         if err != nil {
123                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
124                 return
125         }
126         defer blockProposeMsgSub.Unsubscribe()
127
128         for {
129                 select {
130                 case obj, ok := <-blockProposeMsgSub.Chan():
131                         if !ok {
132                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
133                                 return
134                         }
135
136                         ev, ok := obj.Data.(event.NewProposedBlockEvent)
137                         if !ok {
138                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
139                                 continue
140                         }
141                         proposeMsg, err := NewBlockProposeMsg(&ev.Block)
142                         if err != nil {
143                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
144                                 return
145                         }
146
147                         if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
148                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
149                                 continue
150                         }
151
152                 case <-m.quit:
153                         return
154                 }
155         }
156 }
157
158 func (m *Manager) blockSignatureMsgBroadcastLoop() {
159         blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
160         if err != nil {
161                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
162                 return
163         }
164         defer blockSignatureMsgSub.Unsubscribe()
165         for {
166                 select {
167                 case obj, ok := <-blockSignatureMsgSub.Chan():
168                         if !ok {
169                                 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
170                                 return
171                         }
172
173                         ev, ok := obj.Data.(event.BlockSignatureEvent)
174                         if !ok {
175                                 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
176                                 continue
177                         }
178
179                         blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, ev.Signature, ev.XPub), consensusChannel)
180                         if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
181                                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
182                                 continue
183                         }
184
185                 case <-m.quit:
186                         return
187                 }
188         }
189 }
190
191 func (m *Manager) removePeer(peerID string) {
192         m.peers.RemovePeer(peerID)
193 }
194
195 //Start consensus manager service.
196 func (m *Manager) Start() error {
197         go m.blockFetcher.blockProcessorLoop()
198         go m.blockProposeMsgBroadcastLoop()
199         go m.blockSignatureMsgBroadcastLoop()
200         return nil
201 }
202
203 //Stop consensus manager service.
204 func (m *Manager) Stop() {
205         close(m.quit)
206 }