package netsync

import (
	"strings"

	log "github.com/sirupsen/logrus"
	"github.com/tendermint/go-crypto"
	"github.com/tendermint/go-wire"
	cmn "github.com/tendermint/tmlibs/common"
	dbm "github.com/tendermint/tmlibs/db"

	cfg "github.com/bytom/config"
	"github.com/bytom/p2p"
	core "github.com/bytom/protocol"
	"github.com/bytom/protocol/bc"
	"github.com/bytom/version"
)

// SyncManager is responsible for the business layer information
// synchronization. It owns the P2P switch and wires the chain, the
// transaction pool, the block keeper and the fetcher to it.
type SyncManager struct {
	networkID uint64
	sw        *p2p.Switch
	addrBook  *p2p.AddrBook // known peers; only set when the PEX reactor is enabled

	privKey     crypto.PrivKeyEd25519 // local node's p2p key
	chain       *core.Chain
	txPool      *core.TxPool
	fetcher     *Fetcher
	blockKeeper *blockKeeper
	peers       *peerSet

	newBlockCh chan *bc.Hash // block hashes consumed by minedBroadcastLoop
	newPeerCh  chan struct{}
	txSyncCh   chan *txsync
	dropPeerCh chan *string
	quitSync   chan struct{} // closed by Stop to terminate the broadcast loops
	config     *cfg.Config

	synchronising int32
}

// NewSyncManager creates a sync manager.
//
// It generates a fresh Ed25519 node key, builds the P2P switch backed by a
// "trusthistory" database, and registers the "PROTOCOL" reactor. When
// config.P2P.PexReactor is set it also creates the address book and registers
// the "PEX" reactor. The returned error is currently always nil.
func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
	// Create the protocol manager with the base fields
	manager := &SyncManager{
		txPool:     txPool,
		chain:      chain,
		privKey:    crypto.GenPrivKeyEd25519(),
		config:     config,
		quitSync:   make(chan struct{}),
		newBlockCh: newBlockCh,
		newPeerCh:  make(chan struct{}),
		txSyncCh:   make(chan *txsync),
		dropPeerCh: make(chan *string, maxQuitReq),
		peers:      newPeerSet(),
	}

	trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
	manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)

	manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
	manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)

	protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
	manager.sw.AddReactor("PROTOCOL", protocolReactor)

	// Optionally, start the pex reactor
	//var addrBook *p2p.AddrBook
	if config.P2P.PexReactor {
		manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
		pexReactor := p2p.NewPEXReactor(manager.addrBook)
		manager.sw.AddReactor("PEX", pexReactor)
	}

	return manager, nil
}

// protocolAndAddress splits a listen address of the form "proto://addr" into
// its protocol and address parts. When no "://" separator is present the
// protocol defaults to tcp and the input is returned unchanged as the address.
func protocolAndAddress(listenAddr string) (string, string) {
	p, address := "tcp", listenAddr
	parts := strings.SplitN(address, "://", 2)
	if len(parts) == 2 {
		p, address = parts[0], parts[1]
	}
	return p, address
}

// makeNodeInfo builds the node info advertised by the switch from the local
// key, the configured moniker and chain ID, and the wire/p2p versions.
// ListenAddr is only filled in when the switch is already listening.
func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo {
	nodeInfo := &p2p.NodeInfo{
		PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
		Moniker: sm.config.Moniker,
		Network: sm.config.ChainID,
		Version: version.Version,
		Other: []string{
			cmn.Fmt("wire_version=%v", wire.Version),
			cmn.Fmt("p2p_version=%v", p2p.Version),
		},
	}

	if !sm.sw.IsListening() {
		return nodeInfo
	}

	p2pListener := sm.sw.Listeners()[0]
	p2pHost := p2pListener.ExternalAddress().IP.String()
	p2pPort := p2pListener.ExternalAddress().Port

	// We assume that the rpcListener has the same ExternalAddress.
	// This is probably true because both P2P and RPC listeners use UPnP,
	// except of course if the rpc is only bound to localhost
	nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
	return nodeInfo
}

// netStart adds the configured listener to the switch, starts the switch and,
// if any seeds are configured, dials them. It returns the first error reported
// by the switch start or by the seed dial.
func (sm *SyncManager) netStart() error {
	// Create & add listener
	p, address := protocolAndAddress(sm.config.P2P.ListenAddress)
	l := p2p.NewDefaultListener(p, address, sm.config.P2P.SkipUPNP, nil)
	sm.sw.AddListener(l)

	// Start the switch
	sm.sw.SetNodeInfo(sm.makeNodeInfo())
	sm.sw.SetNodePrivKey(sm.privKey)
	if _, err := sm.sw.Start(); err != nil {
		return err
	}

	// If seeds exist, add them to the address book and dial out
	if sm.config.P2P.Seeds != "" {
		// dial out
		seeds := strings.Split(sm.config.P2P.Seeds, ",")
		if err := sm.DialSeeds(seeds); err != nil {
			return err
		}
	}

	return nil
}

// Start starts the sync manager service. Networking, the broadcast loops and
// the sync handlers each run in their own goroutine, so Start returns
// immediately.
func (sm *SyncManager) Start() {
	// netStart runs asynchronously, so its error cannot be returned to the
	// caller; log it rather than dropping it silently.
	go func() {
		if err := sm.netStart(); err != nil {
			log.Errorf("Failed to start P2P network: %v", err)
		}
	}()

	// broadcast transactions
	go sm.txBroadcastLoop()

	// broadcast mined blocks
	go sm.minedBroadcastLoop()

	// start sync handlers
	go sm.syncer()

	go sm.txsyncLoop()
}

// Stop stops the sync manager: it closes quitSync, which terminates the
// broadcast loops, and then stops the P2P switch.
// NOTE(review): quitSync is closed unconditionally, so a second call to Stop
// would panic on the double close.
func (sm *SyncManager) Stop() {
	close(sm.quitSync)
	sm.sw.Stop()
}

// txBroadcastLoop forwards every transaction received from the pool's new-tx
// channel to the peer set until quitSync is closed.
func (sm *SyncManager) txBroadcastLoop() {
	newTxCh := sm.txPool.GetNewTxCh()
	for {
		select {
		case newTx := <-newTxCh:
			sm.peers.BroadcastTx(newTx)

		case <-sm.quitSync:
			return
		}
	}
}

// minedBroadcastLoop looks up each block hash received on newBlockCh and
// broadcasts the block to the peer set until quitSync is closed.
func (sm *SyncManager) minedBroadcastLoop() {
	for {
		select {
		case blockHash := <-sm.newBlockCh:
			block, err := sm.chain.GetBlockByHash(blockHash)
			if err != nil {
				// A failed lookup for one hash must not end the loop:
				// returning here would stop all later broadcasts and leave
				// newBlockCh without a reader.
				log.Errorf("Failed on mined broadcast loop get block %v", err)
				continue
			}
			sm.peers.BroadcastMinedBlock(block)

		case <-sm.quitSync:
			return
		}
	}
}

// NodeInfo returns the P2P node info held by the switch.
func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
	return sm.sw.NodeInfo()
}

// BlockKeeper returns the sync manager's block keeper.
func (sm *SyncManager) BlockKeeper() *blockKeeper {
	return sm.blockKeeper
}

// Peers returns the sync manager's peer set.
func (sm *SyncManager) Peers() *peerSet {
	return sm.peers
}

// DialSeeds dials the given seed peers through the switch, passing along the
// address book (which is nil when the PEX reactor is disabled).
func (sm *SyncManager) DialSeeds(seeds []string) error {
	return sm.sw.DialSeeds(sm.addrBook, seeds)
}

// Switch returns the sync manager's P2P switch.
func (sm *SyncManager) Switch() *p2p.Switch {
	return sm.sw
}