OSDN Git Service

Hulk did something
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/tendermint/go-crypto"
10         cfg "github.com/vapor/config"
11         "github.com/vapor/consensus"
12         "github.com/vapor/event"
13         "github.com/vapor/p2p"
14         core "github.com/vapor/protocol"
15         "github.com/vapor/protocol/bc"
16         "github.com/vapor/protocol/bc/types"
17 )
18
19 const (
20         logModule             = "netsync"
21         maxTxChanSize         = 10000
22         maxFilterAddressSize  = 50
23         maxFilterAddressCount = 1000
24 )
25
26 var (
27         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
28 )
29
30 // Chain is the interface for Bytom core
31 type Chain interface {
32         BestBlockHeader() *types.BlockHeader
33         BestBlockHeight() uint64
34         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
35         GetBlockByHash(*bc.Hash) (*types.Block, error)
36         GetBlockByHeight(uint64) (*types.Block, error)
37         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
38         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
39         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
40         InMainChain(bc.Hash) bool
41         ProcessBlock(*types.Block) (bool, error)
42         ValidateTx(*types.Tx) (bool, error)
43 }
44
45 type Switch interface {
46         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
47         AddBannedPeer(string) error
48         StopPeerGracefully(string)
49         NodeInfo() *p2p.NodeInfo
50         Start() (bool, error)
51         Stop() bool
52         IsListening() bool
53         DialPeerWithAddress(addr *p2p.NetAddress) error
54         Peers() *p2p.PeerSet
55 }
56
57 //SyncManager Sync Manager is responsible for the business layer information synchronization
58 type SyncManager struct {
59         sw           Switch
60         genesisHash  bc.Hash
61         chain        Chain
62         txPool       *core.TxPool
63         blockFetcher *blockFetcher
64         blockKeeper  *blockKeeper
65         peers        *peerSet
66
67         txSyncCh chan *txSyncMsg
68         quitSync chan struct{}
69         config   *cfg.Config
70
71         eventDispatcher *event.Dispatcher
72         minedBlockSub   *event.Subscription
73         txMsgSub        *event.Subscription
74 }
75
76 // CreateSyncManager create sync manager and set switch.
77 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
78         sw, err := p2p.NewSwitch(config)
79         if err != nil {
80                 return nil, err
81         }
82
83         return newSyncManager(config, sw, chain, txPool, dispatcher)
84 }
85
86 //NewSyncManager create a sync manager
87 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
88         genesisHeader, err := chain.GetHeaderByHeight(0)
89         if err != nil {
90                 return nil, err
91         }
92         peers := newPeerSet(sw)
93         manager := &SyncManager{
94                 sw:              sw,
95                 genesisHash:     genesisHeader.Hash(),
96                 txPool:          txPool,
97                 chain:           chain,
98                 blockFetcher:    newBlockFetcher(chain, peers),
99                 blockKeeper:     newBlockKeeper(chain, peers),
100                 peers:           peers,
101                 txSyncCh:        make(chan *txSyncMsg),
102                 quitSync:        make(chan struct{}),
103                 config:          config,
104                 eventDispatcher: dispatcher,
105         }
106
107         if !config.VaultMode {
108                 protocolReactor := NewProtocolReactor(manager, peers)
109                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
110         }
111         return manager, nil
112 }
113
114 //BestPeer return the highest p2p peerInfo
115 func (sm *SyncManager) BestPeer() *PeerInfo {
116         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
117         if bestPeer != nil {
118                 return bestPeer.getPeerInfo()
119         }
120         return nil
121 }
122
123 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
124         if sm.config.VaultMode {
125                 return errVaultModeDialPeer
126         }
127
128         return sm.sw.DialPeerWithAddress(addr)
129 }
130
131 func (sm *SyncManager) GetNetwork() string {
132         return sm.config.ChainID
133 }
134
135 //GetPeerInfos return peer info of all peers
136 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
137         return sm.peers.getPeerInfos()
138 }
139
140 //IsCaughtUp check wheather the peer finish the sync
141 func (sm *SyncManager) IsCaughtUp() bool {
142         peer := sm.peers.bestPeer(consensus.SFFullNode)
143         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
144 }
145
146 //StopPeer try to stop peer by given ID
147 func (sm *SyncManager) StopPeer(peerID string) error {
148         if peer := sm.peers.getPeer(peerID); peer == nil {
149                 return errors.New("peerId not exist")
150         }
151         sm.peers.removePeer(peerID)
152         return nil
153 }
154
155 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
156         block, err := msg.GetBlock()
157         if err != nil {
158                 return
159         }
160         sm.blockKeeper.processBlock(peer.ID(), block)
161 }
162
163 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
164         blocks, err := msg.GetBlocks()
165         if err != nil {
166                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
167                 return
168         }
169
170         sm.blockKeeper.processBlocks(peer.ID(), blocks)
171 }
172
173 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
174         peer.addFilterAddress(msg.Address)
175 }
176
177 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
178         peer.filterAdds.Clear()
179 }
180
181 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
182         peer.addFilterAddresses(msg.Addresses)
183 }
184
185 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
186         var block *types.Block
187         var err error
188         if msg.Height != 0 {
189                 block, err = sm.chain.GetBlockByHeight(msg.Height)
190         } else {
191                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
192         }
193         if err != nil {
194                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
195                 return
196         }
197
198         ok, err := peer.sendBlock(block)
199         if !ok {
200                 sm.peers.removePeer(peer.ID())
201         }
202         if err != nil {
203                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
204         }
205 }
206
207 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
208         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
209         if err != nil || len(blocks) == 0 {
210                 return
211         }
212
213         totalSize := 0
214         sendBlocks := []*types.Block{}
215         for _, block := range blocks {
216                 rawData, err := block.MarshalText()
217                 if err != nil {
218                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
219                         continue
220                 }
221
222                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
223                         break
224                 }
225                 totalSize += len(rawData)
226                 sendBlocks = append(sendBlocks, block)
227         }
228
229         ok, err := peer.sendBlocks(sendBlocks)
230         if !ok {
231                 sm.peers.removePeer(peer.ID())
232         }
233         if err != nil {
234                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
235         }
236 }
237
238 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
239         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
240         if err != nil || len(headers) == 0 {
241                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
242                 return
243         }
244
245         ok, err := peer.sendHeaders(headers)
246         if !ok {
247                 sm.peers.removePeer(peer.ID())
248         }
249         if err != nil {
250                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
251         }
252 }
253
254 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
255         var err error
256         var block *types.Block
257         if msg.Height != 0 {
258                 block, err = sm.chain.GetBlockByHeight(msg.Height)
259         } else {
260                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
261         }
262         if err != nil {
263                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
264                 return
265         }
266
267         blockHash := block.Hash()
268         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
269         if err != nil {
270                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
271                 return
272         }
273
274         ok, err := peer.sendMerkleBlock(block, txStatus)
275         if err != nil {
276                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
277                 return
278         }
279
280         if !ok {
281                 sm.peers.removePeer(peer.ID())
282         }
283 }
284
285 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
286         headers, err := msg.GetHeaders()
287         if err != nil {
288                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
289                 return
290         }
291
292         sm.blockKeeper.processHeaders(peer.ID(), headers)
293 }
294
295 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
296         block, err := msg.GetMineBlock()
297         if err != nil {
298                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
299                 return
300         }
301
302         hash := block.Hash()
303         peer.markBlock(&hash)
304         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
305         peer.setStatus(block.Height, &hash)
306 }
307
308 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
309         bestHeader := sm.chain.BestBlockHeader()
310         genesisBlock, err := sm.chain.GetBlockByHeight(0)
311         if err != nil {
312                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
313         }
314
315         genesisHash := genesisBlock.Hash()
316         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
317         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
318                 sm.peers.removePeer(peer.ID())
319         }
320 }
321
322 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
323         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
324                 peer.setStatus(msg.Height, msg.GetHash())
325                 return
326         }
327
328         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
329                 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
330                 return
331         }
332
333         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
334 }
335
336 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
337         tx, err := msg.GetTransaction()
338         if err != nil {
339                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
340                 return
341         }
342
343         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
344                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
345         }
346 }
347
348 func (sm *SyncManager) IsListening() bool {
349         if sm.config.VaultMode {
350                 return false
351         }
352         return sm.sw.IsListening()
353 }
354
355 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
356         if sm.config.VaultMode {
357                 return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
358         }
359         return sm.sw.NodeInfo()
360 }
361
362 func (sm *SyncManager) PeerCount() int {
363         if sm.config.VaultMode {
364                 return 0
365         }
366         return len(sm.sw.Peers().List())
367 }
368
369 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
370         peer := sm.peers.getPeer(basePeer.ID())
371         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
372                 return
373         }
374
375         log.WithFields(log.Fields{
376                 "module":  logModule,
377                 "peer":    basePeer.Addr(),
378                 "type":    reflect.TypeOf(msg),
379                 "message": msg.String(),
380         }).Info("receive message from peer")
381
382         switch msg := msg.(type) {
383         case *GetBlockMessage:
384                 sm.handleGetBlockMsg(peer, msg)
385
386         case *BlockMessage:
387                 sm.handleBlockMsg(peer, msg)
388
389         case *StatusRequestMessage:
390                 sm.handleStatusRequestMsg(basePeer)
391
392         case *StatusResponseMessage:
393                 sm.handleStatusResponseMsg(basePeer, msg)
394
395         case *TransactionMessage:
396                 sm.handleTransactionMsg(peer, msg)
397
398         case *MineBlockMessage:
399                 sm.handleMineBlockMsg(peer, msg)
400
401         case *GetHeadersMessage:
402                 sm.handleGetHeadersMsg(peer, msg)
403
404         case *HeadersMessage:
405                 sm.handleHeadersMsg(peer, msg)
406
407         case *GetBlocksMessage:
408                 sm.handleGetBlocksMsg(peer, msg)
409
410         case *BlocksMessage:
411                 sm.handleBlocksMsg(peer, msg)
412
413         case *FilterLoadMessage:
414                 sm.handleFilterLoadMsg(peer, msg)
415
416         case *FilterAddMessage:
417                 sm.handleFilterAddMsg(peer, msg)
418
419         case *FilterClearMessage:
420                 sm.handleFilterClearMsg(peer)
421
422         case *GetMerkleBlockMessage:
423                 sm.handleGetMerkleBlockMsg(peer, msg)
424
425         default:
426                 log.WithFields(log.Fields{
427                         "module":       logModule,
428                         "peer":         basePeer.Addr(),
429                         "message_type": reflect.TypeOf(msg),
430                 }).Error("unhandled message type")
431         }
432 }
433
434 func (sm *SyncManager) Start() error {
435         var err error
436         if _, err = sm.sw.Start(); err != nil {
437                 log.Error("switch start err")
438                 return err
439         }
440
441         sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
442         if err != nil {
443                 return err
444         }
445
446         sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
447         if err != nil {
448                 return err
449         }
450
451         // broadcast transactions
452         go sm.txBroadcastLoop()
453         go sm.minedBroadcastLoop()
454         go sm.txSyncLoop()
455
456         return nil
457 }
458
459 //Stop stop sync manager
460 func (sm *SyncManager) Stop() {
461         close(sm.quitSync)
462         sm.minedBlockSub.Unsubscribe()
463         if !sm.config.VaultMode {
464                 sm.sw.Stop()
465         }
466 }
467
468 func (sm *SyncManager) minedBroadcastLoop() {
469         for {
470                 select {
471                 case obj, ok := <-sm.minedBlockSub.Chan():
472                         if !ok {
473                                 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
474                                 return
475                         }
476
477                         ev, ok := obj.Data.(event.NewMinedBlockEvent)
478                         if !ok {
479                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
480                                 continue
481                         }
482
483                         if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
484                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
485                                 continue
486                         }
487
488                 case <-sm.quitSync:
489                         return
490                 }
491         }
492 }