OSDN Git Service

Optimize status message process (#66)
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/vapor/config"
10         "github.com/vapor/consensus"
11         "github.com/vapor/event"
12         "github.com/vapor/p2p"
13         core "github.com/vapor/protocol"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16 )
17
18 const (
19         logModule             = "netsync"
20         maxTxChanSize         = 10000
21         maxFilterAddressSize  = 50
22         maxFilterAddressCount = 1000
23 )
24
25 var (
26         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
27 )
28
29 // Chain is the interface for Bytom core
30 type Chain interface {
31         BestBlockHeader() *types.BlockHeader
32         BestBlockHeight() uint64
33         GetBlockByHash(*bc.Hash) (*types.Block, error)
34         GetBlockByHeight(uint64) (*types.Block, error)
35         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
36         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
37         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
38         InMainChain(bc.Hash) bool
39         ProcessBlock(*types.Block) (bool, error)
40         ValidateTx(*types.Tx) (bool, error)
41 }
42
43 type Switch interface {
44         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
45         AddBannedPeer(string) error
46         StopPeerGracefully(string)
47         NodeInfo() *p2p.NodeInfo
48         Start() (bool, error)
49         Stop() bool
50         IsListening() bool
51         DialPeerWithAddress(addr *p2p.NetAddress) error
52         Peers() *p2p.PeerSet
53 }
54
55 //SyncManager Sync Manager is responsible for the business layer information synchronization
56 type SyncManager struct {
57         sw           Switch
58         genesisHash  bc.Hash
59         chain        Chain
60         txPool       *core.TxPool
61         blockFetcher *blockFetcher
62         blockKeeper  *blockKeeper
63         peers        *peerSet
64
65         txSyncCh chan *txSyncMsg
66         quitSync chan struct{}
67         config   *cfg.Config
68
69         eventDispatcher *event.Dispatcher
70         minedBlockSub   *event.Subscription
71         txMsgSub        *event.Subscription
72 }
73
74 // CreateSyncManager create sync manager and set switch.
75 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
76         sw, err := p2p.NewSwitch(config)
77         if err != nil {
78                 return nil, err
79         }
80
81         return newSyncManager(config, sw, chain, txPool, dispatcher)
82 }
83
84 //NewSyncManager create a sync manager
85 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
86         genesisHeader, err := chain.GetHeaderByHeight(0)
87         if err != nil {
88                 return nil, err
89         }
90         peers := newPeerSet(sw)
91         manager := &SyncManager{
92                 sw:              sw,
93                 genesisHash:     genesisHeader.Hash(),
94                 txPool:          txPool,
95                 chain:           chain,
96                 blockFetcher:    newBlockFetcher(chain, peers),
97                 blockKeeper:     newBlockKeeper(chain, peers),
98                 peers:           peers,
99                 txSyncCh:        make(chan *txSyncMsg),
100                 quitSync:        make(chan struct{}),
101                 config:          config,
102                 eventDispatcher: dispatcher,
103         }
104
105         if !config.VaultMode {
106                 protocolReactor := NewProtocolReactor(manager, peers)
107                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
108         }
109         return manager, nil
110 }
111
112 func (sm *SyncManager) AddPeer(peer BasePeer) {
113         sm.peers.addPeer(peer)
114 }
115
116 //BestPeer return the highest p2p peerInfo
117 func (sm *SyncManager) BestPeer() *PeerInfo {
118         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
119         if bestPeer != nil {
120                 return bestPeer.getPeerInfo()
121         }
122         return nil
123 }
124
125 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
126         if sm.config.VaultMode {
127                 return errVaultModeDialPeer
128         }
129
130         return sm.sw.DialPeerWithAddress(addr)
131 }
132
133 func (sm *SyncManager) GetNetwork() string {
134         return sm.config.ChainID
135 }
136
137 //GetPeerInfos return peer info of all peers
138 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
139         return sm.peers.getPeerInfos()
140 }
141
142 //IsCaughtUp check wheather the peer finish the sync
143 func (sm *SyncManager) IsCaughtUp() bool {
144         peer := sm.peers.bestPeer(consensus.SFFullNode)
145         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
146 }
147
148 //StopPeer try to stop peer by given ID
149 func (sm *SyncManager) StopPeer(peerID string) error {
150         if peer := sm.peers.getPeer(peerID); peer == nil {
151                 return errors.New("peerId not exist")
152         }
153         sm.peers.removePeer(peerID)
154         return nil
155 }
156
157 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
158         block, err := msg.GetBlock()
159         if err != nil {
160                 return
161         }
162         sm.blockKeeper.processBlock(peer.ID(), block)
163 }
164
165 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
166         blocks, err := msg.GetBlocks()
167         if err != nil {
168                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
169                 return
170         }
171
172         sm.blockKeeper.processBlocks(peer.ID(), blocks)
173 }
174
175 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
176         peer.addFilterAddress(msg.Address)
177 }
178
179 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
180         peer.filterAdds.Clear()
181 }
182
183 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
184         peer.addFilterAddresses(msg.Addresses)
185 }
186
187 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
188         var block *types.Block
189         var err error
190         if msg.Height != 0 {
191                 block, err = sm.chain.GetBlockByHeight(msg.Height)
192         } else {
193                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
194         }
195         if err != nil {
196                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
197                 return
198         }
199
200         ok, err := peer.sendBlock(block)
201         if !ok {
202                 sm.peers.removePeer(peer.ID())
203         }
204         if err != nil {
205                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
206         }
207 }
208
209 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
210         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
211         if err != nil || len(blocks) == 0 {
212                 return
213         }
214
215         totalSize := 0
216         sendBlocks := []*types.Block{}
217         for _, block := range blocks {
218                 rawData, err := block.MarshalText()
219                 if err != nil {
220                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
221                         continue
222                 }
223
224                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
225                         break
226                 }
227                 totalSize += len(rawData)
228                 sendBlocks = append(sendBlocks, block)
229         }
230
231         ok, err := peer.sendBlocks(sendBlocks)
232         if !ok {
233                 sm.peers.removePeer(peer.ID())
234         }
235         if err != nil {
236                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
237         }
238 }
239
240 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
241         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
242         if err != nil || len(headers) == 0 {
243                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
244                 return
245         }
246
247         ok, err := peer.sendHeaders(headers)
248         if !ok {
249                 sm.peers.removePeer(peer.ID())
250         }
251         if err != nil {
252                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
253         }
254 }
255
256 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
257         var err error
258         var block *types.Block
259         if msg.Height != 0 {
260                 block, err = sm.chain.GetBlockByHeight(msg.Height)
261         } else {
262                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
263         }
264         if err != nil {
265                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
266                 return
267         }
268
269         blockHash := block.Hash()
270         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
271         if err != nil {
272                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
273                 return
274         }
275
276         ok, err := peer.sendMerkleBlock(block, txStatus)
277         if err != nil {
278                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
279                 return
280         }
281
282         if !ok {
283                 sm.peers.removePeer(peer.ID())
284         }
285 }
286
287 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
288         headers, err := msg.GetHeaders()
289         if err != nil {
290                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
291                 return
292         }
293
294         sm.blockKeeper.processHeaders(peer.ID(), headers)
295 }
296
297 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
298         block, err := msg.GetMineBlock()
299         if err != nil {
300                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
301                 return
302         }
303
304         hash := block.Hash()
305         peer.markBlock(&hash)
306         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
307         peer.setStatus(block.Height, &hash)
308 }
309
310 func (sm *SyncManager) handleStatusMsg(basePeer BasePeer, msg *StatusMessage) {
311         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
312                 peer.setStatus(msg.Height, msg.GetHash())
313                 return
314         }
315 }
316
317 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
318         tx, err := msg.GetTransaction()
319         if err != nil {
320                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
321                 return
322         }
323
324         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
325                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
326         }
327 }
328
329 func (sm *SyncManager) IsListening() bool {
330         if sm.config.VaultMode {
331                 return false
332         }
333         return sm.sw.IsListening()
334 }
335
336 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
337         return sm.sw.NodeInfo()
338 }
339
340 func (sm *SyncManager) PeerCount() int {
341         if sm.config.VaultMode {
342                 return 0
343         }
344         return len(sm.sw.Peers().List())
345 }
346
347 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
348         peer := sm.peers.getPeer(basePeer.ID())
349         if peer == nil {
350                 return
351         }
352
353         log.WithFields(log.Fields{
354                 "module":  logModule,
355                 "peer":    basePeer.Addr(),
356                 "type":    reflect.TypeOf(msg),
357                 "message": msg.String(),
358         }).Info("receive message from peer")
359
360         switch msg := msg.(type) {
361         case *GetBlockMessage:
362                 sm.handleGetBlockMsg(peer, msg)
363
364         case *BlockMessage:
365                 sm.handleBlockMsg(peer, msg)
366
367         case *StatusMessage:
368                 sm.handleStatusMsg(basePeer, msg)
369
370         case *TransactionMessage:
371                 sm.handleTransactionMsg(peer, msg)
372
373         case *MineBlockMessage:
374                 sm.handleMineBlockMsg(peer, msg)
375
376         case *GetHeadersMessage:
377                 sm.handleGetHeadersMsg(peer, msg)
378
379         case *HeadersMessage:
380                 sm.handleHeadersMsg(peer, msg)
381
382         case *GetBlocksMessage:
383                 sm.handleGetBlocksMsg(peer, msg)
384
385         case *BlocksMessage:
386                 sm.handleBlocksMsg(peer, msg)
387
388         case *FilterLoadMessage:
389                 sm.handleFilterLoadMsg(peer, msg)
390
391         case *FilterAddMessage:
392                 sm.handleFilterAddMsg(peer, msg)
393
394         case *FilterClearMessage:
395                 sm.handleFilterClearMsg(peer)
396
397         case *GetMerkleBlockMessage:
398                 sm.handleGetMerkleBlockMsg(peer, msg)
399
400         default:
401                 log.WithFields(log.Fields{
402                         "module":       logModule,
403                         "peer":         basePeer.Addr(),
404                         "message_type": reflect.TypeOf(msg),
405                 }).Error("unhandled message type")
406         }
407 }
408
409 func (sm *SyncManager) SendStatus(peer BasePeer) error {
410         p := sm.peers.getPeer(peer.ID())
411         if p == nil {
412                 return errors.New("invalid peer")
413         }
414
415         if err := p.sendStatus(sm.chain.BestBlockHeader()); err != nil {
416                 sm.peers.removePeer(p.ID())
417                 return err
418         }
419         return nil
420 }
421
422 func (sm *SyncManager) Start() error {
423         var err error
424         if _, err = sm.sw.Start(); err != nil {
425                 log.Error("switch start err")
426                 return err
427         }
428
429         sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
430         if err != nil {
431                 return err
432         }
433
434         sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
435         if err != nil {
436                 return err
437         }
438
439         // broadcast transactions
440         go sm.txBroadcastLoop()
441         go sm.minedBroadcastLoop()
442         go sm.txSyncLoop()
443
444         return nil
445 }
446
447 //Stop stop sync manager
448 func (sm *SyncManager) Stop() {
449         close(sm.quitSync)
450         sm.minedBlockSub.Unsubscribe()
451         if !sm.config.VaultMode {
452                 sm.sw.Stop()
453         }
454 }
455
456 func (sm *SyncManager) minedBroadcastLoop() {
457         for {
458                 select {
459                 case obj, ok := <-sm.minedBlockSub.Chan():
460                         if !ok {
461                                 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
462                                 return
463                         }
464
465                         ev, ok := obj.Data.(event.NewMinedBlockEvent)
466                         if !ok {
467                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
468                                 continue
469                         }
470
471                         if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
472                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
473                                 continue
474                         }
475
476                 case <-sm.quitSync:
477                         return
478                 }
479         }
480 }