X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=p2p%2Fswitch.go;h=c9e225dce27cbdb6d8a3d01f43020aa882c21622;hb=ee239f82f80ac867cc504bea6638f60331a2c69f;hp=ef8306d331af28474cc0552a73fa61652cc4b055;hpb=807d99726f6a0610fa9c835e2aabd983801d3510;p=bytom%2Fvapor.git diff --git a/p2p/switch.go b/p2p/switch.go index ef8306d3..c9e225dc 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -19,15 +19,15 @@ import ( "github.com/vapor/p2p/discover/dht" "github.com/vapor/p2p/discover/mdns" "github.com/vapor/p2p/netutil" - "github.com/vapor/p2p/signlib" security "github.com/vapor/p2p/security" + "github.com/vapor/p2p/signlib" "github.com/vapor/version" ) const ( logModule = "p2p" - minNumOutboundPeers = 4 + minNumOutboundPeers = 3 maxNumLANPeers = 5 //magicNumber used to generate unique netID magicNumber = uint64(0x054c5638) @@ -78,8 +78,8 @@ type Switch struct { security Security } -// NewSwitch create a new Switch and set discover. -func NewSwitch(config *cfg.Config) (*Switch, error) { +// NewSwitchMaybeDiscover create a new Switch and set discover. +func NewSwitchMaybeDiscover(config *cfg.Config) (*Switch, error) { var err error var l Listener var listenAddr string @@ -105,15 +105,15 @@ func NewSwitch(config *cfg.Config) (*Switch, error) { return nil, err } if config.P2P.LANDiscover { - lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port)) + lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port)) } } - return newSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID) + return NewSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID) } // newSwitch creates a new Switch with the given config. -func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) { +func NewSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) { sw := &Switch{ Config: config, peerConfig: DefaultPeerConfig(config.P2P), @@ -135,6 +135,26 @@ func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, p return sw, nil } +func (sw *Switch) GetDiscv() discv { + return sw.discv +} + +func (sw *Switch) GetNodeInfo() *NodeInfo { + return sw.nodeInfo +} + +func (sw *Switch) GetPeers() *PeerSet { + return sw.peers +} + +func (sw *Switch) GetReactors() map[string]Reactor { + return sw.reactors +} + +func (sw *Switch) GetSecurity() Security { + return sw.security +} + // OnStart implements BaseService. It starts all the reactors, peers, and listeners. func (sw *Switch) OnStart() error { for _, reactor := range sw.reactors { @@ -199,7 +219,7 @@ func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error { } peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN) - if err := sw.security.DoFilter(peer.remoteAddrHost(), peer.PubKey()); err != nil { + if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey()); err != nil { return err } @@ -252,7 +272,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig) if err != nil { - log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn") + log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on newOutboundPeerConn") return err } @@ -352,7 +372,7 @@ func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) { for i := 0; i < len(lanPeer.IP); i++ { addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port))) } - sw.dialPeers(addresses) + sw.DialPeers(addresses) } func (sw *Switch) connectLANPeersRoutine() { @@ -392,7 +412,7 @@ func (sw *Switch) listenerRoutine(l Listener) { break } - // disconnect if we alrady have MaxNumPeers + // disconnect if we already have MaxNumPeers if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers { if err := inConn.Close(); err != nil { log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err") @@ -411,15 +431,15 @@ func (sw *Switch) listenerRoutine(l Listener) { func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) { if err := sw.DialPeerWithAddress(a); err != nil { - log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer") + log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Warn("dialPeerWorker fail on dial peer") } wg.Done() } -func (sw *Switch) dialPeers(addresses []*NetAddress) { +func (sw *Switch) DialPeers(addresses []*NetAddress) { connectedPeers := make(map[string]struct{}) for _, peer := range sw.Peers().List() { - connectedPeers[peer.remoteAddrHost()] = struct{}{} + connectedPeers[peer.RemoteAddrHost()] = struct{}{} } var wg sync.WaitGroup @@ -452,7 +472,7 @@ func (sw *Switch) ensureKeepConnectPeers() { addresses = append(addresses, address) } - sw.dialPeers(addresses) + sw.DialPeers(addresses) } func (sw *Switch) ensureOutboundPeers() { @@ -470,7 +490,7 @@ func (sw *Switch) ensureOutboundPeers() { address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP) addresses = append(addresses, address) } - sw.dialPeers(addresses) + sw.DialPeers(addresses) } func (sw *Switch) ensureOutboundPeersRoutine() {