cmn.BaseService
Config *cfg.Config
- peerConfig *PeerConfig
- listeners []Listener
- reactors map[string]Reactor
- chDescs []*connection.ChannelDescriptor
- reactorsByCh map[byte]Reactor
- peers *PeerSet
- dialing *cmn.CMap
- nodeInfo *NodeInfo // our node info
- nodePrivKey signlib.PrivKey // our node privkey
- discv discv
- lanDiscv lanDiscv
- security Security
+ PeerConfig *PeerConfig
+ Listeners []Listener
+ Reactors map[string]Reactor
+ ChDescs []*connection.ChannelDescriptor
+ ReactorsByCh map[byte]Reactor
+ Peers *PeerSet
+ Dialing *cmn.CMap
+ NodeInfo *NodeInfo // our node info
+ NodePrivKey signlib.PrivKey // our node privkey
+ Discv discv
+ LanDiscv lanDiscv
+ Security Security
}
// NewSwitch create a new Switch and set discover.
func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
sw := &Switch{
Config: config,
- peerConfig: DefaultPeerConfig(config.P2P),
- reactors: make(map[string]Reactor),
- chDescs: make([]*connection.ChannelDescriptor, 0),
- reactorsByCh: make(map[byte]Reactor),
- peers: NewPeerSet(),
- dialing: cmn.NewCMap(),
- nodePrivKey: privKey,
- discv: discv,
- lanDiscv: lanDiscv,
- nodeInfo: NewNodeInfo(config, privKey.XPub(), listenAddr, netID),
- security: security.NewSecurity(config),
+ PeerConfig: DefaultPeerConfig(config.P2P),
+ Reactors: make(map[string]Reactor),
+ ChDescs: make([]*connection.ChannelDescriptor, 0),
+ ReactorsByCh: make(map[byte]Reactor),
+ Peers: NewPeerSet(),
+ Dialing: cmn.NewCMap(),
+ NodePrivKey: privKey,
+ Discv: discv,
+ LanDiscv: lanDiscv,
+ NodeInfo: NewNodeInfo(config, privKey.XPub(), listenAddr, netID),
+ Security: security.NewSecurity(config),
}
sw.AddListener(l)
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
- log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
+ log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.NodeInfo}).Info("init p2p network")
return sw, nil
}
// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error {
- for _, reactor := range sw.reactors {
+ for _, reactor := range sw.Reactors {
if _, err := reactor.Start(); err != nil {
return err
}
}
- sw.security.RegisterFilter(sw.nodeInfo)
- sw.security.RegisterFilter(sw.peers)
- if err := sw.security.Start(); err != nil {
+ sw.Security.RegisterFilter(sw.NodeInfo)
+ sw.Security.RegisterFilter(sw.Peers)
+ if err := sw.Security.Start(); err != nil {
return err
}
- for _, listener := range sw.listeners {
+ for _, listener := range sw.Listeners {
go sw.listenerRoutine(listener)
}
go sw.ensureOutboundPeersRoutine()
// OnStop implements BaseService. It stops all listeners, peers, and reactors.
func (sw *Switch) OnStop() {
if sw.Config.P2P.LANDiscover {
- sw.lanDiscv.Stop()
+ sw.LanDiscv.Stop()
}
- for _, listener := range sw.listeners {
+ for _, listener := range sw.Listeners {
listener.Stop()
}
- sw.listeners = nil
+ sw.Listeners = nil
- for _, peer := range sw.peers.List() {
+ for _, peer := range sw.Peers.List() {
peer.Stop()
- sw.peers.Remove(peer)
+ sw.Peers.Remove(peer)
}
- for _, reactor := range sw.reactors {
+ for _, reactor := range sw.Reactors {
reactor.Stop()
}
}
// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
- peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
+ peerNodeInfo, err := pc.HandshakeTimeout(sw.NodeInfo, sw.PeerConfig.HandshakeTimeout)
if err != nil {
return err
}
- if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
+ if err := version.Status.CheckUpdate(sw.NodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
return err
}
- if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
+ if err := sw.NodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
return err
}
- peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
- if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey()); err != nil {
+ peer := newPeer(pc, peerNodeInfo, sw.ReactorsByCh, sw.ChDescs, sw.StopPeerForError, isLAN)
+ if err := sw.Security.DoFilter(peer.RemoteAddrHost(), peer.PubKey()); err != nil {
return err
}
}
}
- return sw.peers.Add(peer)
+ return sw.Peers.Add(peer)
}
// AddReactor adds the given reactor to the switch.
// No two reactors can share the same channel.
for _, chDesc := range reactor.GetChannels() {
chID := chDesc.ID
- if sw.reactorsByCh[chID] != nil {
- cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
+ if sw.ReactorsByCh[chID] != nil {
+ cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.ReactorsByCh[chID], reactor))
}
- sw.chDescs = append(sw.chDescs, chDesc)
- sw.reactorsByCh[chID] = reactor
+ sw.ChDescs = append(sw.ChDescs, chDesc)
+ sw.ReactorsByCh[chID] = reactor
}
- sw.reactors[name] = reactor
+ sw.Reactors[name] = reactor
reactor.SetSwitch(sw)
return reactor
}
// AddListener adds the given listener to the switch for listening to incoming peer connections.
// NOTE: Not goroutine safe.
func (sw *Switch) AddListener(l Listener) {
- sw.listeners = append(sw.listeners, l)
+ sw.Listeners = append(sw.Listeners, l)
}
//DialPeerWithAddress dial node from net address
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
- sw.dialing.Set(addr.IP.String(), addr)
- defer sw.dialing.Delete(addr.IP.String())
+ sw.Dialing.Set(addr.IP.String(), addr)
+ defer sw.Dialing.Delete(addr.IP.String())
if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
return err
}
lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
sw := &p2p.Switch{
- Config: config,
- peerConfig: p2p.DefaultPeerConfig(m.nodeCfg.P2P),
- reactors: make(map[string]p2p.Reactor),
- chDescs: make([]*conn.ChannelDescriptor, 0),
- reactorsByCh: make(map[byte]Reactor),
- peers: p2p.NewPeerSet(),
- dialing: cmn.NewCMap(),
- nodePrivKey: swPrivKey,
- discv: discv,
- lanDiscv: lanDiscv,
- nodeInfo: p2p.NewNodeInfo(m.nodeCfg, swPrivKey.XPub(), listenAddr, netID),
- security: security.NewSecurity(m.nodeCfg),
+ Config: m.nodeCfg,
+ PeerConfig: p2p.DefaultPeerConfig(m.nodeCfg.P2P),
+ Reactors: make(map[string]p2p.Reactor),
+ ChDescs: make([]*conn.ChannelDescriptor, 0),
+ ReactorsByCh: make(map[byte]Reactor),
+ Peers: p2p.NewPeerSet(),
+ Dialing: cmn.NewCMap(),
+ NodePrivKey: swPrivKey,
+ Discv: discv,
+ LanDiscv: lanDiscv,
+ NodeInfo: p2p.NewNodeInfo(m.nodeCfg, swPrivKey.XPub(), listenAddr, netID),
+ Security: security.NewSecurity(m.nodeCfg),
}
sw.AddListener(l)
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)