X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=netsync%2Fsync_manager.go;h=95ffbb489a5243c661d82e934e887f08543a50e3;hp=18d5291e972006149ccc248029293c0f4f875aed;hb=31f8f7cf1ffbec5365ab6ebf217537809cf714e5;hpb=7e01ede3ce5d3688fa29f30bc766593beb9508e4 diff --git a/netsync/sync_manager.go b/netsync/sync_manager.go index 18d5291e..95ffbb48 100644 --- a/netsync/sync_manager.go +++ b/netsync/sync_manager.go @@ -3,15 +3,17 @@ package netsync import ( "errors" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" - cfg "github.com/vapor/config" + "github.com/vapor/config" "github.com/vapor/consensus" + dbm "github.com/vapor/database/leveldb" "github.com/vapor/event" "github.com/vapor/netsync/chainmgr" + "github.com/vapor/netsync/consensusmgr" "github.com/vapor/netsync/peers" "github.com/vapor/p2p" - core "github.com/vapor/protocol" + "github.com/vapor/protocol" ) const ( @@ -22,12 +24,20 @@ var ( errVaultModeDialPeer = errors.New("can't dial peer in vault mode") ) +// ChainMgr is the interface for p2p chain message sync manager. type ChainMgr interface { Start() error IsCaughtUp() bool Stop() } +// ConsensusMgr is the interface for consensus message sync manager. +type ConsensusMgr interface { + Start() error + Stop() +} + +// Switch is the interface for p2p switch. type Switch interface { Start() (bool, error) Stop() bool @@ -38,50 +48,60 @@ type Switch interface { //SyncManager Sync Manager is responsible for the business layer information synchronization type SyncManager struct { - config *cfg.Config - sw Switch - chainMgr ChainMgr - peers *peers.PeerSet + config *config.Config + sw Switch + chainMgr ChainMgr + consensusMgr ConsensusMgr + peers *peers.PeerSet } // NewSyncManager create sync manager and set switch. -func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) { +func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protocol.TxPool, dispatcher *event.Dispatcher, fastSyncDB dbm.DB) (*SyncManager, error) { sw, err := p2p.NewSwitch(config) if err != nil { return nil, err } peers := peers.NewPeerSet(sw) - chainManger, err := chainmgr.NewChainManager(config, sw, chain, txPool, dispatcher, peers) + chainManger, err := chainmgr.NewManager(config, sw, chain, txPool, dispatcher, peers, fastSyncDB) if err != nil { return nil, err } - + consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers) return &SyncManager{ - config: config, - sw: sw, - chainMgr: chainManger, - peers: peers, + config: config, + sw: sw, + chainMgr: chainManger, + consensusMgr: consensusMgr, + peers: peers, }, nil } +// Start message sync manager service. func (sm *SyncManager) Start() error { if _, err := sm.sw.Start(); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed start switch") + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed start switch") + return err + } + + if err := sm.chainMgr.Start(); err != nil { return err } - return sm.chainMgr.Start() + return sm.consensusMgr.Start() } +// Stop message sync manager service. func (sm *SyncManager) Stop() { sm.chainMgr.Stop() + sm.consensusMgr.Stop() if !sm.config.VaultMode { sm.sw.Stop() } } +// IsListening check if the vapord service port is open? func (sm *SyncManager) IsListening() bool { if sm.config.VaultMode { return false @@ -95,6 +115,7 @@ func (sm *SyncManager) IsCaughtUp() bool { return sm.chainMgr.IsCaughtUp() } +// PeerCount count the number of connected peers. func (sm *SyncManager) PeerCount() int { if sm.config.VaultMode { return 0 @@ -102,10 +123,12 @@ func (sm *SyncManager) PeerCount() int { return len(sm.sw.Peers().List()) } +// GetNetwork get the type of network. func (sm *SyncManager) GetNetwork() string { return sm.config.ChainID } +// BestPeer fine the peer with the highest height from the connected peers. func (sm *SyncManager) BestPeer() *peers.PeerInfo { bestPeer := sm.peers.BestPeer(consensus.SFFullNode) if bestPeer != nil { @@ -114,6 +137,7 @@ func (sm *SyncManager) BestPeer() *peers.PeerInfo { return nil } +// DialPeerWithAddress dial the peer and establish a connection. func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error { if sm.config.VaultMode { return errVaultModeDialPeer @@ -122,7 +146,7 @@ func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error { return sm.sw.DialPeerWithAddress(addr) } -//GetPeerInfos return peer info of all peers +//GetPeerInfos return peer info of all connected peers. func (sm *SyncManager) GetPeerInfos() []*peers.PeerInfo { return sm.peers.GetPeerInfos() }