7 log "github.com/sirupsen/logrus"
9 cfg "github.com/bytom/config"
10 "github.com/bytom/consensus"
11 "github.com/bytom/event"
12 "github.com/bytom/p2p"
13 "github.com/bytom/p2p/security"
14 core "github.com/bytom/protocol"
15 "github.com/bytom/protocol/bc"
16 "github.com/bytom/protocol/bc/types"
17 "github.com/tendermint/go-crypto"
23 maxFilterAddressSize = 50
24 maxFilterAddressCount = 1000
28 errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
31 // Chain is the interface for Bytom core
32 type Chain interface {
33 BestBlockHeader() *types.BlockHeader
34 BestBlockHeight() uint64
35 CalcNextSeed(*bc.Hash) (*bc.Hash, error)
36 GetBlockByHash(*bc.Hash) (*types.Block, error)
37 GetBlockByHeight(uint64) (*types.Block, error)
38 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
39 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
40 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
41 InMainChain(bc.Hash) bool
42 ProcessBlock(*types.Block) (bool, error)
43 ValidateTx(*types.Tx) (bool, error)
46 type Switch interface {
47 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
48 StopPeerGracefully(string)
49 NodeInfo() *p2p.NodeInfo
53 DialPeerWithAddress(addr *p2p.NetAddress) error
55 IsBanned(peerID string, level byte, reason string) bool
58 //SyncManager Sync Manager is responsible for the business layer information synchronization
59 type SyncManager struct {
64 blockFetcher *blockFetcher
65 blockKeeper *blockKeeper
68 txSyncCh chan *txSyncMsg
69 quitSync chan struct{}
72 eventDispatcher *event.Dispatcher
73 minedBlockSub *event.Subscription
74 txMsgSub *event.Subscription
77 // CreateSyncManager create sync manager and set switch.
78 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
79 sw, err := p2p.NewSwitch(config)
84 return newSyncManager(config, sw, chain, txPool, dispatcher)
87 //NewSyncManager create a sync manager
88 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
89 genesisHeader, err := chain.GetHeaderByHeight(0)
93 peers := newPeerSet(sw)
94 manager := &SyncManager{
96 genesisHash: genesisHeader.Hash(),
99 blockFetcher: newBlockFetcher(chain, peers),
100 blockKeeper: newBlockKeeper(chain, peers),
102 txSyncCh: make(chan *txSyncMsg),
103 quitSync: make(chan struct{}),
105 eventDispatcher: dispatcher,
108 if !config.VaultMode {
109 protocolReactor := NewProtocolReactor(manager, peers)
110 manager.sw.AddReactor("PROTOCOL", protocolReactor)
115 //BestPeer return the highest p2p peerInfo
116 func (sm *SyncManager) BestPeer() *PeerInfo {
117 bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
119 return bestPeer.getPeerInfo()
124 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
125 if sm.config.VaultMode {
126 return errVaultModeDialPeer
129 return sm.sw.DialPeerWithAddress(addr)
132 func (sm *SyncManager) GetNetwork() string {
133 return sm.config.ChainID
136 //GetPeerInfos return peer info of all peers
137 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
138 return sm.peers.getPeerInfos()
141 //IsCaughtUp check wheather the peer finish the sync
142 func (sm *SyncManager) IsCaughtUp() bool {
143 peer := sm.peers.bestPeer(consensus.SFFullNode)
144 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
147 //StopPeer try to stop peer by given ID
148 func (sm *SyncManager) StopPeer(peerID string) error {
149 if peer := sm.peers.getPeer(peerID); peer == nil {
150 return errors.New("peerId not exist")
152 sm.peers.removePeer(peerID)
156 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
157 block, err := msg.GetBlock()
161 sm.blockKeeper.processBlock(peer.ID(), block)
164 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
165 blocks, err := msg.GetBlocks()
167 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
171 sm.blockKeeper.processBlocks(peer.ID(), blocks)
174 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
175 peer.addFilterAddress(msg.Address)
178 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
179 peer.filterAdds.Clear()
182 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
183 peer.addFilterAddresses(msg.Addresses)
186 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
187 var block *types.Block
190 block, err = sm.chain.GetBlockByHeight(msg.Height)
192 block, err = sm.chain.GetBlockByHash(msg.GetHash())
195 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
199 ok, err := peer.sendBlock(block)
201 sm.peers.removePeer(peer.ID())
204 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
208 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
209 blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
210 if err != nil || len(blocks) == 0 {
215 sendBlocks := []*types.Block{}
216 for _, block := range blocks {
217 rawData, err := block.MarshalText()
219 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
223 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
226 totalSize += len(rawData)
227 sendBlocks = append(sendBlocks, block)
230 ok, err := peer.sendBlocks(sendBlocks)
232 sm.peers.removePeer(peer.ID())
235 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
239 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
240 headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
241 if err != nil || len(headers) == 0 {
242 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
246 ok, err := peer.sendHeaders(headers)
248 sm.peers.removePeer(peer.ID())
251 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
255 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
257 var block *types.Block
259 block, err = sm.chain.GetBlockByHeight(msg.Height)
261 block, err = sm.chain.GetBlockByHash(msg.GetHash())
264 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
268 blockHash := block.Hash()
269 txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
271 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
275 ok, err := peer.sendMerkleBlock(block, txStatus)
277 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
282 sm.peers.removePeer(peer.ID())
286 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
287 headers, err := msg.GetHeaders()
289 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
293 sm.blockKeeper.processHeaders(peer.ID(), headers)
296 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
297 block, err := msg.GetMineBlock()
299 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
304 peer.markBlock(&hash)
305 sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
306 peer.setStatus(block.Height, &hash)
309 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
310 bestHeader := sm.chain.BestBlockHeader()
311 genesisBlock, err := sm.chain.GetBlockByHeight(0)
313 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
316 genesisHash := genesisBlock.Hash()
317 msg := NewStatusResponseMessage(bestHeader, &genesisHash)
318 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
319 sm.peers.removePeer(peer.ID())
323 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
324 if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
325 peer.setStatus(msg.Height, msg.GetHash())
329 if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
330 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
334 sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
337 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
338 tx, err := msg.GetTransaction()
340 sm.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
344 if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
345 sm.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
349 func (sm *SyncManager) IsListening() bool {
350 if sm.config.VaultMode {
353 return sm.sw.IsListening()
356 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
357 if sm.config.VaultMode {
358 return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
360 return sm.sw.NodeInfo()
363 func (sm *SyncManager) PeerCount() int {
364 if sm.config.VaultMode {
367 return len(sm.sw.Peers().List())
370 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
371 peer := sm.peers.getPeer(basePeer.ID())
372 if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
376 log.WithFields(log.Fields{
378 "peer": basePeer.Addr(),
379 "type": reflect.TypeOf(msg),
380 "message": msg.String(),
381 }).Info("receive message from peer")
383 switch msg := msg.(type) {
384 case *GetBlockMessage:
385 sm.handleGetBlockMsg(peer, msg)
388 sm.handleBlockMsg(peer, msg)
390 case *StatusRequestMessage:
391 sm.handleStatusRequestMsg(basePeer)
393 case *StatusResponseMessage:
394 sm.handleStatusResponseMsg(basePeer, msg)
396 case *TransactionMessage:
397 sm.handleTransactionMsg(peer, msg)
399 case *MineBlockMessage:
400 sm.handleMineBlockMsg(peer, msg)
402 case *GetHeadersMessage:
403 sm.handleGetHeadersMsg(peer, msg)
405 case *HeadersMessage:
406 sm.handleHeadersMsg(peer, msg)
408 case *GetBlocksMessage:
409 sm.handleGetBlocksMsg(peer, msg)
412 sm.handleBlocksMsg(peer, msg)
414 case *FilterLoadMessage:
415 sm.handleFilterLoadMsg(peer, msg)
417 case *FilterAddMessage:
418 sm.handleFilterAddMsg(peer, msg)
420 case *FilterClearMessage:
421 sm.handleFilterClearMsg(peer)
423 case *GetMerkleBlockMessage:
424 sm.handleGetMerkleBlockMsg(peer, msg)
427 log.WithFields(log.Fields{
429 "peer": basePeer.Addr(),
430 "message_type": reflect.TypeOf(msg),
431 }).Error("unhandled message type")
435 func (sm *SyncManager) Start() error {
437 if _, err = sm.sw.Start(); err != nil {
438 log.Error("switch start err")
442 sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
447 sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
452 // broadcast transactions
453 go sm.txBroadcastLoop()
454 go sm.minedBroadcastLoop()
460 //Stop stop sync manager
461 func (sm *SyncManager) Stop() {
463 sm.minedBlockSub.Unsubscribe()
464 if !sm.config.VaultMode {
469 func (sm *SyncManager) minedBroadcastLoop() {
472 case obj, ok := <-sm.minedBlockSub.Chan():
474 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
478 ev, ok := obj.Data.(event.NewMinedBlockEvent)
480 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
484 if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
485 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")