7 log "github.com/sirupsen/logrus"
9 "github.com/tendermint/go-crypto"
10 cfg "github.com/vapor/config"
11 "github.com/vapor/consensus"
12 "github.com/vapor/event"
13 "github.com/vapor/p2p"
14 core "github.com/vapor/protocol"
15 "github.com/vapor/protocol/bc"
16 "github.com/vapor/protocol/bc/types"
22 maxFilterAddressSize = 50
23 maxFilterAddressCount = 1000
27 errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
30 // Chain is the interface for Bytom core
31 type Chain interface {
32 BestBlockHeader() *types.BlockHeader
33 BestBlockHeight() uint64
34 GetBlockByHash(*bc.Hash) (*types.Block, error)
35 GetBlockByHeight(uint64) (*types.Block, error)
36 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
37 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
38 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
39 InMainChain(bc.Hash) bool
40 ProcessBlock(*types.Block) (bool, error)
41 ValidateTx(*types.Tx) (bool, error)
44 type Switch interface {
45 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
46 AddBannedPeer(string) error
47 StopPeerGracefully(string)
48 NodeInfo() *p2p.NodeInfo
52 DialPeerWithAddress(addr *p2p.NetAddress) error
56 //SyncManager Sync Manager is responsible for the business layer information synchronization
57 type SyncManager struct {
62 blockFetcher *blockFetcher
63 blockKeeper *blockKeeper
66 txSyncCh chan *txSyncMsg
67 quitSync chan struct{}
70 eventDispatcher *event.Dispatcher
71 minedBlockSub *event.Subscription
72 txMsgSub *event.Subscription
75 // CreateSyncManager create sync manager and set switch.
76 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
77 sw, err := p2p.NewSwitch(config)
82 return newSyncManager(config, sw, chain, txPool, dispatcher)
85 //NewSyncManager create a sync manager
86 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
87 genesisHeader, err := chain.GetHeaderByHeight(0)
91 peers := newPeerSet(sw)
92 manager := &SyncManager{
94 genesisHash: genesisHeader.Hash(),
97 blockFetcher: newBlockFetcher(chain, peers),
98 blockKeeper: newBlockKeeper(chain, peers),
100 txSyncCh: make(chan *txSyncMsg),
101 quitSync: make(chan struct{}),
103 eventDispatcher: dispatcher,
106 if !config.VaultMode {
107 protocolReactor := NewProtocolReactor(manager, peers)
108 manager.sw.AddReactor("PROTOCOL", protocolReactor)
113 //BestPeer return the highest p2p peerInfo
114 func (sm *SyncManager) BestPeer() *PeerInfo {
115 bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
117 return bestPeer.getPeerInfo()
122 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
123 if sm.config.VaultMode {
124 return errVaultModeDialPeer
127 return sm.sw.DialPeerWithAddress(addr)
130 func (sm *SyncManager) GetNetwork() string {
131 return sm.config.ChainID
134 //GetPeerInfos return peer info of all peers
135 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
136 return sm.peers.getPeerInfos()
139 //IsCaughtUp check wheather the peer finish the sync
140 func (sm *SyncManager) IsCaughtUp() bool {
141 peer := sm.peers.bestPeer(consensus.SFFullNode)
142 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
145 //StopPeer try to stop peer by given ID
146 func (sm *SyncManager) StopPeer(peerID string) error {
147 if peer := sm.peers.getPeer(peerID); peer == nil {
148 return errors.New("peerId not exist")
150 sm.peers.removePeer(peerID)
154 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
155 block, err := msg.GetBlock()
159 sm.blockKeeper.processBlock(peer.ID(), block)
162 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
163 blocks, err := msg.GetBlocks()
165 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
169 sm.blockKeeper.processBlocks(peer.ID(), blocks)
172 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
173 peer.addFilterAddress(msg.Address)
176 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
177 peer.filterAdds.Clear()
180 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
181 peer.addFilterAddresses(msg.Addresses)
184 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
185 var block *types.Block
188 block, err = sm.chain.GetBlockByHeight(msg.Height)
190 block, err = sm.chain.GetBlockByHash(msg.GetHash())
193 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
197 ok, err := peer.sendBlock(block)
199 sm.peers.removePeer(peer.ID())
202 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
206 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
207 blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
208 if err != nil || len(blocks) == 0 {
213 sendBlocks := []*types.Block{}
214 for _, block := range blocks {
215 rawData, err := block.MarshalText()
217 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
221 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
224 totalSize += len(rawData)
225 sendBlocks = append(sendBlocks, block)
228 ok, err := peer.sendBlocks(sendBlocks)
230 sm.peers.removePeer(peer.ID())
233 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
237 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
238 headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
239 if err != nil || len(headers) == 0 {
240 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
244 ok, err := peer.sendHeaders(headers)
246 sm.peers.removePeer(peer.ID())
249 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
253 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
255 var block *types.Block
257 block, err = sm.chain.GetBlockByHeight(msg.Height)
259 block, err = sm.chain.GetBlockByHash(msg.GetHash())
262 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
266 blockHash := block.Hash()
267 txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
269 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
273 ok, err := peer.sendMerkleBlock(block, txStatus)
275 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
280 sm.peers.removePeer(peer.ID())
284 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
285 headers, err := msg.GetHeaders()
287 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
291 sm.blockKeeper.processHeaders(peer.ID(), headers)
294 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
295 block, err := msg.GetMineBlock()
297 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
302 peer.markBlock(&hash)
303 sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
304 peer.setStatus(block.Height, &hash)
307 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
308 bestHeader := sm.chain.BestBlockHeader()
309 genesisBlock, err := sm.chain.GetBlockByHeight(0)
311 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
314 genesisHash := genesisBlock.Hash()
315 msg := NewStatusResponseMessage(bestHeader, &genesisHash)
316 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
317 sm.peers.removePeer(peer.ID())
321 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
322 if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
323 peer.setStatus(msg.Height, msg.GetHash())
327 if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
328 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
332 sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
335 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
336 tx, err := msg.GetTransaction()
338 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
342 if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
343 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
347 func (sm *SyncManager) IsListening() bool {
348 if sm.config.VaultMode {
351 return sm.sw.IsListening()
354 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
355 if sm.config.VaultMode {
356 return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
358 return sm.sw.NodeInfo()
361 func (sm *SyncManager) PeerCount() int {
362 if sm.config.VaultMode {
365 return len(sm.sw.Peers().List())
368 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
369 peer := sm.peers.getPeer(basePeer.ID())
370 if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
374 log.WithFields(log.Fields{
376 "peer": basePeer.Addr(),
377 "type": reflect.TypeOf(msg),
378 "message": msg.String(),
379 }).Info("receive message from peer")
381 switch msg := msg.(type) {
382 case *GetBlockMessage:
383 sm.handleGetBlockMsg(peer, msg)
386 sm.handleBlockMsg(peer, msg)
388 case *StatusRequestMessage:
389 sm.handleStatusRequestMsg(basePeer)
391 case *StatusResponseMessage:
392 sm.handleStatusResponseMsg(basePeer, msg)
394 case *TransactionMessage:
395 sm.handleTransactionMsg(peer, msg)
397 case *MineBlockMessage:
398 sm.handleMineBlockMsg(peer, msg)
400 case *GetHeadersMessage:
401 sm.handleGetHeadersMsg(peer, msg)
403 case *HeadersMessage:
404 sm.handleHeadersMsg(peer, msg)
406 case *GetBlocksMessage:
407 sm.handleGetBlocksMsg(peer, msg)
410 sm.handleBlocksMsg(peer, msg)
412 case *FilterLoadMessage:
413 sm.handleFilterLoadMsg(peer, msg)
415 case *FilterAddMessage:
416 sm.handleFilterAddMsg(peer, msg)
418 case *FilterClearMessage:
419 sm.handleFilterClearMsg(peer)
421 case *GetMerkleBlockMessage:
422 sm.handleGetMerkleBlockMsg(peer, msg)
425 log.WithFields(log.Fields{
427 "peer": basePeer.Addr(),
428 "message_type": reflect.TypeOf(msg),
429 }).Error("unhandled message type")
433 func (sm *SyncManager) Start() error {
435 if _, err = sm.sw.Start(); err != nil {
436 log.Error("switch start err")
440 sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
445 sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
450 // broadcast transactions
451 go sm.txBroadcastLoop()
452 go sm.minedBroadcastLoop()
458 //Stop stop sync manager
459 func (sm *SyncManager) Stop() {
461 sm.minedBlockSub.Unsubscribe()
462 if !sm.config.VaultMode {
467 func (sm *SyncManager) minedBroadcastLoop() {
470 case obj, ok := <-sm.minedBlockSub.Chan():
472 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
476 ev, ok := obj.Data.(event.NewMinedBlockEvent)
478 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
482 if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
483 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")