12 log "github.com/sirupsen/logrus"
13 "github.com/tendermint/go-crypto"
14 cmn "github.com/tendermint/tmlibs/common"
16 "github.com/vapor/chain"
17 cfg "github.com/vapor/config"
18 "github.com/vapor/consensus"
19 "github.com/vapor/p2p"
20 "github.com/vapor/p2p/discover"
21 core "github.com/vapor/protocol"
22 "github.com/vapor/protocol/bc"
23 "github.com/vapor/protocol/bc/types"
24 "github.com/vapor/version"
// NOTE(review): presumably the upper bound on the size of a single filter
// address accepted from a peer — confirm at the use site (not in this view).
30 maxFilterAddressSize = 50
// NOTE(review): presumably the upper bound on how many filter addresses a
// peer may register — confirm at the use site (not in this view).
31 maxFilterAddressCount = 1000
34 // SyncManager is responsible for business-layer information synchronization.
// NOTE(review): only part of the field list is visible in this excerpt.
35 type SyncManager struct {
39 privKey crypto.PrivKeyEd25519 // local node's p2p key
// blockFetcher is handed blocks announced by peers via mine-block messages
// (see handleMineBlockMsg).
42 blockFetcher *blockFetcher
// blockKeeper processes block/header replies from peers and locates
// blocks/headers to answer peer requests.
43 blockKeeper *blockKeeper
// newTxCh is the transaction feed exposed by GetNewTxCh; it is created
// buffered with capacity maxTxChanSize.
46 newTxCh chan *types.Tx
// newBlockCh is supplied by the caller of NewSyncManager; block hashes read
// from it are broadcast by minedBroadcastLoop.
47 newBlockCh chan *bc.Hash
// txSyncCh is created unbuffered in NewSyncManager.
48 txSyncCh chan *txSyncMsg
// NOTE(review): presumably signals the background loops to stop — its use is
// not visible in this excerpt.
49 quitSync chan struct{}
53 // NewSyncManager creates a sync manager.
//
// It reads the header at height 0 from chain to obtain the genesis hash,
// builds a p2p switch and a peer set from config, and registers the protocol
// reactor on the switch under the name "PROTOCOL". Unless config.VaultMode is
// set, it also creates a p2p listener and a discovery network that uses the
// listener's external port. newBlockCh is stored as-is on the manager.
//
// NOTE(review): the error-handling and return lines are not visible in this
// excerpt; presumably failures of GetHeaderByHeight and initDiscover are
// returned to the caller — confirm.
54 func NewSyncManager(config *cfg.Config, chain chain.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
// The header at height 0 is treated as the genesis header.
55 genesisHeader, err := chain.GetHeaderByHeight(0)
60 sw := p2p.NewSwitch(config)
61 peers := newPeerSet(sw)
62 manager := &SyncManager{
64 genesisHash: genesisHeader.Hash(),
// A new Ed25519 key is generated on every construction; it is not loaded
// from config in the lines visible here.
67 privKey: crypto.GenPrivKeyEd25519(),
// The fetcher and the keeper share the same chain and peer set.
68 blockFetcher: newBlockFetcher(chain, peers),
69 blockKeeper: newBlockKeeper(chain, peers),
71 newTxCh: make(chan *types.Tx, maxTxChanSize),
72 newBlockCh: newBlockCh,
73 txSyncCh: make(chan *txSyncMsg),
74 quitSync: make(chan struct{}),
78 protocolReactor := NewProtocolReactor(manager, manager.peers)
79 manager.sw.AddReactor("PROTOCOL", protocolReactor)
81 // Create & add listener
82 var listenerStatus bool
// Listener and discovery are only set up when the node is not in vault mode.
84 if !config.VaultMode {
85 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
86 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
87 manager.sw.AddListener(l)
// Discovery is given the same port number as the listener's external address.
89 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
93 manager.sw.SetDiscv(discv)
95 manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
96 manager.sw.SetNodePrivKey(manager.privKey)
100 // BestPeer returns the PeerInfo of the best peer that the peer set selects
// among peers carrying the consensus.SFFullNode service flag.
// NOTE(review): the branch taken when no such peer exists is not visible in
// this excerpt; presumably nil is returned — confirm.
101 func (sm *SyncManager) BestPeer() *PeerInfo {
102 bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
104 return bestPeer.getPeerInfo()
109 // GetNewTxCh returns an unconfirmed transaction feed channel.
// NOTE(review): the body is not visible in this excerpt.
110 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
114 // GetPeerInfos returns the peer info of all peers held by the peer set.
115 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
116 return sm.peers.getPeerInfos()
119 // IsCaughtUp reports whether the local chain has finished syncing.
// It is true when the peer set has no best peer with the
// consensus.SFFullNode flag, or when that peer's height does not exceed the
// local best block height.
120 func (sm *SyncManager) IsCaughtUp() bool {
121 peer := sm.peers.bestPeer(consensus.SFFullNode)
122 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
125 // NodeInfo returns the node info held by the p2p switch.
126 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
127 return sm.sw.NodeInfo()
130 // StopPeer removes the peer with the given ID from the peer set.
// It returns the error "peerId not exist" when the peer set has no peer with
// that ID.
// NOTE(review): the final return is not visible in this excerpt; presumably
// nil on success — confirm.
131 func (sm *SyncManager) StopPeer(peerID string) error {
132 if peer := sm.peers.getPeer(peerID); peer == nil {
133 return errors.New("peerId not exist")
135 sm.peers.removePeer(peerID)
139 // Switch returns the sync manager's p2p switch.
// NOTE(review): the body is not visible in this excerpt.
140 func (sm *SyncManager) Switch() *p2p.Switch {
// handleBlockMsg decodes the block carried by msg and hands it to the block
// keeper together with the sending peer's ID.
// NOTE(review): the handling of the decode error is not visible in this excerpt.
144 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
145 block, err := msg.GetBlock()
149 sm.blockKeeper.processBlock(peer.ID(), block)
// handleBlocksMsg decodes the blocks carried by msg and hands them to the
// block keeper together with the sending peer's ID. A decode failure is
// logged at debug level.
// NOTE(review): the guard around the log call is not visible; presumably the
// function returns after logging — confirm.
152 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
153 blocks, err := msg.GetBlocks()
155 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
159 sm.blockKeeper.processBlocks(peer.ID(), blocks)
// handleFilterAddMsg adds the single address carried by msg to the peer's
// address filter.
162 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
163 peer.addFilterAddress(msg.Address)
// handleFilterClearMsg clears the peer's address filter (peer.filterAdds).
166 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
167 peer.filterAdds.Clear()
// handleFilterLoadMsg passes the address list carried by msg to the peer's
// addFilterAddresses.
170 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
171 peer.addFilterAddresses(msg.Addresses)
// handleGetBlockMsg answers a peer's request for a single block. The block is
// looked up on the local chain either by height or by hash and then sent back
// to the requesting peer.
// NOTE(review): the conditions selecting height vs. hash lookup, and the
// guards around the log/removePeer calls, are not visible in this excerpt.
174 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
175 var block *types.Block
178 block, err = sm.chain.GetBlockByHeight(msg.Height)
180 block, err = sm.chain.GetBlockByHash(msg.GetHash())
// A failed lookup is only logged (warning level).
183 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
187 ok, err := peer.sendBlock(block)
// NOTE(review): presumably executed only when the send reports !ok — confirm.
189 sm.peers.removePeer(peer.ID())
192 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
// handleGetBlocksMsg answers a peer's request for a range of blocks. The block
// keeper locates the blocks from the message's block locator and stop hash;
// the handler then collects blocks for the reply while tracking their total
// serialized size against maxBlockchainResponseSize/2, and sends the result
// to the peer.
// NOTE(review): the bodies of the guards (return/break) are not visible in
// this excerpt.
196 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
197 blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
// Nothing to send when the lookup failed or produced no blocks.
198 if err != nil || len(blocks) == 0 {
203 sendBlocks := []*types.Block{}
204 for _, block := range blocks {
// The text encoding is produced only to measure the block's size.
205 rawData, err := block.MarshalText()
207 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
// Size budget check: half of maxBlockchainResponseSize.
211 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
214 totalSize += len(rawData)
215 sendBlocks = append(sendBlocks, block)
218 ok, err := peer.sendBlocks(sendBlocks)
// NOTE(review): presumably executed only when the send reports !ok — confirm.
220 sm.peers.removePeer(peer.ID())
223 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
// handleGetHeadersMsg answers a peer's request for block headers. The block
// keeper locates the headers from the message's block locator and stop hash,
// and the result is sent back to the peer.
// NOTE(review): the return after the debug log and the guards around
// removePeer / the error log are not visible in this excerpt.
227 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
228 headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
// A lookup error and an empty result share the same debug log line, so err
// may be nil in that log entry.
229 if err != nil || len(headers) == 0 {
230 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
234 ok, err := peer.sendHeaders(headers)
// NOTE(review): presumably executed only when the send reports !ok — confirm.
236 sm.peers.removePeer(peer.ID())
239 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
// handleGetMerkleBlockMsg answers a peer's request for a merkle block. It
// looks the block up on the local chain (by height or by hash), fetches the
// transaction status stored for that block's hash, and sends both to the peer
// via sendMerkleBlock.
// NOTE(review): the conditions selecting height vs. hash lookup and the guards
// around the log/removePeer calls are not visible in this excerpt.
243 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
245 var block *types.Block
247 block, err = sm.chain.GetBlockByHeight(msg.Height)
249 block, err = sm.chain.GetBlockByHash(msg.GetHash())
252 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
// The transaction status is keyed by the block hash.
256 blockHash := block.Hash()
257 txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
259 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
263 ok, err := peer.sendMerkleBlock(block, txStatus)
265 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
// NOTE(review): presumably executed only when the send reports !ok — confirm.
270 sm.peers.removePeer(peer.ID())
// handleHeadersMsg decodes the headers carried by msg and hands them to the
// block keeper together with the sending peer's ID. A decode failure is logged
// at debug level.
// NOTE(review): the guard around the log call is not visible; presumably the
// function returns after logging — confirm.
274 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
275 headers, err := msg.GetHeaders()
277 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
281 sm.blockKeeper.processHeaders(peer.ID(), headers)
// handleMineBlockMsg handles a newly mined block announced by a peer. It
// decodes the block, marks its hash on the peer, forwards the block to the
// block fetcher tagged with the peer's ID, and updates the peer's status to
// the block's height and hash.
// NOTE(review): the line that computes hash and the guard around the warning
// log are not visible in this excerpt.
284 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
285 block, err := msg.GetMineBlock()
287 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
292 peer.markBlock(&hash)
293 sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
294 peer.setStatus(block.Height, &hash)
// handleStatusRequestMsg replies to a status request with a status response
// built from the local best block header and the hash of the block at
// height 0. The peer is removed from the peer set when TrySend reports
// failure.
// NOTE(review): the guard around the error log is not visible in this excerpt.
297 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
298 bestHeader := sm.chain.BestBlockHeader()
// The block at height 0 is read on every request to obtain the genesis hash.
299 genesisBlock, err := sm.chain.GetBlockByHeight(0)
301 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
304 genesisHash := genesisBlock.Hash()
305 msg := NewStatusResponseMessage(bestHeader, &genesisHash)
306 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
307 sm.peers.removePeer(peer.ID())
// handleStatusResponseMsg handles a peer's status response. For a peer already
// in the peer set it updates the stored height and hash. Otherwise it compares
// the remote genesis hash with the local one, logs a warning on mismatch, and
// adds the peer to the peer set with the reported height and hash.
// NOTE(review): the returns that separate these three steps are not visible in
// this excerpt; the description above assumes each guard returns — confirm.
311 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
312 if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
313 peer.setStatus(msg.Height, msg.GetHash())
// NOTE(review): dereferences the result of GetGenesisHash; confirm it can
// never be nil for a message received from the network.
317 if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
318 log.WithFields(log.Fields{
320 "remote genesis": genesisHash.String(),
321 "local genesis": sm.genesisHash.String(),
322 }).Warn("fail hand shake due to differnt genesis")
326 sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
// handleTransactionMsg decodes the transaction carried by msg and validates it
// against the chain. The sending peer's ban score is increased when decoding
// fails, and when validation fails for a transaction that is not an orphan.
// NOTE(review): the guard around the first addBanScore call is not visible in
// this excerpt; presumably it is `if err != nil` followed by a return.
329 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
330 tx, err := msg.GetTransaction()
332 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
// A validation error on an orphan transaction does not penalize the peer.
336 if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
337 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
// processMsg dispatches one incoming blockchain message to its handler based
// on the message's concrete type. Status request/response handlers receive the
// BasePeer; all other handlers receive the peer looked up from the peer set.
// Every message is logged at info level, and a message type with no handler
// is logged as an error.
// NOTE(review): some `case` labels and the body of the unknown-peer guard are
// not visible in this excerpt.
341 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
342 peer := sm.peers.getPeer(basePeer.ID())
// Only status request/response messages are accepted from a peer that is not
// yet in the peer set.
343 if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
347 log.WithFields(log.Fields{
349 "peer": basePeer.Addr(),
350 "type": reflect.TypeOf(msg),
351 "message": msg.String(),
352 }).Info("receive message from peer")
354 switch msg := msg.(type) {
355 case *GetBlockMessage:
356 sm.handleGetBlockMsg(peer, msg)
359 sm.handleBlockMsg(peer, msg)
361 case *StatusRequestMessage:
362 sm.handleStatusRequestMsg(basePeer)
364 case *StatusResponseMessage:
365 sm.handleStatusResponseMsg(basePeer, msg)
367 case *TransactionMessage:
368 sm.handleTransactionMsg(peer, msg)
370 case *MineBlockMessage:
371 sm.handleMineBlockMsg(peer, msg)
373 case *GetHeadersMessage:
374 sm.handleGetHeadersMsg(peer, msg)
376 case *HeadersMessage:
377 sm.handleHeadersMsg(peer, msg)
379 case *GetBlocksMessage:
380 sm.handleGetBlocksMsg(peer, msg)
383 sm.handleBlocksMsg(peer, msg)
385 case *FilterLoadMessage:
386 sm.handleFilterLoadMsg(peer, msg)
388 case *FilterAddMessage:
389 sm.handleFilterAddMsg(peer, msg)
391 case *FilterClearMessage:
392 sm.handleFilterClearMsg(peer)
394 case *GetMerkleBlockMessage:
395 sm.handleGetMerkleBlockMsg(peer, msg)
// Fallback for message types without a handler.
398 log.WithFields(log.Fields{
400 "peer": basePeer.Addr(),
401 "message_type": reflect.TypeOf(msg),
402 }).Error("unhandled message type")
// protocolAndAddress splits a listen address of the form "proto://addr" into
// its protocol and address parts. The protocol defaults to "tcp" and the
// address defaults to the whole of listenAddr.
// NOTE(review): the condition guarding the reassignment and the return
// statement are not visible in this excerpt.
407 func protocolAndAddress(listenAddr string) (string, string) {
408 p, address := "tcp", listenAddr
// Split at most once so that any later "://" stays inside the address part.
409 parts := strings.SplitN(address, "://", 2)
411 p, address = parts[0], parts[1]
// makeNodeInfo builds the p2p.NodeInfo advertised for this node: the public
// key derived from the node's private key, the configured moniker, the chain
// ID as network name, the software version, and the default service flags
// encoded as a base-10 string in Other. When the switch is listening,
// ListenAddr is filled in from the first listener's external or internal
// address.
// NOTE(review): the condition choosing external vs. internal address is not
// visible in this excerpt; presumably it is listenerStatus — confirm.
416 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
417 nodeInfo := &p2p.NodeInfo{
418 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
419 Moniker: sm.config.Moniker,
420 Network: sm.config.ChainID,
421 Version: version.Version,
422 Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
// NOTE(review): the body of this guard is not visible; presumably nodeInfo is
// returned without a ListenAddr when the switch is not listening.
425 if !sm.sw.IsListening() {
// Indexing [0] relies on the switch having at least one listener here.
429 p2pListener := sm.sw.Listeners()[0]
431 // We assume that the rpcListener has the same ExternalAddress.
432 // This is probably true because both P2P and RPC listeners use UPnP,
433 // except of course if the rpc is only bound to localhost
435 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
437 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
442 // Start starts the sync manager service: it starts the p2p switch and then
// launches the broadcast loops as goroutines. A switch start failure is passed
// to cmn.Exit rather than returned to the caller.
// NOTE(review): not every line of the body is visible in this excerpt.
443 func (sm *SyncManager) Start() {
444 if _, err := sm.sw.Start(); err != nil {
445 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
447 // broadcast transactions
448 go sm.txBroadcastLoop()
449 go sm.minedBroadcastLoop()
453 // Stop stops the sync manager.
// NOTE(review): the body is not visible in this excerpt.
454 func (sm *SyncManager) Stop() {
// initDiscover creates the node-discovery network. It opens a UDP socket on
// 0.0.0.0 at the given port, starts discover.ListenUDP on it with a database
// file named "discover.db" under config.DBDir(), and, when config.P2P.Seeds is
// non-empty, registers the comma-separated seeds as fallback nodes.
//
// NOTE(review): the error branches and return statements are not visible in
// this excerpt. Two things to confirm against the full source:
//   - whether conn is closed when discover.ListenUDP fails (otherwise the UDP
//     socket would leak on that path);
//   - path.Join is used for a filesystem path; path/filepath.Join is the
//     OS-aware variant.
459 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
460 addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
465 conn, err := net.ListenUDP("udp", addr)
// The address actually bound by the socket is what is handed to discovery.
470 realaddr := conn.LocalAddr().(*net.UDPAddr)
471 ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
476 // add the seeds node to the discover table
477 if config.P2P.Seeds == "" {
480 nodes := []*discover.Node{}
481 for _, seed := range strings.Split(config.P2P.Seeds, ",") {
482 version.Status.AddSeed(seed)
// The node ID placed in the enode URL is the hex-encoded SHA-256 of the seed
// address string itself.
483 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
// NOTE(review): by Go convention a Must* helper panics on invalid input —
// confirm how a malformed seed entry in the config is meant to be handled.
484 nodes = append(nodes, discover.MustParseNode(url))
486 if err = ntab.SetFallbackNodes(nodes); err != nil {
492 func (sm *SyncManager) minedBroadcastLoop() {
495 case blockHash := <-sm.newBlockCh:
496 block, err := sm.chain.GetBlockByHash(blockHash)
498 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on mined broadcast loop get block")
501 if err := sm.peers.broadcastMinedBlock(block); err != nil {
502 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")