OSDN Git Service

feat: add cross-chain output (#56)
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/vapor/config"
10         "github.com/vapor/consensus"
11         "github.com/vapor/event"
12         "github.com/vapor/p2p"
13         core "github.com/vapor/protocol"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16 )
17
18 const (
19         logModule             = "netsync"
20         maxTxChanSize         = 10000
21         maxFilterAddressSize  = 50
22         maxFilterAddressCount = 1000
23 )
24
25 var (
26         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
27 )
28
29 // Chain is the interface for Bytom core
30 type Chain interface {
31         BestBlockHeader() *types.BlockHeader
32         BestBlockHeight() uint64
33         GetBlockByHash(*bc.Hash) (*types.Block, error)
34         GetBlockByHeight(uint64) (*types.Block, error)
35         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
36         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
37         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
38         InMainChain(bc.Hash) bool
39         ProcessBlock(*types.Block) (bool, error)
40         ValidateTx(*types.Tx) (bool, error)
41 }
42
43 type Switch interface {
44         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
45         AddBannedPeer(string) error
46         StopPeerGracefully(string)
47         NodeInfo() *p2p.NodeInfo
48         Start() (bool, error)
49         Stop() bool
50         IsListening() bool
51         DialPeerWithAddress(addr *p2p.NetAddress) error
52         Peers() *p2p.PeerSet
53 }
54
55 //SyncManager Sync Manager is responsible for the business layer information synchronization
56 type SyncManager struct {
57         sw           Switch
58         genesisHash  bc.Hash
59         chain        Chain
60         txPool       *core.TxPool
61         blockFetcher *blockFetcher
62         blockKeeper  *blockKeeper
63         peers        *peerSet
64
65         txSyncCh chan *txSyncMsg
66         quitSync chan struct{}
67         config   *cfg.Config
68
69         eventDispatcher *event.Dispatcher
70         minedBlockSub   *event.Subscription
71         txMsgSub        *event.Subscription
72 }
73
74 // CreateSyncManager create sync manager and set switch.
75 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
76         sw, err := p2p.NewSwitch(config)
77         if err != nil {
78                 return nil, err
79         }
80
81         return newSyncManager(config, sw, chain, txPool, dispatcher)
82 }
83
84 //NewSyncManager create a sync manager
85 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
86         genesisHeader, err := chain.GetHeaderByHeight(0)
87         if err != nil {
88                 return nil, err
89         }
90         peers := newPeerSet(sw)
91         manager := &SyncManager{
92                 sw:              sw,
93                 genesisHash:     genesisHeader.Hash(),
94                 txPool:          txPool,
95                 chain:           chain,
96                 blockFetcher:    newBlockFetcher(chain, peers),
97                 blockKeeper:     newBlockKeeper(chain, peers),
98                 peers:           peers,
99                 txSyncCh:        make(chan *txSyncMsg),
100                 quitSync:        make(chan struct{}),
101                 config:          config,
102                 eventDispatcher: dispatcher,
103         }
104
105         if !config.VaultMode {
106                 protocolReactor := NewProtocolReactor(manager, peers)
107                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
108         }
109         return manager, nil
110 }
111
112 //BestPeer return the highest p2p peerInfo
113 func (sm *SyncManager) BestPeer() *PeerInfo {
114         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
115         if bestPeer != nil {
116                 return bestPeer.getPeerInfo()
117         }
118         return nil
119 }
120
121 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
122         if sm.config.VaultMode {
123                 return errVaultModeDialPeer
124         }
125
126         return sm.sw.DialPeerWithAddress(addr)
127 }
128
129 func (sm *SyncManager) GetNetwork() string {
130         return sm.config.ChainID
131 }
132
133 //GetPeerInfos return peer info of all peers
134 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
135         return sm.peers.getPeerInfos()
136 }
137
138 //IsCaughtUp check wheather the peer finish the sync
139 func (sm *SyncManager) IsCaughtUp() bool {
140         peer := sm.peers.bestPeer(consensus.SFFullNode)
141         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
142 }
143
144 //StopPeer try to stop peer by given ID
145 func (sm *SyncManager) StopPeer(peerID string) error {
146         if peer := sm.peers.getPeer(peerID); peer == nil {
147                 return errors.New("peerId not exist")
148         }
149         sm.peers.removePeer(peerID)
150         return nil
151 }
152
153 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
154         block, err := msg.GetBlock()
155         if err != nil {
156                 return
157         }
158         sm.blockKeeper.processBlock(peer.ID(), block)
159 }
160
161 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
162         blocks, err := msg.GetBlocks()
163         if err != nil {
164                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
165                 return
166         }
167
168         sm.blockKeeper.processBlocks(peer.ID(), blocks)
169 }
170
171 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
172         peer.addFilterAddress(msg.Address)
173 }
174
175 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
176         peer.filterAdds.Clear()
177 }
178
179 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
180         peer.addFilterAddresses(msg.Addresses)
181 }
182
183 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
184         var block *types.Block
185         var err error
186         if msg.Height != 0 {
187                 block, err = sm.chain.GetBlockByHeight(msg.Height)
188         } else {
189                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
190         }
191         if err != nil {
192                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
193                 return
194         }
195
196         ok, err := peer.sendBlock(block)
197         if !ok {
198                 sm.peers.removePeer(peer.ID())
199         }
200         if err != nil {
201                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
202         }
203 }
204
205 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
206         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
207         if err != nil || len(blocks) == 0 {
208                 return
209         }
210
211         totalSize := 0
212         sendBlocks := []*types.Block{}
213         for _, block := range blocks {
214                 rawData, err := block.MarshalText()
215                 if err != nil {
216                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
217                         continue
218                 }
219
220                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
221                         break
222                 }
223                 totalSize += len(rawData)
224                 sendBlocks = append(sendBlocks, block)
225         }
226
227         ok, err := peer.sendBlocks(sendBlocks)
228         if !ok {
229                 sm.peers.removePeer(peer.ID())
230         }
231         if err != nil {
232                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
233         }
234 }
235
236 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
237         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
238         if err != nil || len(headers) == 0 {
239                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
240                 return
241         }
242
243         ok, err := peer.sendHeaders(headers)
244         if !ok {
245                 sm.peers.removePeer(peer.ID())
246         }
247         if err != nil {
248                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
249         }
250 }
251
252 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
253         var err error
254         var block *types.Block
255         if msg.Height != 0 {
256                 block, err = sm.chain.GetBlockByHeight(msg.Height)
257         } else {
258                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
259         }
260         if err != nil {
261                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
262                 return
263         }
264
265         blockHash := block.Hash()
266         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
267         if err != nil {
268                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
269                 return
270         }
271
272         ok, err := peer.sendMerkleBlock(block, txStatus)
273         if err != nil {
274                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
275                 return
276         }
277
278         if !ok {
279                 sm.peers.removePeer(peer.ID())
280         }
281 }
282
283 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
284         headers, err := msg.GetHeaders()
285         if err != nil {
286                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
287                 return
288         }
289
290         sm.blockKeeper.processHeaders(peer.ID(), headers)
291 }
292
293 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
294         block, err := msg.GetMineBlock()
295         if err != nil {
296                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
297                 return
298         }
299
300         hash := block.Hash()
301         peer.markBlock(&hash)
302         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
303         peer.setStatus(block.Height, &hash)
304 }
305
306 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
307         bestHeader := sm.chain.BestBlockHeader()
308         genesisBlock, err := sm.chain.GetBlockByHeight(0)
309         if err != nil {
310                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
311         }
312
313         genesisHash := genesisBlock.Hash()
314         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
315         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
316                 sm.peers.removePeer(peer.ID())
317         }
318 }
319
320 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
321         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
322                 peer.setStatus(msg.Height, msg.GetHash())
323                 return
324         }
325
326         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
327                 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
328                 return
329         }
330
331         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
332 }
333
334 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
335         tx, err := msg.GetTransaction()
336         if err != nil {
337                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
338                 return
339         }
340
341         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
342                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
343         }
344 }
345
346 func (sm *SyncManager) IsListening() bool {
347         if sm.config.VaultMode {
348                 return false
349         }
350         return sm.sw.IsListening()
351 }
352
353 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
354         return sm.sw.NodeInfo()
355 }
356
357 func (sm *SyncManager) PeerCount() int {
358         if sm.config.VaultMode {
359                 return 0
360         }
361         return len(sm.sw.Peers().List())
362 }
363
364 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
365         peer := sm.peers.getPeer(basePeer.ID())
366         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
367                 return
368         }
369
370         log.WithFields(log.Fields{
371                 "module":  logModule,
372                 "peer":    basePeer.Addr(),
373                 "type":    reflect.TypeOf(msg),
374                 "message": msg.String(),
375         }).Info("receive message from peer")
376
377         switch msg := msg.(type) {
378         case *GetBlockMessage:
379                 sm.handleGetBlockMsg(peer, msg)
380
381         case *BlockMessage:
382                 sm.handleBlockMsg(peer, msg)
383
384         case *StatusRequestMessage:
385                 sm.handleStatusRequestMsg(basePeer)
386
387         case *StatusResponseMessage:
388                 sm.handleStatusResponseMsg(basePeer, msg)
389
390         case *TransactionMessage:
391                 sm.handleTransactionMsg(peer, msg)
392
393         case *MineBlockMessage:
394                 sm.handleMineBlockMsg(peer, msg)
395
396         case *GetHeadersMessage:
397                 sm.handleGetHeadersMsg(peer, msg)
398
399         case *HeadersMessage:
400                 sm.handleHeadersMsg(peer, msg)
401
402         case *GetBlocksMessage:
403                 sm.handleGetBlocksMsg(peer, msg)
404
405         case *BlocksMessage:
406                 sm.handleBlocksMsg(peer, msg)
407
408         case *FilterLoadMessage:
409                 sm.handleFilterLoadMsg(peer, msg)
410
411         case *FilterAddMessage:
412                 sm.handleFilterAddMsg(peer, msg)
413
414         case *FilterClearMessage:
415                 sm.handleFilterClearMsg(peer)
416
417         case *GetMerkleBlockMessage:
418                 sm.handleGetMerkleBlockMsg(peer, msg)
419
420         default:
421                 log.WithFields(log.Fields{
422                         "module":       logModule,
423                         "peer":         basePeer.Addr(),
424                         "message_type": reflect.TypeOf(msg),
425                 }).Error("unhandled message type")
426         }
427 }
428
429 func (sm *SyncManager) Start() error {
430         var err error
431         if _, err = sm.sw.Start(); err != nil {
432                 log.Error("switch start err")
433                 return err
434         }
435
436         sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
437         if err != nil {
438                 return err
439         }
440
441         sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
442         if err != nil {
443                 return err
444         }
445
446         // broadcast transactions
447         go sm.txBroadcastLoop()
448         go sm.minedBroadcastLoop()
449         go sm.txSyncLoop()
450
451         return nil
452 }
453
454 //Stop stop sync manager
455 func (sm *SyncManager) Stop() {
456         close(sm.quitSync)
457         sm.minedBlockSub.Unsubscribe()
458         if !sm.config.VaultMode {
459                 sm.sw.Stop()
460         }
461 }
462
463 func (sm *SyncManager) minedBroadcastLoop() {
464         for {
465                 select {
466                 case obj, ok := <-sm.minedBlockSub.Chan():
467                         if !ok {
468                                 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
469                                 return
470                         }
471
472                         ev, ok := obj.Data.(event.NewMinedBlockEvent)
473                         if !ok {
474                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
475                                 continue
476                         }
477
478                         if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
479                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
480                                 continue
481                         }
482
483                 case <-sm.quitSync:
484                         return
485                 }
486         }
487 }