X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=p2p%2Fswitch.go;h=d768e64953e70eb849ffca229f88e5e6d6ff4044;hb=280ddc92cc64d8763f27930e120f947e972c6073;hp=5d84b30a856d8501634fae02be4a157509e8f546;hpb=727d644e29744414cb67bfaa5e01b0813c623135;p=bytom%2Fbytom.git diff --git a/p2p/switch.go b/p2p/switch.go index 5d84b30a..d768e649 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -2,7 +2,6 @@ package p2p import ( "encoding/hex" - "encoding/json" "fmt" "net" "sync" @@ -15,21 +14,18 @@ import ( cfg "github.com/bytom/config" "github.com/bytom/consensus" "github.com/bytom/crypto/ed25519" - dbm "github.com/bytom/database/leveldb" "github.com/bytom/errors" "github.com/bytom/event" "github.com/bytom/p2p/connection" "github.com/bytom/p2p/discover/dht" "github.com/bytom/p2p/discover/mdns" "github.com/bytom/p2p/netutil" - "github.com/bytom/p2p/trust" + "github.com/bytom/p2p/security" "github.com/bytom/version" ) const ( - bannedPeerKey = "BannedPeer" - defaultBanDuration = time.Hour * 1 - logModule = "p2p" + logModule = "p2p" minNumOutboundPeers = 4 maxNumLANPeers = 5 @@ -37,10 +33,9 @@ const ( //pre-define errors for connecting fail var ( - ErrDuplicatePeer = errors.New("Duplicate peer") - ErrConnectSelf = errors.New("Connect self") - ErrConnectBannedPeer = errors.New("Connect banned peer") - ErrConnectSpvPeer = errors.New("Outbound connect spv peer") + ErrDuplicatePeer = errors.New("Duplicate peer") + ErrConnectSelf = errors.New("Connect self") + ErrConnectSpvPeer = errors.New("Outbound connect spv peer") ) type discv interface { @@ -52,6 +47,13 @@ type lanDiscv interface { Stop() } +type Security interface { + DoFilter(ip string, pubKey string) error + IsBanned(ip string, level byte, reason string) bool + RegisterFilter(filter security.Filter) + Start() error +} + // Switch handles peer connections and exposes an API to receive incoming messages // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one // or more `Channels`. So while sending outgoing messages is typically performed on the peer, @@ -71,9 +73,7 @@ type Switch struct { nodePrivKey crypto.PrivKeyEd25519 // our node privkey discv discv lanDiscv lanDiscv - bannedPeer map[string]time.Time - db dbm.DB - mtx sync.Mutex + security Security } // NewSwitch create a new Switch and set discover. @@ -84,7 +84,6 @@ func NewSwitch(config *cfg.Config) (*Switch, error) { var discv *dht.Network var lanDiscv *mdns.LANDiscover - blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()) config.P2P.PrivateKey, err = config.NodeKey() if err != nil { return nil, err @@ -106,15 +105,15 @@ func NewSwitch(config *cfg.Config) (*Switch, error) { return nil, err } if config.P2P.LANDiscover { - lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port)) + lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port)) } } - return newSwitch(config, discv, lanDiscv, blacklistDB, l, privKey, listenAddr) + return newSwitch(config, discv, lanDiscv, l, privKey, listenAddr) } // newSwitch creates a new Switch with the given config. -func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) { +func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) { sw := &Switch{ Config: config, peerConfig: DefaultPeerConfig(config.P2P), @@ -126,17 +125,12 @@ func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB d nodePrivKey: priv, discv: discv, lanDiscv: lanDiscv, - db: blacklistDB, nodeInfo: NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr), - bannedPeer: make(map[string]time.Time), - } - if err := sw.loadBannedPeers(); err != nil { - return nil, err + security: security.NewSecurity(config), } sw.AddListener(l) sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) - trust.Init() return sw, nil } @@ -147,6 +141,13 @@ func (sw *Switch) OnStart() error { return err } } + + sw.security.RegisterFilter(sw.nodeInfo) + sw.security.RegisterFilter(sw.peers) + if err := sw.security.Start(); err != nil { + return err + } + for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } @@ -177,21 +178,6 @@ func (sw *Switch) OnStop() { } } -//AddBannedPeer add peer to blacklist -func (sw *Switch) AddBannedPeer(ip string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration) - dataJSON, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err - } - - sw.db.Set([]byte(bannedPeerKey), dataJSON) - return nil -} - // AddPeer performs the P2P handshake with a peer // that already has a SecretConnection. If all goes well, // it starts the peer and adds it to the switch. @@ -211,7 +197,7 @@ func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error { } peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN) - if err := sw.filterConnByPeer(peer); err != nil { + if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey().String()); err != nil { return err } @@ -258,7 +244,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer") sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) - if err := sw.filterConnByIP(addr.IP.String()); err != nil { + if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil { return err } @@ -277,6 +263,10 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { return nil } +func (sw *Switch) IsBanned(ip string, level byte, reason string) bool { + return sw.security.IsBanned(ip, level, reason) +} + //IsDialing prevent duplicate dialing func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.IP.String()) @@ -288,17 +278,6 @@ func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } -// loadBannedPeers load banned peers from db -func (sw *Switch) loadBannedPeers() error { - if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil { - if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil { - return err - } - } - - return nil -} - // Listeners returns the list of listeners the switch listens on. // NOTE: Not goroutine safe. func (sw *Switch) Listeners() []Listener { @@ -366,22 +345,6 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { return nil } -func (sw *Switch) checkBannedPeer(peer string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - if banEnd, ok := sw.bannedPeer[peer]; ok { - if time.Now().Before(banEnd) { - return ErrConnectBannedPeer - } - - if err := sw.delBannedPeer(peer); err != nil { - return err - } - } - return nil -} - func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) { lanPeers, _, _, numDialing := sw.NumPeers() numToDial := maxNumLANPeers - lanPeers @@ -426,42 +389,6 @@ func (sw *Switch) connectLANPeersRoutine() { } } -func (sw *Switch) delBannedPeer(addr string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - delete(sw.bannedPeer, addr) - datajson, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err - } - - sw.db.Set([]byte(bannedPeerKey), datajson) - return nil -} - -func (sw *Switch) filterConnByIP(ip string) error { - if ip == sw.nodeInfo.listenHost() { - return ErrConnectSelf - } - return sw.checkBannedPeer(ip) -} - -func (sw *Switch) filterConnByPeer(peer *Peer) error { - if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil { - return err - } - - if sw.nodeInfo.getPubkey().Equals(peer.PubKey().Wrap()) { - return ErrConnectSelf - } - - if sw.peers.Has(peer.Key) { - return ErrDuplicatePeer - } - return nil -} - func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() @@ -496,7 +423,7 @@ func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) { func (sw *Switch) dialPeers(addresses []*NetAddress) { connectedPeers := make(map[string]struct{}) for _, peer := range sw.Peers().List() { - connectedPeers[peer.remoteAddrHost()] = struct{}{} + connectedPeers[peer.RemoteAddrHost()] = struct{}{} } var wg sync.WaitGroup