OSDN Git Service

Nodeinfo handshake information modification (#68)
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/vapor/config"
10         "github.com/vapor/consensus"
11         "github.com/vapor/event"
12         "github.com/vapor/p2p"
13         core "github.com/vapor/protocol"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16 )
17
18 const (
19         logModule             = "netsync"
20         maxTxChanSize         = 10000
21         maxFilterAddressSize  = 50
22         maxFilterAddressCount = 1000
23 )
24
25 var (
26         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
27 )
28
29 // Chain is the interface for Bytom core
30 type Chain interface {
31         BestBlockHeader() *types.BlockHeader
32         BestBlockHeight() uint64
33         GetBlockByHash(*bc.Hash) (*types.Block, error)
34         GetBlockByHeight(uint64) (*types.Block, error)
35         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
36         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
37         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
38         InMainChain(bc.Hash) bool
39         ProcessBlock(*types.Block) (bool, error)
40         ValidateTx(*types.Tx) (bool, error)
41 }
42
43 type Switch interface {
44         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
45         AddBannedPeer(string) error
46         StopPeerGracefully(string)
47         Start() (bool, error)
48         Stop() bool
49         IsListening() bool
50         DialPeerWithAddress(addr *p2p.NetAddress) error
51         Peers() *p2p.PeerSet
52 }
53
54 //SyncManager Sync Manager is responsible for the business layer information synchronization
55 type SyncManager struct {
56         sw           Switch
57         genesisHash  bc.Hash
58         chain        Chain
59         txPool       *core.TxPool
60         blockFetcher *blockFetcher
61         blockKeeper  *blockKeeper
62         peers        *peerSet
63
64         txSyncCh chan *txSyncMsg
65         quitSync chan struct{}
66         config   *cfg.Config
67
68         eventDispatcher *event.Dispatcher
69         minedBlockSub   *event.Subscription
70         txMsgSub        *event.Subscription
71 }
72
73 // CreateSyncManager create sync manager and set switch.
74 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
75         sw, err := p2p.NewSwitch(config)
76         if err != nil {
77                 return nil, err
78         }
79
80         return newSyncManager(config, sw, chain, txPool, dispatcher)
81 }
82
83 //NewSyncManager create a sync manager
84 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
85         genesisHeader, err := chain.GetHeaderByHeight(0)
86         if err != nil {
87                 return nil, err
88         }
89         peers := newPeerSet(sw)
90         manager := &SyncManager{
91                 sw:              sw,
92                 genesisHash:     genesisHeader.Hash(),
93                 txPool:          txPool,
94                 chain:           chain,
95                 blockFetcher:    newBlockFetcher(chain, peers),
96                 blockKeeper:     newBlockKeeper(chain, peers),
97                 peers:           peers,
98                 txSyncCh:        make(chan *txSyncMsg),
99                 quitSync:        make(chan struct{}),
100                 config:          config,
101                 eventDispatcher: dispatcher,
102         }
103
104         if !config.VaultMode {
105                 protocolReactor := NewProtocolReactor(manager, peers)
106                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
107         }
108         return manager, nil
109 }
110
111 func (sm *SyncManager) AddPeer(peer BasePeer) {
112         sm.peers.addPeer(peer)
113 }
114
115 //BestPeer return the highest p2p peerInfo
116 func (sm *SyncManager) BestPeer() *PeerInfo {
117         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
118         if bestPeer != nil {
119                 return bestPeer.getPeerInfo()
120         }
121         return nil
122 }
123
124 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
125         if sm.config.VaultMode {
126                 return errVaultModeDialPeer
127         }
128
129         return sm.sw.DialPeerWithAddress(addr)
130 }
131
132 func (sm *SyncManager) GetNetwork() string {
133         return sm.config.ChainID
134 }
135
136 //GetPeerInfos return peer info of all peers
137 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
138         return sm.peers.getPeerInfos()
139 }
140
141 //IsCaughtUp check wheather the peer finish the sync
142 func (sm *SyncManager) IsCaughtUp() bool {
143         peer := sm.peers.bestPeer(consensus.SFFullNode)
144         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
145 }
146
147 //StopPeer try to stop peer by given ID
148 func (sm *SyncManager) StopPeer(peerID string) error {
149         if peer := sm.peers.getPeer(peerID); peer == nil {
150                 return errors.New("peerId not exist")
151         }
152         sm.peers.removePeer(peerID)
153         return nil
154 }
155
156 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
157         block, err := msg.GetBlock()
158         if err != nil {
159                 return
160         }
161         sm.blockKeeper.processBlock(peer.ID(), block)
162 }
163
164 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
165         blocks, err := msg.GetBlocks()
166         if err != nil {
167                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
168                 return
169         }
170
171         sm.blockKeeper.processBlocks(peer.ID(), blocks)
172 }
173
174 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
175         peer.addFilterAddress(msg.Address)
176 }
177
178 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
179         peer.filterAdds.Clear()
180 }
181
182 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
183         peer.addFilterAddresses(msg.Addresses)
184 }
185
186 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
187         var block *types.Block
188         var err error
189         if msg.Height != 0 {
190                 block, err = sm.chain.GetBlockByHeight(msg.Height)
191         } else {
192                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
193         }
194         if err != nil {
195                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
196                 return
197         }
198
199         ok, err := peer.sendBlock(block)
200         if !ok {
201                 sm.peers.removePeer(peer.ID())
202         }
203         if err != nil {
204                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
205         }
206 }
207
208 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
209         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
210         if err != nil || len(blocks) == 0 {
211                 return
212         }
213
214         totalSize := 0
215         sendBlocks := []*types.Block{}
216         for _, block := range blocks {
217                 rawData, err := block.MarshalText()
218                 if err != nil {
219                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
220                         continue
221                 }
222
223                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
224                         break
225                 }
226                 totalSize += len(rawData)
227                 sendBlocks = append(sendBlocks, block)
228         }
229
230         ok, err := peer.sendBlocks(sendBlocks)
231         if !ok {
232                 sm.peers.removePeer(peer.ID())
233         }
234         if err != nil {
235                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
236         }
237 }
238
239 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
240         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
241         if err != nil || len(headers) == 0 {
242                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
243                 return
244         }
245
246         ok, err := peer.sendHeaders(headers)
247         if !ok {
248                 sm.peers.removePeer(peer.ID())
249         }
250         if err != nil {
251                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
252         }
253 }
254
255 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
256         var err error
257         var block *types.Block
258         if msg.Height != 0 {
259                 block, err = sm.chain.GetBlockByHeight(msg.Height)
260         } else {
261                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
262         }
263         if err != nil {
264                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
265                 return
266         }
267
268         blockHash := block.Hash()
269         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
270         if err != nil {
271                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
272                 return
273         }
274
275         ok, err := peer.sendMerkleBlock(block, txStatus)
276         if err != nil {
277                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
278                 return
279         }
280
281         if !ok {
282                 sm.peers.removePeer(peer.ID())
283         }
284 }
285
286 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
287         headers, err := msg.GetHeaders()
288         if err != nil {
289                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
290                 return
291         }
292
293         sm.blockKeeper.processHeaders(peer.ID(), headers)
294 }
295
296 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
297         block, err := msg.GetMineBlock()
298         if err != nil {
299                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
300                 return
301         }
302
303         hash := block.Hash()
304         peer.markBlock(&hash)
305         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
306         peer.setStatus(block.Height, &hash)
307 }
308
309 func (sm *SyncManager) handleStatusMsg(basePeer BasePeer, msg *StatusMessage) {
310         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
311                 peer.setStatus(msg.Height, msg.GetHash())
312                 return
313         }
314 }
315
316 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
317         tx, err := msg.GetTransaction()
318         if err != nil {
319                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
320                 return
321         }
322
323         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
324                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
325         }
326 }
327
328 func (sm *SyncManager) IsListening() bool {
329         if sm.config.VaultMode {
330                 return false
331         }
332         return sm.sw.IsListening()
333 }
334
335 func (sm *SyncManager) PeerCount() int {
336         if sm.config.VaultMode {
337                 return 0
338         }
339         return len(sm.sw.Peers().List())
340 }
341
342 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
343         peer := sm.peers.getPeer(basePeer.ID())
344         if peer == nil {
345                 return
346         }
347
348         log.WithFields(log.Fields{
349                 "module":  logModule,
350                 "peer":    basePeer.Addr(),
351                 "type":    reflect.TypeOf(msg),
352                 "message": msg.String(),
353         }).Info("receive message from peer")
354
355         switch msg := msg.(type) {
356         case *GetBlockMessage:
357                 sm.handleGetBlockMsg(peer, msg)
358
359         case *BlockMessage:
360                 sm.handleBlockMsg(peer, msg)
361
362         case *StatusMessage:
363                 sm.handleStatusMsg(basePeer, msg)
364
365         case *TransactionMessage:
366                 sm.handleTransactionMsg(peer, msg)
367
368         case *MineBlockMessage:
369                 sm.handleMineBlockMsg(peer, msg)
370
371         case *GetHeadersMessage:
372                 sm.handleGetHeadersMsg(peer, msg)
373
374         case *HeadersMessage:
375                 sm.handleHeadersMsg(peer, msg)
376
377         case *GetBlocksMessage:
378                 sm.handleGetBlocksMsg(peer, msg)
379
380         case *BlocksMessage:
381                 sm.handleBlocksMsg(peer, msg)
382
383         case *FilterLoadMessage:
384                 sm.handleFilterLoadMsg(peer, msg)
385
386         case *FilterAddMessage:
387                 sm.handleFilterAddMsg(peer, msg)
388
389         case *FilterClearMessage:
390                 sm.handleFilterClearMsg(peer)
391
392         case *GetMerkleBlockMessage:
393                 sm.handleGetMerkleBlockMsg(peer, msg)
394
395         default:
396                 log.WithFields(log.Fields{
397                         "module":       logModule,
398                         "peer":         basePeer.Addr(),
399                         "message_type": reflect.TypeOf(msg),
400                 }).Error("unhandled message type")
401         }
402 }
403
404 func (sm *SyncManager) SendStatus(peer BasePeer) error {
405         p := sm.peers.getPeer(peer.ID())
406         if p == nil {
407                 return errors.New("invalid peer")
408         }
409
410         if err := p.sendStatus(sm.chain.BestBlockHeader()); err != nil {
411                 sm.peers.removePeer(p.ID())
412                 return err
413         }
414         return nil
415 }
416
417 func (sm *SyncManager) Start() error {
418         var err error
419         if _, err = sm.sw.Start(); err != nil {
420                 log.Error("switch start err")
421                 return err
422         }
423
424         sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
425         if err != nil {
426                 return err
427         }
428
429         sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
430         if err != nil {
431                 return err
432         }
433
434         // broadcast transactions
435         go sm.txBroadcastLoop()
436         go sm.minedBroadcastLoop()
437         go sm.txSyncLoop()
438
439         return nil
440 }
441
442 //Stop stop sync manager
443 func (sm *SyncManager) Stop() {
444         close(sm.quitSync)
445         sm.minedBlockSub.Unsubscribe()
446         if !sm.config.VaultMode {
447                 sm.sw.Stop()
448         }
449 }
450
451 func (sm *SyncManager) minedBroadcastLoop() {
452         for {
453                 select {
454                 case obj, ok := <-sm.minedBlockSub.Chan():
455                         if !ok {
456                                 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
457                                 return
458                         }
459
460                         ev, ok := obj.Data.(event.NewMinedBlockEvent)
461                         if !ok {
462                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
463                                 continue
464                         }
465
466                         if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
467                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
468                                 continue
469                         }
470
471                 case <-sm.quitSync:
472                         return
473                 }
474         }
475 }