OSDN Git Service

modify import path (#1805)
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/bytom/bytom/config"
10         "github.com/bytom/bytom/consensus"
11         "github.com/bytom/bytom/event"
12         "github.com/bytom/bytom/p2p"
13         "github.com/bytom/bytom/p2p/security"
14         core "github.com/bytom/bytom/protocol"
15         "github.com/bytom/bytom/protocol/bc"
16         "github.com/bytom/bytom/protocol/bc/types"
17         "github.com/tendermint/go-crypto"
18 )
19
20 const (
21         logModule             = "netsync"
22         maxTxChanSize         = 10000
23         maxFilterAddressSize  = 50
24         maxFilterAddressCount = 1000
25 )
26
27 var (
28         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
29 )
30
31 // Chain is the interface for Bytom core
32 type Chain interface {
33         BestBlockHeader() *types.BlockHeader
34         BestBlockHeight() uint64
35         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
36         GetBlockByHash(*bc.Hash) (*types.Block, error)
37         GetBlockByHeight(uint64) (*types.Block, error)
38         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
39         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
40         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
41         InMainChain(bc.Hash) bool
42         ProcessBlock(*types.Block) (bool, error)
43         ValidateTx(*types.Tx) (bool, error)
44 }
45
46 type Switch interface {
47         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
48         StopPeerGracefully(string)
49         NodeInfo() *p2p.NodeInfo
50         Start() (bool, error)
51         Stop() bool
52         IsListening() bool
53         DialPeerWithAddress(addr *p2p.NetAddress) error
54         Peers() *p2p.PeerSet
55         IsBanned(peerID string, level byte, reason string) bool
56 }
57
58 //SyncManager Sync Manager is responsible for the business layer information synchronization
59 type SyncManager struct {
60         sw           Switch
61         genesisHash  bc.Hash
62         chain        Chain
63         txPool       *core.TxPool
64         blockFetcher *blockFetcher
65         blockKeeper  *blockKeeper
66         peers        *peerSet
67
68         txSyncCh chan *txSyncMsg
69         quitSync chan struct{}
70         config   *cfg.Config
71
72         eventDispatcher *event.Dispatcher
73         minedBlockSub   *event.Subscription
74         txMsgSub        *event.Subscription
75 }
76
77 // CreateSyncManager create sync manager and set switch.
78 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
79         sw, err := p2p.NewSwitch(config)
80         if err != nil {
81                 return nil, err
82         }
83
84         return newSyncManager(config, sw, chain, txPool, dispatcher)
85 }
86
87 //NewSyncManager create a sync manager
88 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
89         genesisHeader, err := chain.GetHeaderByHeight(0)
90         if err != nil {
91                 return nil, err
92         }
93         peers := newPeerSet(sw)
94         manager := &SyncManager{
95                 sw:              sw,
96                 genesisHash:     genesisHeader.Hash(),
97                 txPool:          txPool,
98                 chain:           chain,
99                 blockFetcher:    newBlockFetcher(chain, peers),
100                 blockKeeper:     newBlockKeeper(chain, peers),
101                 peers:           peers,
102                 txSyncCh:        make(chan *txSyncMsg),
103                 quitSync:        make(chan struct{}),
104                 config:          config,
105                 eventDispatcher: dispatcher,
106         }
107
108         if !config.VaultMode {
109                 protocolReactor := NewProtocolReactor(manager, peers)
110                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
111         }
112         return manager, nil
113 }
114
115 //BestPeer return the highest p2p peerInfo
116 func (sm *SyncManager) BestPeer() *PeerInfo {
117         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
118         if bestPeer != nil {
119                 return bestPeer.getPeerInfo()
120         }
121         return nil
122 }
123
124 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
125         if sm.config.VaultMode {
126                 return errVaultModeDialPeer
127         }
128
129         return sm.sw.DialPeerWithAddress(addr)
130 }
131
132 func (sm *SyncManager) GetNetwork() string {
133         return sm.config.ChainID
134 }
135
136 //GetPeerInfos return peer info of all peers
137 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
138         return sm.peers.getPeerInfos()
139 }
140
141 //IsCaughtUp check wheather the peer finish the sync
142 func (sm *SyncManager) IsCaughtUp() bool {
143         peer := sm.peers.bestPeer(consensus.SFFullNode)
144         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
145 }
146
147 //StopPeer try to stop peer by given ID
148 func (sm *SyncManager) StopPeer(peerID string) error {
149         if peer := sm.peers.getPeer(peerID); peer == nil {
150                 return errors.New("peerId not exist")
151         }
152         sm.peers.removePeer(peerID)
153         return nil
154 }
155
156 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
157         block, err := msg.GetBlock()
158         if err != nil {
159                 return
160         }
161         sm.blockKeeper.processBlock(peer.ID(), block)
162 }
163
164 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
165         blocks, err := msg.GetBlocks()
166         if err != nil {
167                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
168                 return
169         }
170
171         sm.blockKeeper.processBlocks(peer.ID(), blocks)
172 }
173
174 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
175         peer.addFilterAddress(msg.Address)
176 }
177
178 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
179         peer.filterAdds.Clear()
180 }
181
182 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
183         peer.addFilterAddresses(msg.Addresses)
184 }
185
186 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
187         var block *types.Block
188         var err error
189         if msg.Height != 0 {
190                 block, err = sm.chain.GetBlockByHeight(msg.Height)
191         } else {
192                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
193         }
194         if err != nil {
195                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
196                 return
197         }
198
199         ok, err := peer.sendBlock(block)
200         if !ok {
201                 sm.peers.removePeer(peer.ID())
202         }
203         if err != nil {
204                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
205         }
206 }
207
208 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
209         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
210         if err != nil || len(blocks) == 0 {
211                 return
212         }
213
214         totalSize := 0
215         sendBlocks := []*types.Block{}
216         for _, block := range blocks {
217                 rawData, err := block.MarshalText()
218                 if err != nil {
219                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
220                         continue
221                 }
222
223                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
224                         break
225                 }
226                 totalSize += len(rawData)
227                 sendBlocks = append(sendBlocks, block)
228         }
229
230         ok, err := peer.sendBlocks(sendBlocks)
231         if !ok {
232                 sm.peers.removePeer(peer.ID())
233         }
234         if err != nil {
235                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
236         }
237 }
238
239 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
240         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
241         if err != nil || len(headers) == 0 {
242                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
243                 return
244         }
245
246         ok, err := peer.sendHeaders(headers)
247         if !ok {
248                 sm.peers.removePeer(peer.ID())
249         }
250         if err != nil {
251                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
252         }
253 }
254
255 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
256         var err error
257         var block *types.Block
258         if msg.Height != 0 {
259                 block, err = sm.chain.GetBlockByHeight(msg.Height)
260         } else {
261                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
262         }
263         if err != nil {
264                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
265                 return
266         }
267
268         blockHash := block.Hash()
269         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
270         if err != nil {
271                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
272                 return
273         }
274
275         ok, err := peer.sendMerkleBlock(block, txStatus)
276         if err != nil {
277                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
278                 return
279         }
280
281         if !ok {
282                 sm.peers.removePeer(peer.ID())
283         }
284 }
285
286 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
287         headers, err := msg.GetHeaders()
288         if err != nil {
289                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
290                 return
291         }
292
293         sm.blockKeeper.processHeaders(peer.ID(), headers)
294 }
295
296 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
297         block, err := msg.GetMineBlock()
298         if err != nil {
299                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
300                 return
301         }
302
303         hash := block.Hash()
304         peer.markBlock(&hash)
305         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
306         peer.setStatus(block.Height, &hash)
307 }
308
309 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
310         bestHeader := sm.chain.BestBlockHeader()
311         genesisBlock, err := sm.chain.GetBlockByHeight(0)
312         if err != nil {
313                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
314         }
315
316         genesisHash := genesisBlock.Hash()
317         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
318         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
319                 sm.peers.removePeer(peer.ID())
320         }
321 }
322
323 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
324         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
325                 peer.setStatus(msg.Height, msg.GetHash())
326                 return
327         }
328
329         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
330                 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
331                 return
332         }
333
334         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
335 }
336
337 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
338         tx, err := msg.GetTransaction()
339         if err != nil {
340                 sm.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
341                 return
342         }
343
344         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
345                 sm.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
346         }
347 }
348
349 func (sm *SyncManager) IsListening() bool {
350         if sm.config.VaultMode {
351                 return false
352         }
353         return sm.sw.IsListening()
354 }
355
356 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
357         if sm.config.VaultMode {
358                 return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
359         }
360         return sm.sw.NodeInfo()
361 }
362
363 func (sm *SyncManager) PeerCount() int {
364         if sm.config.VaultMode {
365                 return 0
366         }
367         return len(sm.sw.Peers().List())
368 }
369
370 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
371         peer := sm.peers.getPeer(basePeer.ID())
372         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
373                 return
374         }
375
376         log.WithFields(log.Fields{
377                 "module":  logModule,
378                 "peer":    basePeer.Addr(),
379                 "type":    reflect.TypeOf(msg),
380                 "message": msg.String(),
381         }).Info("receive message from peer")
382
383         switch msg := msg.(type) {
384         case *GetBlockMessage:
385                 sm.handleGetBlockMsg(peer, msg)
386
387         case *BlockMessage:
388                 sm.handleBlockMsg(peer, msg)
389
390         case *StatusRequestMessage:
391                 sm.handleStatusRequestMsg(basePeer)
392
393         case *StatusResponseMessage:
394                 sm.handleStatusResponseMsg(basePeer, msg)
395
396         case *TransactionMessage:
397                 sm.handleTransactionMsg(peer, msg)
398
399         case *MineBlockMessage:
400                 sm.handleMineBlockMsg(peer, msg)
401
402         case *GetHeadersMessage:
403                 sm.handleGetHeadersMsg(peer, msg)
404
405         case *HeadersMessage:
406                 sm.handleHeadersMsg(peer, msg)
407
408         case *GetBlocksMessage:
409                 sm.handleGetBlocksMsg(peer, msg)
410
411         case *BlocksMessage:
412                 sm.handleBlocksMsg(peer, msg)
413
414         case *FilterLoadMessage:
415                 sm.handleFilterLoadMsg(peer, msg)
416
417         case *FilterAddMessage:
418                 sm.handleFilterAddMsg(peer, msg)
419
420         case *FilterClearMessage:
421                 sm.handleFilterClearMsg(peer)
422
423         case *GetMerkleBlockMessage:
424                 sm.handleGetMerkleBlockMsg(peer, msg)
425
426         default:
427                 log.WithFields(log.Fields{
428                         "module":       logModule,
429                         "peer":         basePeer.Addr(),
430                         "message_type": reflect.TypeOf(msg),
431                 }).Error("unhandled message type")
432         }
433 }
434
435 func (sm *SyncManager) Start() error {
436         var err error
437         if _, err = sm.sw.Start(); err != nil {
438                 log.Error("switch start err")
439                 return err
440         }
441
442         sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
443         if err != nil {
444                 return err
445         }
446
447         sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
448         if err != nil {
449                 return err
450         }
451
452         // broadcast transactions
453         go sm.txBroadcastLoop()
454         go sm.minedBroadcastLoop()
455         go sm.txSyncLoop()
456
457         return nil
458 }
459
460 //Stop stop sync manager
461 func (sm *SyncManager) Stop() {
462         close(sm.quitSync)
463         sm.minedBlockSub.Unsubscribe()
464         if !sm.config.VaultMode {
465                 sm.sw.Stop()
466         }
467 }
468
469 func (sm *SyncManager) minedBroadcastLoop() {
470         for {
471                 select {
472                 case obj, ok := <-sm.minedBlockSub.Chan():
473                         if !ok {
474                                 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
475                                 return
476                         }
477
478                         ev, ok := obj.Data.(event.NewMinedBlockEvent)
479                         if !ok {
480                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
481                                 continue
482                         }
483
484                         if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
485                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
486                                 continue
487                         }
488
489                 case <-sm.quitSync:
490                         return
491                 }
492         }
493 }