12 log "github.com/sirupsen/logrus"
13 "github.com/tendermint/go-crypto"
14 cmn "github.com/tendermint/tmlibs/common"
16 cfg "github.com/bytom/config"
17 "github.com/bytom/consensus"
18 "github.com/bytom/p2p"
19 "github.com/bytom/p2p/discover"
20 core "github.com/bytom/protocol"
21 "github.com/bytom/protocol/bc"
22 "github.com/bytom/protocol/bc/types"
23 "github.com/bytom/version"
// Limits for the SPV address-filter feature used by handleFilterAddMsg /
// handleFilterLoadMsg below.
// NOTE(review): the const-block opener is not visible in this excerpt;
// presumably maxFilterAddressSize caps a single filter entry's byte length
// and maxFilterAddressCount caps entries per peer — confirm in full file.
12 maxFilterAddressSize = 50
13 maxFilterAddressCount = 1000
33 // Chain is the interface for Bytom core
// It is the consumer-side abstraction of the blockchain used by SyncManager:
// header/block lookup, block processing, and transaction validation.
34 type Chain interface {
35 BestBlockHeader() *types.BlockHeader
36 BestBlockHeight() uint64
37 CalcNextSeed(*bc.Hash) (*bc.Hash, error)
38 GetBlockByHash(*bc.Hash) (*types.Block, error)
39 GetBlockByHeight(uint64) (*types.Block, error)
40 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
41 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
42 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
43 InMainChain(bc.Hash) bool
// ProcessBlock's bool result mirrors ValidateTx's; presumably "is orphan" —
// TODO confirm against the implementation.
44 ProcessBlock(*types.Block) (bool, error)
// ValidateTx's bool result is read as isOrphan by handleTransactionMsg.
45 ValidateTx(*types.Tx) (bool, error)
48 //SyncManager Sync Manager is responsible for the business layer information synchronization
// NOTE(review): several fields referenced by the methods below (sw, peers,
// chain, config, genesisHash, txPool, ...) are not visible in this excerpt.
49 type SyncManager struct {
53 privKey crypto.PrivKeyEd25519 // local node's p2p key
// blockFetcher handles relayed mined blocks; blockKeeper drives block/header
// download and processing (see handleMineBlockMsg / handleBlockMsg).
56 blockFetcher *blockFetcher
57 blockKeeper *blockKeeper
// newTxCh feeds unconfirmed transactions to subscribers (GetNewTxCh);
// newBlockCh receives hashes of locally mined blocks (minedBroadcastLoop);
// txSyncCh and quitSync coordinate the tx-sync loop and shutdown.
60 newTxCh chan *types.Tx
61 newBlockCh chan *bc.Hash
62 txSyncCh chan *txSyncMsg
63 quitSync chan struct{}
67 //NewSyncManager create a sync manager
// It wires the p2p switch, peer set, block fetcher/keeper and the PROTOCOL
// reactor, then (unless running in vault mode) creates the TCP listener and
// the UDP discovery table. The genesis header hash is cached for the
// status-handshake genesis check (see handleStatusResponseMsg).
// NOTE(review): error-handling lines and some field assignments are not
// visible in this excerpt.
68 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
// Genesis header (height 0) identifies the chain during handshakes.
69 genesisHeader, err := chain.GetHeaderByHeight(0)
74 sw := p2p.NewSwitch(config)
75 peers := newPeerSet(sw)
76 manager := &SyncManager{
77 genesisHash: genesisHeader.Hash(),
// A fresh ed25519 key is generated per SyncManager for p2p identity.
81 privKey: crypto.GenPrivKeyEd25519(),
82 blockFetcher: newBlockFetcher(chain, peers),
83 blockKeeper: newBlockKeeper(chain, peers),
85 newTxCh: make(chan *types.Tx, maxTxChanSize),
86 newBlockCh: newBlockCh,
87 txSyncCh: make(chan *txSyncMsg),
88 quitSync: make(chan struct{}),
// Register the blockchain protocol reactor on the switch.
92 protocolReactor := NewProtocolReactor(manager, manager.peers)
93 manager.sw.AddReactor("PROTOCOL", protocolReactor)
95 // Create & add listener
96 var listenerStatus bool
// Vault mode runs without networking: no listener, no discovery.
98 if !config.VaultMode {
99 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
// NOTE(review): `l` is declared on a line not visible here (presumably
// `var l p2p.Listener` around original line 97).
100 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
101 manager.sw.AddListener(l)
// Discovery listens on UDP using the listener's external port.
103 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
107 manager.sw.SetDiscv(discv)
109 manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
110 manager.sw.SetNodePrivKey(manager.privKey)
114 //BestPeer return the highest p2p peerInfo
// NOTE(review): a nil-check on bestPeer (original line 117) is not visible
// in this excerpt; presumably nil is returned when no full-node peer exists.
115 func (sm *SyncManager) BestPeer() *PeerInfo {
// Only full-node peers are considered (SFFullNode service flag).
116 bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
118 return bestPeer.getPeerInfo()
123 // GetNewTxCh return a unconfirmed transaction feed channel
// NOTE(review): the return statement (presumably `return sm.newTxCh`) is on
// a line not visible in this excerpt.
124 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
128 //GetPeerInfos return peer info of all peers
129 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
// Delegates to the peer set's snapshot of all connected peers.
130 return sm.peers.getPeerInfos()
133 //IsCaughtUp check wheather the peer finish the sync
// Caught up means either no full-node peer is connected, or our best block
// height is at least the best peer's reported height.
134 func (sm *SyncManager) IsCaughtUp() bool {
135 peer := sm.peers.bestPeer(consensus.SFFullNode)
136 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
139 //NodeInfo get P2P peer node info
140 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
// Node info is owned by the switch (set in NewSyncManager via makeNodeInfo).
141 return sm.sw.NodeInfo()
144 //StopPeer try to stop peer by given ID
// Returns an error when no peer with peerID is connected; otherwise removes
// the peer from the peer set.
145 func (sm *SyncManager) StopPeer(peerID string) error {
146 if peer := sm.peers.getPeer(peerID); peer == nil {
147 return errors.New("peerId not exist")
149 sm.peers.removePeer(peerID)
153 //Switch get sync manager switch
// NOTE(review): the return statement (presumably `return sm.sw`) is on a
// line not visible in this excerpt.
154 func (sm *SyncManager) Switch() *p2p.Switch {
// handleBlockMsg decodes a single block from the message and hands it to the
// blockKeeper for processing, attributed to the sending peer.
// NOTE(review): the error check on GetBlock (original lines 160-162) is not
// visible in this excerpt.
158 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
159 block, err := msg.GetBlock()
163 sm.blockKeeper.processBlock(peer.ID(), block)
// handleBlocksMsg decodes a batch of blocks and hands them to the
// blockKeeper, attributed to the sending peer. Decode failures are logged at
// debug level (the early return is on a line not visible in this excerpt).
166 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
167 blocks, err := msg.GetBlocks()
169 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
173 sm.blockKeeper.processBlocks(peer.ID(), blocks)
// handleFilterAddMsg adds one address to the peer's SPV address filter.
176 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
177 peer.addFilterAddress(msg.Address)
// handleFilterClearMsg clears the peer's SPV address filter entirely.
180 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
181 peer.filterAdds.Clear()
// handleFilterLoadMsg replaces/loads the peer's SPV address filter with the
// addresses carried in the message.
184 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
185 peer.addFilterAddresses(msg.Addresses)
// handleGetBlockMsg serves a single block requested either by height or by
// hash, then sends it to the requesting peer. A failed send removes the peer;
// other send errors are only logged.
// NOTE(review): the `err` declaration, the height/hash branch condition and
// several early returns are on lines not visible in this excerpt.
188 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
189 var block *types.Block
// Height lookup branch (hash branch below; the `if msg.Height != 0`-style
// condition is presumably on the missing line 191 — TODO confirm).
192 block, err = sm.chain.GetBlockByHeight(msg.Height)
194 block, err = sm.chain.GetBlockByHash(msg.GetHash())
197 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
// ok == false means the transport send failed; drop the peer.
201 ok, err := peer.sendBlock(block)
203 sm.peers.removePeer(peer.ID())
206 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
// handleGetBlocksMsg serves a range of blocks located via the requester's
// block locator and stop hash. The response payload is capped at half of
// maxBlockchainResponseSize (measured via each block's text marshaling) to
// leave headroom for message overhead. A failed send removes the peer.
// NOTE(review): several error-check/early-return lines are not visible in
// this excerpt.
210 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
211 blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
// Nothing to send (lookup failed or empty range) — bail out.
212 if err != nil || len(blocks) == 0 {
217 sendBlocks := []*types.Block{}
218 for _, block := range blocks {
// MarshalText is used purely to estimate the serialized size.
219 rawData, err := block.MarshalText()
221 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
// Stop accumulating once the next block would push the batch over the
// size budget (presumably a `break` on the missing line 226/227).
225 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
228 totalSize += len(rawData)
229 sendBlocks = append(sendBlocks, block)
232 ok, err := peer.sendBlocks(sendBlocks)
234 sm.peers.removePeer(peer.ID())
237 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
// handleGetHeadersMsg serves block headers located via the requester's block
// locator and stop hash. An empty or failed lookup is logged at debug level
// and (presumably, on a missing line) returns early. A failed send removes
// the peer; other send errors are logged.
241 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
242 headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
243 if err != nil || len(headers) == 0 {
244 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
248 ok, err := peer.sendHeaders(headers)
250 sm.peers.removePeer(peer.ID())
253 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
// handleGetMerkleBlockMsg serves a merkle (SPV) block: it fetches the block
// by height or hash, looks up its transaction status, and sends both to the
// peer. A failed send removes the peer.
// NOTE(review): the `err` declaration, branch conditions and early returns
// are on lines not visible in this excerpt.
257 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
259 var block *types.Block
261 block, err = sm.chain.GetBlockByHeight(msg.Height)
263 block, err = sm.chain.GetBlockByHash(msg.GetHash())
266 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
// Transaction status is keyed by the block hash.
270 blockHash := block.Hash()
271 txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
273 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
277 ok, err := peer.sendMerkleBlock(block, txStatus)
279 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
284 sm.peers.removePeer(peer.ID())
// handleHeadersMsg decodes a batch of headers and hands them to the
// blockKeeper, attributed to the sending peer. Decode failures are logged at
// debug level (the early return is on a line not visible in this excerpt).
288 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
289 headers, err := msg.GetHeaders()
291 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
295 sm.blockKeeper.processHeaders(peer.ID(), headers)
// handleMineBlockMsg processes a newly mined block announced by a peer: mark
// it as known to that peer (to avoid echoing it back), feed it to the block
// fetcher, and update the peer's reported best status.
// NOTE(review): the `hash := block.Hash()` assignment is presumably on a
// line not visible in this excerpt.
298 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
299 block, err := msg.GetMineBlock()
301 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
306 peer.markBlock(&hash)
307 sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
308 peer.setStatus(block.Height, &hash)
// handleStatusRequestMsg answers a status request with our best header and
// genesis hash. If the reply cannot be queued (TrySend fails), the peer is
// removed. Takes BasePeer because the requester may not yet be in the peer
// set (see processMsg).
311 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
312 bestHeader := sm.chain.BestBlockHeader()
313 genesisBlock, err := sm.chain.GetBlockByHeight(0)
315 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
318 genesisHash := genesisBlock.Hash()
319 msg := NewStatusResponseMessage(bestHeader, &genesisHash)
// TrySend is non-blocking; a full channel means an unresponsive peer.
320 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
321 sm.peers.removePeer(peer.ID())
// handleStatusResponseMsg completes the status handshake. For an
// already-known peer it just refreshes height/hash; for a new peer it first
// verifies the remote genesis hash matches ours (rejecting peers from a
// different chain) and then adds the peer to the peer set.
// NOTE(review): early returns and a log field line are not visible in this
// excerpt.
325 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
326 if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
327 peer.setStatus(msg.Height, msg.GetHash())
// Genesis mismatch: the remote node runs a different chain — do not add it.
331 if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
332 log.WithFields(log.Fields{
334 "remote genesis": genesisHash.String(),
335 "local genesis": sm.genesisHash.String(),
336 }).Warn("fail hand shake due to differnt genesis")
340 sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
// handleTransactionMsg decodes and validates a relayed transaction. A
// malformed message adds 10 persistent ban-score points; a transaction that
// fails validation (and is not merely an orphan) adds 10 transient points.
343 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
344 tx, err := msg.GetTransaction()
346 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
// Orphan transactions (missing inputs) are tolerated without penalty.
350 if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
351 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
// processMsg is the central dispatcher for inbound blockchain-channel
// messages: it resolves the registered *peer for the sender, logs the
// message, and routes by concrete message type. Status request/response are
// allowed through even when the peer is not yet registered, since they form
// the handshake that registers it.
// NOTE(review): the early-return body for the unregistered-peer guard and
// some case labels are on lines not visible in this excerpt.
355 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
356 peer := sm.peers.getPeer(basePeer.ID())
// Only handshake messages may arrive before the peer is registered.
357 if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
361 log.WithFields(log.Fields{
363 "peer": basePeer.Addr(),
364 "type": reflect.TypeOf(msg),
365 "message": msg.String(),
366 }).Info("receive message from peer")
// Type switch shadows msg with its concrete type per case.
368 switch msg := msg.(type) {
369 case *GetBlockMessage:
370 sm.handleGetBlockMsg(peer, msg)
// (case *BlockMessage: — label on a line not visible in this excerpt)
373 sm.handleBlockMsg(peer, msg)
375 case *StatusRequestMessage:
376 sm.handleStatusRequestMsg(basePeer)
378 case *StatusResponseMessage:
379 sm.handleStatusResponseMsg(basePeer, msg)
381 case *TransactionMessage:
382 sm.handleTransactionMsg(peer, msg)
384 case *MineBlockMessage:
385 sm.handleMineBlockMsg(peer, msg)
387 case *GetHeadersMessage:
388 sm.handleGetHeadersMsg(peer, msg)
390 case *HeadersMessage:
391 sm.handleHeadersMsg(peer, msg)
393 case *GetBlocksMessage:
394 sm.handleGetBlocksMsg(peer, msg)
// (case *BlocksMessage: — label on a line not visible in this excerpt)
397 sm.handleBlocksMsg(peer, msg)
399 case *FilterLoadMessage:
400 sm.handleFilterLoadMsg(peer, msg)
402 case *FilterAddMessage:
403 sm.handleFilterAddMsg(peer, msg)
405 case *FilterClearMessage:
406 sm.handleFilterClearMsg(peer)
408 case *GetMerkleBlockMessage:
409 sm.handleGetMerkleBlockMsg(peer, msg)
// Default case: unknown message type is an error worth surfacing.
412 log.WithFields(log.Fields{
414 "peer": basePeer.Addr(),
415 "message_type": reflect.TypeOf(msg),
416 }).Error("unhandled message type")
// protocolAndAddress splits a listen address of the form "proto://host:port"
// into (proto, "host:port"), defaulting the protocol to "tcp" when no
// scheme separator is present.
// NOTE(review): the `len(parts) == 2` guard and the return statement are on
// lines not visible in this excerpt.
421 func protocolAndAddress(listenAddr string) (string, string) {
422 p, address := "tcp", listenAddr
423 parts := strings.SplitN(address, "://", 2)
425 p, address = parts[0], parts[1]
// makeNodeInfo builds the p2p.NodeInfo advertised to peers: public key,
// moniker, chain ID, software version, and the default service flags. The
// listen address is taken from the first switch listener — external address
// when UPnP mapping succeeded (listenerStatus true, presumably; the branch
// condition is on a line not visible in this excerpt), internal otherwise.
430 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
431 nodeInfo := &p2p.NodeInfo{
432 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
433 Moniker: sm.config.Moniker,
434 Network: sm.config.ChainID,
435 Version: version.Version,
// Service flags are advertised as a decimal string in Other.
436 Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
// Not listening (e.g. vault mode): presumably return nodeInfo as-is.
439 if !sm.sw.IsListening() {
443 p2pListener := sm.sw.Listeners()[0]
445 // We assume that the rpcListener has the same ExternalAddress.
446 // This is probably true because both P2P and RPC listeners use UPnP,
447 // except of course if the rpc is only bound to localhost
449 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
451 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
456 //Start start sync manager service
// Starts the p2p switch (exiting the process on failure) and launches the
// background broadcast loops for transactions and mined blocks.
457 func (sm *SyncManager) Start() {
458 if _, err := sm.sw.Start(); err != nil {
// cmn.Exit terminates the process; the switch is essential to operation.
459 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
461 // broadcast transactions
462 go sm.txBroadcastLoop()
463 go sm.minedBroadcastLoop()
467 //Stop stop sync manager
// NOTE(review): the body (presumably closing quitSync and stopping the
// switch) is on lines not visible in this excerpt.
468 func (sm *SyncManager) Stop() {
// initDiscover starts the Kademlia-style UDP discovery table: it binds UDP
// on 0.0.0.0 at the given port, opens the discovery database under the
// config's DB dir, and seeds the table with the comma-separated
// config.P2P.Seeds (each seed encoded as an enode URL whose node ID is the
// SHA-256 of the seed address).
// NOTE(review): error-check returns and the final return are on lines not
// visible in this excerpt.
473 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
474 addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
479 conn, err := net.ListenUDP("udp", addr)
// LocalAddr reflects the actual bound address (useful when port was 0).
484 realaddr := conn.LocalAddr().(*net.UDPAddr)
485 ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
490 // add the seeds node to the discover table
// No seeds configured: presumably return the table without fallback nodes.
491 if config.P2P.Seeds == "" {
494 nodes := []*discover.Node{}
495 for _, seed := range strings.Split(config.P2P.Seeds, ",") {
// Track each seed in the version status (visible via node diagnostics).
496 version.Status.AddSeed(seed)
497 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
// MustParseNode panics on a malformed enode URL; seeds are operator input.
498 nodes = append(nodes, discover.MustParseNode(url))
500 if err = ntab.SetFallbackNodes(nodes); err != nil {
// minedBroadcastLoop broadcasts locally mined blocks: it receives block
// hashes from newBlockCh, fetches the full block from the chain, and
// broadcasts it to all peers. Errors at either step are logged and the loop
// continues.
// NOTE(review): the enclosing for/select scaffolding and the loop's exit
// path (presumably quitSync) extend past the lines visible in this excerpt.
506 func (sm *SyncManager) minedBroadcastLoop() {
509 case blockHash := <-sm.newBlockCh:
510 block, err := sm.chain.GetBlockByHash(blockHash)
512 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on mined broadcast loop get block")
515 if err := sm.peers.broadcastMinedBlock(block); err != nil {
516 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")