OSDN Git Service

init delete the pow related (#55)
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/tendermint/go-crypto"
10         cfg "github.com/vapor/config"
11         "github.com/vapor/consensus"
12         "github.com/vapor/event"
13         "github.com/vapor/p2p"
14         core "github.com/vapor/protocol"
15         "github.com/vapor/protocol/bc"
16         "github.com/vapor/protocol/bc/types"
17 )
18
19 const (
20         logModule             = "netsync"
21         maxTxChanSize         = 10000
22         maxFilterAddressSize  = 50
23         maxFilterAddressCount = 1000
24 )
25
26 var (
27         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
28 )
29
30 // Chain is the interface for Bytom core
31 type Chain interface {
32         BestBlockHeader() *types.BlockHeader
33         BestBlockHeight() uint64
34         GetBlockByHash(*bc.Hash) (*types.Block, error)
35         GetBlockByHeight(uint64) (*types.Block, error)
36         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
37         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
38         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
39         InMainChain(bc.Hash) bool
40         ProcessBlock(*types.Block) (bool, error)
41         ValidateTx(*types.Tx) (bool, error)
42 }
43
44 type Switch interface {
45         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
46         AddBannedPeer(string) error
47         StopPeerGracefully(string)
48         NodeInfo() *p2p.NodeInfo
49         Start() (bool, error)
50         Stop() bool
51         IsListening() bool
52         DialPeerWithAddress(addr *p2p.NetAddress) error
53         Peers() *p2p.PeerSet
54 }
55
56 //SyncManager Sync Manager is responsible for the business layer information synchronization
57 type SyncManager struct {
58         sw           Switch
59         genesisHash  bc.Hash
60         chain        Chain
61         txPool       *core.TxPool
62         blockFetcher *blockFetcher
63         blockKeeper  *blockKeeper
64         peers        *peerSet
65
66         txSyncCh chan *txSyncMsg
67         quitSync chan struct{}
68         config   *cfg.Config
69
70         eventDispatcher *event.Dispatcher
71         minedBlockSub   *event.Subscription
72         txMsgSub        *event.Subscription
73 }
74
75 // CreateSyncManager create sync manager and set switch.
76 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
77         sw, err := p2p.NewSwitch(config)
78         if err != nil {
79                 return nil, err
80         }
81
82         return newSyncManager(config, sw, chain, txPool, dispatcher)
83 }
84
85 //NewSyncManager create a sync manager
86 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
87         genesisHeader, err := chain.GetHeaderByHeight(0)
88         if err != nil {
89                 return nil, err
90         }
91         peers := newPeerSet(sw)
92         manager := &SyncManager{
93                 sw:              sw,
94                 genesisHash:     genesisHeader.Hash(),
95                 txPool:          txPool,
96                 chain:           chain,
97                 blockFetcher:    newBlockFetcher(chain, peers),
98                 blockKeeper:     newBlockKeeper(chain, peers),
99                 peers:           peers,
100                 txSyncCh:        make(chan *txSyncMsg),
101                 quitSync:        make(chan struct{}),
102                 config:          config,
103                 eventDispatcher: dispatcher,
104         }
105
106         if !config.VaultMode {
107                 protocolReactor := NewProtocolReactor(manager, peers)
108                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
109         }
110         return manager, nil
111 }
112
113 //BestPeer return the highest p2p peerInfo
114 func (sm *SyncManager) BestPeer() *PeerInfo {
115         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
116         if bestPeer != nil {
117                 return bestPeer.getPeerInfo()
118         }
119         return nil
120 }
121
122 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
123         if sm.config.VaultMode {
124                 return errVaultModeDialPeer
125         }
126
127         return sm.sw.DialPeerWithAddress(addr)
128 }
129
130 func (sm *SyncManager) GetNetwork() string {
131         return sm.config.ChainID
132 }
133
134 //GetPeerInfos return peer info of all peers
135 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
136         return sm.peers.getPeerInfos()
137 }
138
139 //IsCaughtUp check wheather the peer finish the sync
140 func (sm *SyncManager) IsCaughtUp() bool {
141         peer := sm.peers.bestPeer(consensus.SFFullNode)
142         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
143 }
144
145 //StopPeer try to stop peer by given ID
146 func (sm *SyncManager) StopPeer(peerID string) error {
147         if peer := sm.peers.getPeer(peerID); peer == nil {
148                 return errors.New("peerId not exist")
149         }
150         sm.peers.removePeer(peerID)
151         return nil
152 }
153
154 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
155         block, err := msg.GetBlock()
156         if err != nil {
157                 return
158         }
159         sm.blockKeeper.processBlock(peer.ID(), block)
160 }
161
162 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
163         blocks, err := msg.GetBlocks()
164         if err != nil {
165                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
166                 return
167         }
168
169         sm.blockKeeper.processBlocks(peer.ID(), blocks)
170 }
171
172 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
173         peer.addFilterAddress(msg.Address)
174 }
175
176 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
177         peer.filterAdds.Clear()
178 }
179
180 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
181         peer.addFilterAddresses(msg.Addresses)
182 }
183
184 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
185         var block *types.Block
186         var err error
187         if msg.Height != 0 {
188                 block, err = sm.chain.GetBlockByHeight(msg.Height)
189         } else {
190                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
191         }
192         if err != nil {
193                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
194                 return
195         }
196
197         ok, err := peer.sendBlock(block)
198         if !ok {
199                 sm.peers.removePeer(peer.ID())
200         }
201         if err != nil {
202                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
203         }
204 }
205
206 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
207         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
208         if err != nil || len(blocks) == 0 {
209                 return
210         }
211
212         totalSize := 0
213         sendBlocks := []*types.Block{}
214         for _, block := range blocks {
215                 rawData, err := block.MarshalText()
216                 if err != nil {
217                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
218                         continue
219                 }
220
221                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
222                         break
223                 }
224                 totalSize += len(rawData)
225                 sendBlocks = append(sendBlocks, block)
226         }
227
228         ok, err := peer.sendBlocks(sendBlocks)
229         if !ok {
230                 sm.peers.removePeer(peer.ID())
231         }
232         if err != nil {
233                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
234         }
235 }
236
237 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
238         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
239         if err != nil || len(headers) == 0 {
240                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
241                 return
242         }
243
244         ok, err := peer.sendHeaders(headers)
245         if !ok {
246                 sm.peers.removePeer(peer.ID())
247         }
248         if err != nil {
249                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
250         }
251 }
252
253 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
254         var err error
255         var block *types.Block
256         if msg.Height != 0 {
257                 block, err = sm.chain.GetBlockByHeight(msg.Height)
258         } else {
259                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
260         }
261         if err != nil {
262                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
263                 return
264         }
265
266         blockHash := block.Hash()
267         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
268         if err != nil {
269                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
270                 return
271         }
272
273         ok, err := peer.sendMerkleBlock(block, txStatus)
274         if err != nil {
275                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
276                 return
277         }
278
279         if !ok {
280                 sm.peers.removePeer(peer.ID())
281         }
282 }
283
284 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
285         headers, err := msg.GetHeaders()
286         if err != nil {
287                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
288                 return
289         }
290
291         sm.blockKeeper.processHeaders(peer.ID(), headers)
292 }
293
294 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
295         block, err := msg.GetMineBlock()
296         if err != nil {
297                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
298                 return
299         }
300
301         hash := block.Hash()
302         peer.markBlock(&hash)
303         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
304         peer.setStatus(block.Height, &hash)
305 }
306
307 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
308         bestHeader := sm.chain.BestBlockHeader()
309         genesisBlock, err := sm.chain.GetBlockByHeight(0)
310         if err != nil {
311                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
312         }
313
314         genesisHash := genesisBlock.Hash()
315         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
316         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
317                 sm.peers.removePeer(peer.ID())
318         }
319 }
320
321 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
322         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
323                 peer.setStatus(msg.Height, msg.GetHash())
324                 return
325         }
326
327         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
328                 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
329                 return
330         }
331
332         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
333 }
334
335 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
336         tx, err := msg.GetTransaction()
337         if err != nil {
338                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
339                 return
340         }
341
342         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
343                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
344         }
345 }
346
347 func (sm *SyncManager) IsListening() bool {
348         if sm.config.VaultMode {
349                 return false
350         }
351         return sm.sw.IsListening()
352 }
353
354 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
355         if sm.config.VaultMode {
356                 return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
357         }
358         return sm.sw.NodeInfo()
359 }
360
361 func (sm *SyncManager) PeerCount() int {
362         if sm.config.VaultMode {
363                 return 0
364         }
365         return len(sm.sw.Peers().List())
366 }
367
368 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
369         peer := sm.peers.getPeer(basePeer.ID())
370         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
371                 return
372         }
373
374         log.WithFields(log.Fields{
375                 "module":  logModule,
376                 "peer":    basePeer.Addr(),
377                 "type":    reflect.TypeOf(msg),
378                 "message": msg.String(),
379         }).Info("receive message from peer")
380
381         switch msg := msg.(type) {
382         case *GetBlockMessage:
383                 sm.handleGetBlockMsg(peer, msg)
384
385         case *BlockMessage:
386                 sm.handleBlockMsg(peer, msg)
387
388         case *StatusRequestMessage:
389                 sm.handleStatusRequestMsg(basePeer)
390
391         case *StatusResponseMessage:
392                 sm.handleStatusResponseMsg(basePeer, msg)
393
394         case *TransactionMessage:
395                 sm.handleTransactionMsg(peer, msg)
396
397         case *MineBlockMessage:
398                 sm.handleMineBlockMsg(peer, msg)
399
400         case *GetHeadersMessage:
401                 sm.handleGetHeadersMsg(peer, msg)
402
403         case *HeadersMessage:
404                 sm.handleHeadersMsg(peer, msg)
405
406         case *GetBlocksMessage:
407                 sm.handleGetBlocksMsg(peer, msg)
408
409         case *BlocksMessage:
410                 sm.handleBlocksMsg(peer, msg)
411
412         case *FilterLoadMessage:
413                 sm.handleFilterLoadMsg(peer, msg)
414
415         case *FilterAddMessage:
416                 sm.handleFilterAddMsg(peer, msg)
417
418         case *FilterClearMessage:
419                 sm.handleFilterClearMsg(peer)
420
421         case *GetMerkleBlockMessage:
422                 sm.handleGetMerkleBlockMsg(peer, msg)
423
424         default:
425                 log.WithFields(log.Fields{
426                         "module":       logModule,
427                         "peer":         basePeer.Addr(),
428                         "message_type": reflect.TypeOf(msg),
429                 }).Error("unhandled message type")
430         }
431 }
432
433 func (sm *SyncManager) Start() error {
434         var err error
435         if _, err = sm.sw.Start(); err != nil {
436                 log.Error("switch start err")
437                 return err
438         }
439
440         sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
441         if err != nil {
442                 return err
443         }
444
445         sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
446         if err != nil {
447                 return err
448         }
449
450         // broadcast transactions
451         go sm.txBroadcastLoop()
452         go sm.minedBroadcastLoop()
453         go sm.txSyncLoop()
454
455         return nil
456 }
457
458 //Stop stop sync manager
459 func (sm *SyncManager) Stop() {
460         close(sm.quitSync)
461         sm.minedBlockSub.Unsubscribe()
462         if !sm.config.VaultMode {
463                 sm.sw.Stop()
464         }
465 }
466
467 func (sm *SyncManager) minedBroadcastLoop() {
468         for {
469                 select {
470                 case obj, ok := <-sm.minedBlockSub.Chan():
471                         if !ok {
472                                 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
473                                 return
474                         }
475
476                         ev, ok := obj.Data.(event.NewMinedBlockEvent)
477                         if !ok {
478                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
479                                 continue
480                         }
481
482                         if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
483                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
484                                 continue
485                         }
486
487                 case <-sm.quitSync:
488                         return
489                 }
490         }
491 }