7 log "github.com/sirupsen/logrus"
9 cfg "github.com/vapor/config"
10 "github.com/vapor/consensus"
11 "github.com/vapor/event"
12 "github.com/vapor/p2p"
13 core "github.com/vapor/protocol"
14 "github.com/vapor/protocol/bc"
15 "github.com/vapor/protocol/bc/types"
21 maxFilterAddressSize = 50
22 maxFilterAddressCount = 1000
26 errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
29 // Chain is the interface for Bytom core
30 type Chain interface {
31 BestBlockHeader() *types.BlockHeader
32 BestBlockHeight() uint64
33 GetBlockByHash(*bc.Hash) (*types.Block, error)
34 GetBlockByHeight(uint64) (*types.Block, error)
35 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
36 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
37 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
38 InMainChain(bc.Hash) bool
39 ProcessBlock(*types.Block) (bool, error)
40 ValidateTx(*types.Tx) (bool, error)
43 type Switch interface {
44 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
45 AddBannedPeer(string) error
46 StopPeerGracefully(string)
47 NodeInfo() *p2p.NodeInfo
51 DialPeerWithAddress(addr *p2p.NetAddress) error
55 //SyncManager Sync Manager is responsible for the business layer information synchronization
56 type SyncManager struct {
61 blockFetcher *blockFetcher
62 blockKeeper *blockKeeper
65 txSyncCh chan *txSyncMsg
66 quitSync chan struct{}
69 eventDispatcher *event.Dispatcher
70 minedBlockSub *event.Subscription
71 txMsgSub *event.Subscription
74 // CreateSyncManager create sync manager and set switch.
75 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
76 sw, err := p2p.NewSwitch(config)
81 return newSyncManager(config, sw, chain, txPool, dispatcher)
84 //NewSyncManager create a sync manager
85 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
86 genesisHeader, err := chain.GetHeaderByHeight(0)
90 peers := newPeerSet(sw)
91 manager := &SyncManager{
93 genesisHash: genesisHeader.Hash(),
96 blockFetcher: newBlockFetcher(chain, peers),
97 blockKeeper: newBlockKeeper(chain, peers),
99 txSyncCh: make(chan *txSyncMsg),
100 quitSync: make(chan struct{}),
102 eventDispatcher: dispatcher,
105 if !config.VaultMode {
106 protocolReactor := NewProtocolReactor(manager, peers)
107 manager.sw.AddReactor("PROTOCOL", protocolReactor)
112 //BestPeer return the highest p2p peerInfo
113 func (sm *SyncManager) BestPeer() *PeerInfo {
114 bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
116 return bestPeer.getPeerInfo()
121 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
122 if sm.config.VaultMode {
123 return errVaultModeDialPeer
126 return sm.sw.DialPeerWithAddress(addr)
129 func (sm *SyncManager) GetNetwork() string {
130 return sm.config.ChainID
133 //GetPeerInfos return peer info of all peers
134 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
135 return sm.peers.getPeerInfos()
138 //IsCaughtUp check wheather the peer finish the sync
139 func (sm *SyncManager) IsCaughtUp() bool {
140 peer := sm.peers.bestPeer(consensus.SFFullNode)
141 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
144 //StopPeer try to stop peer by given ID
145 func (sm *SyncManager) StopPeer(peerID string) error {
146 if peer := sm.peers.getPeer(peerID); peer == nil {
147 return errors.New("peerId not exist")
149 sm.peers.removePeer(peerID)
153 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
154 block, err := msg.GetBlock()
158 sm.blockKeeper.processBlock(peer.ID(), block)
161 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
162 blocks, err := msg.GetBlocks()
164 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
168 sm.blockKeeper.processBlocks(peer.ID(), blocks)
171 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
172 peer.addFilterAddress(msg.Address)
175 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
176 peer.filterAdds.Clear()
179 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
180 peer.addFilterAddresses(msg.Addresses)
183 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
184 var block *types.Block
187 block, err = sm.chain.GetBlockByHeight(msg.Height)
189 block, err = sm.chain.GetBlockByHash(msg.GetHash())
192 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
196 ok, err := peer.sendBlock(block)
198 sm.peers.removePeer(peer.ID())
201 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
205 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
206 blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
207 if err != nil || len(blocks) == 0 {
212 sendBlocks := []*types.Block{}
213 for _, block := range blocks {
214 rawData, err := block.MarshalText()
216 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
220 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
223 totalSize += len(rawData)
224 sendBlocks = append(sendBlocks, block)
227 ok, err := peer.sendBlocks(sendBlocks)
229 sm.peers.removePeer(peer.ID())
232 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
236 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
237 headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
238 if err != nil || len(headers) == 0 {
239 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
243 ok, err := peer.sendHeaders(headers)
245 sm.peers.removePeer(peer.ID())
248 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
252 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
254 var block *types.Block
256 block, err = sm.chain.GetBlockByHeight(msg.Height)
258 block, err = sm.chain.GetBlockByHash(msg.GetHash())
261 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
265 blockHash := block.Hash()
266 txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
268 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
272 ok, err := peer.sendMerkleBlock(block, txStatus)
274 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
279 sm.peers.removePeer(peer.ID())
283 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
284 headers, err := msg.GetHeaders()
286 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
290 sm.blockKeeper.processHeaders(peer.ID(), headers)
293 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
294 block, err := msg.GetMineBlock()
296 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
301 peer.markBlock(&hash)
302 sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
303 peer.setStatus(block.Height, &hash)
306 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
307 bestHeader := sm.chain.BestBlockHeader()
308 genesisBlock, err := sm.chain.GetBlockByHeight(0)
310 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
313 genesisHash := genesisBlock.Hash()
314 msg := NewStatusResponseMessage(bestHeader, &genesisHash)
315 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
316 sm.peers.removePeer(peer.ID())
320 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
321 if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
322 peer.setStatus(msg.Height, msg.GetHash())
326 if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
327 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
331 sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
334 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
335 tx, err := msg.GetTransaction()
337 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
341 if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
342 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
346 func (sm *SyncManager) IsListening() bool {
347 if sm.config.VaultMode {
350 return sm.sw.IsListening()
353 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
354 return sm.sw.NodeInfo()
357 func (sm *SyncManager) PeerCount() int {
358 if sm.config.VaultMode {
361 return len(sm.sw.Peers().List())
364 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
365 peer := sm.peers.getPeer(basePeer.ID())
366 if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
370 log.WithFields(log.Fields{
372 "peer": basePeer.Addr(),
373 "type": reflect.TypeOf(msg),
374 "message": msg.String(),
375 }).Info("receive message from peer")
377 switch msg := msg.(type) {
378 case *GetBlockMessage:
379 sm.handleGetBlockMsg(peer, msg)
382 sm.handleBlockMsg(peer, msg)
384 case *StatusRequestMessage:
385 sm.handleStatusRequestMsg(basePeer)
387 case *StatusResponseMessage:
388 sm.handleStatusResponseMsg(basePeer, msg)
390 case *TransactionMessage:
391 sm.handleTransactionMsg(peer, msg)
393 case *MineBlockMessage:
394 sm.handleMineBlockMsg(peer, msg)
396 case *GetHeadersMessage:
397 sm.handleGetHeadersMsg(peer, msg)
399 case *HeadersMessage:
400 sm.handleHeadersMsg(peer, msg)
402 case *GetBlocksMessage:
403 sm.handleGetBlocksMsg(peer, msg)
406 sm.handleBlocksMsg(peer, msg)
408 case *FilterLoadMessage:
409 sm.handleFilterLoadMsg(peer, msg)
411 case *FilterAddMessage:
412 sm.handleFilterAddMsg(peer, msg)
414 case *FilterClearMessage:
415 sm.handleFilterClearMsg(peer)
417 case *GetMerkleBlockMessage:
418 sm.handleGetMerkleBlockMsg(peer, msg)
421 log.WithFields(log.Fields{
423 "peer": basePeer.Addr(),
424 "message_type": reflect.TypeOf(msg),
425 }).Error("unhandled message type")
429 func (sm *SyncManager) Start() error {
431 if _, err = sm.sw.Start(); err != nil {
432 log.Error("switch start err")
436 sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
441 sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
446 // broadcast transactions
447 go sm.txBroadcastLoop()
448 go sm.minedBroadcastLoop()
454 //Stop stop sync manager
455 func (sm *SyncManager) Stop() {
457 sm.minedBlockSub.Unsubscribe()
458 if !sm.config.VaultMode {
463 func (sm *SyncManager) minedBroadcastLoop() {
466 case obj, ok := <-sm.minedBlockSub.Chan():
468 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
472 ev, ok := obj.Data.(event.NewMinedBlockEvent)
474 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
478 if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
479 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")