X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=p2p%2Fswitch.go;h=ef8306d331af28474cc0552a73fa61652cc4b055;hp=62d47f1c752f53da65998161759d6b8155777471;hb=807d99726f6a0610fa9c835e2aabd983801d3510;hpb=aa87732ffe2e2418d8f1ae37da388ecf73e59621 diff --git a/p2p/switch.go b/p2p/switch.go index 62d47f1c..ef8306d3 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -2,7 +2,6 @@ package p2p import ( "encoding/binary" - "encoding/json" "fmt" "net" "sync" @@ -14,7 +13,6 @@ import ( cfg "github.com/vapor/config" "github.com/vapor/consensus" "github.com/vapor/crypto/sha3pool" - dbm "github.com/vapor/database/leveldb" "github.com/vapor/errors" "github.com/vapor/event" "github.com/vapor/p2p/connection" @@ -22,14 +20,12 @@ import ( "github.com/vapor/p2p/discover/mdns" "github.com/vapor/p2p/netutil" "github.com/vapor/p2p/signlib" - "github.com/vapor/p2p/trust" + security "github.com/vapor/p2p/security" "github.com/vapor/version" ) const ( - bannedPeerKey = "BannedPeer" - defaultBanDuration = time.Hour * 1 - logModule = "p2p" + logModule = "p2p" minNumOutboundPeers = 4 maxNumLANPeers = 5 @@ -39,10 +35,9 @@ const ( //pre-define errors for connecting fail var ( - ErrDuplicatePeer = errors.New("Duplicate peer") - ErrConnectSelf = errors.New("Connect self") - ErrConnectBannedPeer = errors.New("Connect banned peer") - ErrConnectSpvPeer = errors.New("Outbound connect spv peer") + ErrDuplicatePeer = errors.New("Duplicate peer") + ErrConnectSelf = errors.New("Connect self") + ErrConnectSpvPeer = errors.New("Outbound connect spv peer") ) type discv interface { @@ -54,6 +49,13 @@ type lanDiscv interface { Stop() } +type Security interface { + DoFilter(ip string, pubKey string) error + IsBanned(ip string, level byte, reason string) bool + RegisterFilter(filter security.Filter) + Start() error +} + // Switch handles peer connections and exposes an API to receive incoming messages // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one // or more `Channels`. So while sending outgoing messages is typically performed on the peer, @@ -73,9 +75,7 @@ type Switch struct { nodePrivKey signlib.PrivKey // our node privkey discv discv lanDiscv lanDiscv - bannedPeer map[string]time.Time - db dbm.DB - mtx sync.Mutex + security Security } // NewSwitch create a new Switch and set discover. @@ -96,7 +96,6 @@ func NewSwitch(config *cfg.Config) (*Switch, error) { sha3pool.Sum256(h[:], data) netID := binary.BigEndian.Uint64(h[:8]) - blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()) privateKey := config.PrivateKey() if !config.VaultMode { // Create listener @@ -110,11 +109,11 @@ func NewSwitch(config *cfg.Config) (*Switch, error) { } } - return newSwitch(config, discv, lanDiscv, blacklistDB, l, *privateKey, listenAddr, netID) + return newSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID) } // newSwitch creates a new Switch with the given config. -func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) { +func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) { sw := &Switch{ Config: config, peerConfig: DefaultPeerConfig(config.P2P), @@ -126,17 +125,12 @@ func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB d nodePrivKey: privKey, discv: discv, lanDiscv: lanDiscv, - db: blacklistDB, nodeInfo: NewNodeInfo(config, privKey.XPub(), listenAddr, netID), - bannedPeer: make(map[string]time.Time), - } - if err := sw.loadBannedPeers(); err != nil { - return nil, err + security: security.NewSecurity(config), } sw.AddListener(l) sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) - trust.Init() log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network") return sw, nil } @@ -148,6 +142,13 @@ func (sw *Switch) OnStart() error { return err } } + + sw.security.RegisterFilter(sw.nodeInfo) + sw.security.RegisterFilter(sw.peers) + if err := sw.security.Start(); err != nil { + return err + } + for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } @@ -178,21 +179,6 @@ func (sw *Switch) OnStop() { } } -//AddBannedPeer add peer to blacklist -func (sw *Switch) AddBannedPeer(ip string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration) - dataJSON, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err - } - - sw.db.Set([]byte(bannedPeerKey), dataJSON) - return nil -} - // AddPeer performs the P2P handshake with a peer // that already has a SecretConnection. If all goes well, // it starts the peer and adds it to the switch. @@ -213,7 +199,7 @@ func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error { } peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN) - if err := sw.filterConnByPeer(peer); err != nil { + if err := sw.security.DoFilter(peer.remoteAddrHost(), peer.PubKey()); err != nil { return err } @@ -260,7 +246,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer") sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) - if err := sw.filterConnByIP(addr.IP.String()); err != nil { + if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil { return err } @@ -279,6 +265,10 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { return nil } +func (sw *Switch) IsBanned(ip string, level byte, reason string) bool { + return sw.security.IsBanned(ip, level, reason) +} + //IsDialing prevent duplicate dialing func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.IP.String()) @@ -290,17 +280,6 @@ func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } -// loadBannedPeers load banned peers from db -func (sw *Switch) loadBannedPeers() error { - if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil { - if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil { - return err - } - } - - return nil -} - // Listeners returns the list of listeners the switch listens on. // NOTE: Not goroutine safe. func (sw *Switch) Listeners() []Listener { @@ -362,22 +341,6 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { return nil } -func (sw *Switch) checkBannedPeer(peer string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - if banEnd, ok := sw.bannedPeer[peer]; ok { - if time.Now().Before(banEnd) { - return ErrConnectBannedPeer - } - - if err := sw.delBannedPeer(peer); err != nil { - return err - } - } - return nil -} - func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) { lanPeers, _, _, numDialing := sw.NumPeers() numToDial := maxNumLANPeers - lanPeers @@ -422,42 +385,6 @@ func (sw *Switch) connectLANPeersRoutine() { } } -func (sw *Switch) delBannedPeer(addr string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - delete(sw.bannedPeer, addr) - datajson, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err - } - - sw.db.Set([]byte(bannedPeerKey), datajson) - return nil -} - -func (sw *Switch) filterConnByIP(ip string) error { - if ip == sw.nodeInfo.listenHost() { - return ErrConnectSelf - } - return sw.checkBannedPeer(ip) -} - -func (sw *Switch) filterConnByPeer(peer *Peer) error { - if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil { - return err - } - - if sw.nodeInfo.PubKey == peer.PubKey() { - return ErrConnectSelf - } - - if sw.peers.Has(peer.Key) { - return ErrDuplicatePeer - } - return nil -} - func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections()