X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=p2p%2Fswitch.go;h=d3daa7c649b838315b06f144312653e569650c6e;hp=cfae512077a8bf2b9d07d736a0a39e7244d75f68;hb=df9deca26ab5e99b48bae28fb8282ec3fa47d516;hpb=4ec54a2924dcfde48f4b6e5904d017f2ef4bd920 diff --git a/p2p/switch.go b/p2p/switch.go index cfae5120..d3daa7c6 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -2,38 +2,32 @@ package p2p import ( "encoding/binary" - "encoding/hex" - "encoding/json" "fmt" "net" "sync" "time" log "github.com/sirupsen/logrus" - crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" cfg "github.com/vapor/config" "github.com/vapor/consensus" - "github.com/vapor/crypto/ed25519" "github.com/vapor/crypto/sha3pool" - dbm "github.com/vapor/database/leveldb" "github.com/vapor/errors" "github.com/vapor/event" "github.com/vapor/p2p/connection" "github.com/vapor/p2p/discover/dht" "github.com/vapor/p2p/discover/mdns" "github.com/vapor/p2p/netutil" - "github.com/vapor/p2p/trust" + security "github.com/vapor/p2p/security" + "github.com/vapor/p2p/signlib" "github.com/vapor/version" ) const ( - bannedPeerKey = "BannedPeer" - defaultBanDuration = time.Hour * 1 - logModule = "p2p" + logModule = "p2p" - minNumOutboundPeers = 4 + minNumOutboundPeers = 3 maxNumLANPeers = 5 //magicNumber used to generate unique netID magicNumber = uint64(0x054c5638) @@ -41,10 +35,9 @@ const ( //pre-define errors for connecting fail var ( - ErrDuplicatePeer = errors.New("Duplicate peer") - ErrConnectSelf = errors.New("Connect self") - ErrConnectBannedPeer = errors.New("Connect banned peer") - ErrConnectSpvPeer = errors.New("Outbound connect spv peer") + ErrDuplicatePeer = errors.New("Duplicate peer") + ErrConnectSelf = errors.New("Connect self") + ErrConnectSpvPeer = errors.New("Outbound connect spv peer") ) type discv interface { @@ -56,6 +49,13 @@ type lanDiscv interface { Stop() } +type Security interface { + DoFilter(ip string, pubKey string) error + IsBanned(ip string, level byte, reason string) bool + RegisterFilter(filter security.Filter) + Start() error +} + // Switch handles peer connections and exposes an API to receive incoming messages // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one // or more `Channels`. So while sending outgoing messages is typically performed on the peer, @@ -71,13 +71,11 @@ type Switch struct { reactorsByCh map[byte]Reactor peers *PeerSet dialing *cmn.CMap - nodeInfo *NodeInfo // our node info - nodePrivKey crypto.PrivKeyEd25519 // our node privkey + nodeInfo *NodeInfo // our node info + nodePrivKey signlib.PrivKey // our node privkey discv discv lanDiscv lanDiscv - bannedPeer map[string]time.Time - db dbm.DB - mtx sync.Mutex + security Security } // NewSwitch create a new Switch and set discover. @@ -98,22 +96,11 @@ func NewSwitch(config *cfg.Config) (*Switch, error) { sha3pool.Sum256(h[:], data) netID := binary.BigEndian.Uint64(h[:8]) - blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()) - - _, yyy, _ := ed25519.GenerateKey(nil) - zzz := yyy.String() - - bytes, err := hex.DecodeString(zzz) - if err != nil { - return nil, err - } - var newKey [64]byte - copy(newKey[:], bytes) - privKey := crypto.PrivKeyEd25519(newKey) + privateKey := config.PrivateKey() if !config.VaultMode { // Create listener l, listenAddr = GetListener(config.P2P) - discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port, netID) + discv, err = dht.NewDiscover(config, *privateKey, l.ExternalAddress().Port, netID) if err != nil { return nil, err } @@ -122,11 +109,11 @@ func NewSwitch(config *cfg.Config) (*Switch, error) { } } - return newSwitch(config, discv, lanDiscv, blacklistDB, l, privKey, listenAddr, netID) + return newSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID) } // newSwitch creates a new Switch with the given config. -func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string, netID uint64) (*Switch, error) { +func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) { sw := &Switch{ Config: config, peerConfig: DefaultPeerConfig(config.P2P), @@ -135,20 +122,15 @@ func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB d reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: cmn.NewCMap(), - nodePrivKey: priv, + nodePrivKey: privKey, discv: discv, lanDiscv: lanDiscv, - db: blacklistDB, - nodeInfo: NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr, netID), - bannedPeer: make(map[string]time.Time), - } - if err := sw.loadBannedPeers(); err != nil { - return nil, err + nodeInfo: NewNodeInfo(config, privKey.XPub(), listenAddr, netID), + security: security.NewSecurity(config), } sw.AddListener(l) sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) - trust.Init() log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network") return sw, nil } @@ -160,6 +142,13 @@ func (sw *Switch) OnStart() error { return err } } + + sw.security.RegisterFilter(sw.nodeInfo) + sw.security.RegisterFilter(sw.peers) + if err := sw.security.Start(); err != nil { + return err + } + for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } @@ -190,21 +179,6 @@ func (sw *Switch) OnStop() { } } -//AddBannedPeer add peer to blacklist -func (sw *Switch) AddBannedPeer(ip string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration) - dataJSON, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err - } - - sw.db.Set([]byte(bannedPeerKey), dataJSON) - return nil -} - // AddPeer performs the P2P handshake with a peer // that already has a SecretConnection. If all goes well, // it starts the peer and adds it to the switch. @@ -225,7 +199,7 @@ func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error { } peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN) - if err := sw.filterConnByPeer(peer); err != nil { + if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey()); err != nil { return err } @@ -272,7 +246,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer") sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) - if err := sw.filterConnByIP(addr.IP.String()); err != nil { + if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil { return err } @@ -291,8 +265,8 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { return nil } -func (sw *Switch) ID() [32]byte { - return sw.nodeInfo.PubKey +func (sw *Switch) IsBanned(ip string, level byte, reason string) bool { + return sw.security.IsBanned(ip, level, reason) } //IsDialing prevent duplicate dialing @@ -306,17 +280,6 @@ func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } -// loadBannedPeers load banned peers from db -func (sw *Switch) loadBannedPeers() error { - if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil { - if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil { - return err - } - } - - return nil -} - // Listeners returns the list of listeners the switch listens on. // NOTE: Not goroutine safe. func (sw *Switch) Listeners() []Listener { @@ -378,22 +341,6 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { return nil } -func (sw *Switch) checkBannedPeer(peer string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - if banEnd, ok := sw.bannedPeer[peer]; ok { - if time.Now().Before(banEnd) { - return ErrConnectBannedPeer - } - - if err := sw.delBannedPeer(peer); err != nil { - return err - } - } - return nil -} - func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) { lanPeers, _, _, numDialing := sw.NumPeers() numToDial := maxNumLANPeers - lanPeers @@ -438,42 +385,6 @@ func (sw *Switch) connectLANPeersRoutine() { } } -func (sw *Switch) delBannedPeer(addr string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - delete(sw.bannedPeer, addr) - datajson, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err - } - - sw.db.Set([]byte(bannedPeerKey), datajson) - return nil -} - -func (sw *Switch) filterConnByIP(ip string) error { - if ip == sw.nodeInfo.listenHost() { - return ErrConnectSelf - } - return sw.checkBannedPeer(ip) -} - -func (sw *Switch) filterConnByPeer(peer *Peer) error { - if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil { - return err - } - - if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) { - return ErrConnectSelf - } - - if sw.peers.Has(peer.Key) { - return ErrDuplicatePeer - } - return nil -} - func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() @@ -508,7 +419,7 @@ func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) { func (sw *Switch) dialPeers(addresses []*NetAddress) { connectedPeers := make(map[string]struct{}) for _, peer := range sw.Peers().List() { - connectedPeers[peer.remoteAddrHost()] = struct{}{} + connectedPeers[peer.RemoteAddrHost()] = struct{}{} } var wg sync.WaitGroup