OSDN Git Service

fix the code style
authorpaladz <453256728@qq.com>
Mon, 21 May 2018 07:15:39 +0000 (15:15 +0800)
committerpaladz <453256728@qq.com>
Mon, 21 May 2018 07:15:39 +0000 (15:15 +0800)
p2p/addrbook.go [deleted file]
p2p/pex/addrbook.go [new file with mode: 0644]
p2p/pex/file.go [new file with mode: 0644]
p2p/pex/know_address.go [new file with mode: 0644]
p2p/pex/params.go [new file with mode: 0644]
p2p/pex/pex_message.go [new file with mode: 0644]
p2p/pex/pex_reactor.go [new file with mode: 0644]
p2p/pex_reactor.go [deleted file]

diff --git a/p2p/addrbook.go b/p2p/addrbook.go
deleted file mode 100644 (file)
index 4657f08..0000000
+++ /dev/null
@@ -1,729 +0,0 @@
-// Modified for Tendermint
-// Originally Copyright (c) 2013-2014 Conformal Systems LLC.
-// https://github.com/conformal/btcd/blob/master/LICENSE
-
-package p2p
-
-import (
-       "encoding/binary"
-       "encoding/json"
-       "math"
-       "math/rand"
-       "net"
-       "os"
-       "sync"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       cmn "github.com/tendermint/tmlibs/common"
-)
-
-const (
-       // addresses under which the address manager will claim to need more addresses.
-       needAddressThreshold = 1000
-
-       // interval used to dump the address cache to disk for future use.
-       dumpAddressInterval = time.Minute * 2
-
-       // max addresses in each old address bucket.
-       oldBucketSize = 64
-
-       // buckets we split old addresses over.
-       oldBucketCount = 64
-
-       // max addresses in each new address bucket.
-       newBucketSize = 64
-
-       // buckets that we spread new addresses over.
-       newBucketCount = 256
-
-       // old buckets over which an address group will be spread.
-       oldBucketsPerGroup = 4
-
-       // new buckets over which an source address group will be spread.
-       newBucketsPerGroup = 32
-
-       // buckets a frequently seen new address may end up in.
-       maxNewBucketsPerAddress = 4
-
-       // days before which we assume an address has vanished
-       // if we have not seen it announced in that long.
-       numMissingDays = 30
-
-       // tries without a single success before we assume an address is bad.
-       numRetries = 3
-
-       // max failures we will accept without a success before considering an address bad.
-       maxFailures = 10
-
-       // days since the last success before we will consider evicting an address.
-       minBadDays = 7
-
-       // % of total addresses known returned by GetSelection.
-       getSelectionPercent = 23
-
-       // min addresses that must be returned by GetSelection. Useful for bootstrapping.
-       minGetSelection = 32
-
-       // max addresses returned by GetSelection
-       // NOTE: this must match "maxPexMessageSize"
-       maxGetSelection = 250
-)
-
-const (
-       bucketTypeNew = 0x01
-       bucketTypeOld = 0x02
-)
-
-// AddrBook - concurrency safe peer address manager.
-type AddrBook struct {
-       cmn.BaseService
-
-       // immutable after creation
-       filePath          string
-       routabilityStrict bool
-       key               string
-
-       mtx        sync.RWMutex
-       rand       *rand.Rand
-       ourAddrs   map[string]*NetAddress
-       addrLookup map[string]*knownAddress // new & old
-       bucketsNew []map[string]*knownAddress
-       bucketsOld []map[string]*knownAddress
-       nOld       int
-       nNew       int
-}
-
-// NewAddrBook creates a new address book. Use Start to begin processing asynchronous address updates.
-func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
-       a := &AddrBook{
-               filePath:          filePath,
-               routabilityStrict: routabilityStrict,
-               key:               crypto.CRandHex(24),
-               rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
-               ourAddrs:          make(map[string]*NetAddress),
-               addrLookup:        make(map[string]*knownAddress),
-               bucketsNew:        make([]map[string]*knownAddress, newBucketCount),
-               bucketsOld:        make([]map[string]*knownAddress, oldBucketCount),
-       }
-       for i := range a.bucketsNew {
-               a.bucketsNew[i] = make(map[string]*knownAddress)
-       }
-       for i := range a.bucketsOld {
-               a.bucketsOld[i] = make(map[string]*knownAddress)
-       }
-       a.BaseService = *cmn.NewBaseService(nil, "AddrBook", a)
-       return a
-}
-
-// OnStart implements Service.
-func (a *AddrBook) OnStart() error {
-       if err := a.BaseService.OnStart(); err != nil {
-               return err
-       }
-
-       a.loadFromFile()
-       go a.saveRoutine()
-       return nil
-}
-
-// OnStop implements Service.
-func (a *AddrBook) OnStop() {
-       a.BaseService.OnStop()
-}
-
-// AddAddress add address to address book
-func (a *AddrBook) AddAddress(addr, src *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       log.WithFields(log.Fields{"addr": addr, "src": src}).Debug("add address to address book")
-       a.addAddress(addr, src)
-}
-
-func (a *AddrBook) NeedMoreAddrs() bool {
-       return a.Size() < needAddressThreshold
-}
-
-func (a *AddrBook) Size() int {
-       a.mtx.RLock()
-       defer a.mtx.RUnlock()
-       return a.size()
-}
-
-func (a *AddrBook) size() int {
-       return a.nNew + a.nOld
-}
-
-// PickAddress picks a random address from random bucket
-func (a *AddrBook) PickAddress(bias int) *NetAddress {
-       a.mtx.RLock()
-       defer a.mtx.RUnlock()
-
-       if a.size() == 0 {
-               return nil
-       }
-
-       // make sure bias is in the range [0, 100]
-       if bias > 100 {
-               bias = 100
-       } else if bias < 0 {
-               bias = 0
-       }
-
-       oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(bias))
-       newCorrelation := math.Sqrt(float64(a.nNew)) * float64(bias)
-       pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
-       if (pickFromOldBucket && a.nOld == 0) || (!pickFromOldBucket && a.nNew == 0) {
-               return nil
-       }
-
-       var bucket map[string]*knownAddress
-       for len(bucket) == 0 {
-               if pickFromOldBucket {
-                       bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
-               } else {
-                       bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
-               }
-       }
-
-       randIndex := a.rand.Intn(len(bucket))
-       for _, ka := range bucket {
-               if randIndex == 0 {
-                       return ka.Addr
-               }
-               randIndex--
-       }
-       return nil
-}
-
-// MarkGood marks the peer as good and moves it into an "old" bucket.
-func (a *AddrBook) MarkGood(addr *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-
-       if ka := a.addrLookup[addr.String()]; ka != nil {
-               ka.markGood()
-               if ka.isNew() {
-                       a.moveToOld(ka)
-               }
-       }
-}
-
-// MarkAttempt marks that an attempt was made to connect to the address.
-func (a *AddrBook) MarkAttempt(addr *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-
-       if ka := a.addrLookup[addr.String()]; ka != nil {
-               ka.markAttempt()
-       }
-}
-
-// RemoveAddress removes the address from the book.
-func (a *AddrBook) RemoveAddress(addr *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-
-       if ka := a.addrLookup[addr.String()]; ka != nil {
-               log.WithField("addr", addr).Debug("remove address from address book")
-               a.removeFromAllBuckets(ka)
-       }
-}
-
-// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
-func (a *AddrBook) GetSelection() []*NetAddress {
-       a.mtx.RLock()
-       defer a.mtx.RUnlock()
-
-       bookSize := a.size()
-       if bookSize == 0 {
-               return nil
-       }
-
-       numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100)
-       numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
-
-       allAddr := make([]*NetAddress, bookSize)
-       i := 0
-       for _, ka := range a.addrLookup {
-               allAddr[i] = ka.Addr
-               i++
-       }
-
-       // Fisher-Yates shuffle the array. We only need to do the first
-       // `numAddresses' since we are throwing the rest.
-       for i := 0; i < numAddresses; i++ {
-               // pick a number between current index and the end
-               j := rand.Intn(len(allAddr)-i) + i
-               allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
-       }
-
-       // slice off the limit we are willing to share.
-       return allAddr[:numAddresses]
-}
-
-type addrBookJSON struct {
-       Key   string
-       Addrs []*knownAddress
-}
-
-func (a *AddrBook) Save() {
-       a.mtx.RLock()
-       defer a.mtx.RUnlock()
-
-       log.WithField("size", a.Size()).Debug("saving address book to file")
-       aJSON := &addrBookJSON{
-               Key:   a.key,
-               Addrs: []*knownAddress{},
-       }
-       for _, ka := range a.addrLookup {
-               aJSON.Addrs = append(aJSON.Addrs, ka)
-       }
-
-       jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
-       if err != nil {
-               log.WithField("err", err).Error("failed to save AddrBook to file")
-               return
-       }
-
-       if err = cmn.WriteFileAtomic(a.filePath, jsonBytes, 0644); err != nil {
-               log.WithFields(log.Fields{"file": a.filePath, "err": err}).Error("Failed to save AddrBook to file")
-       }
-}
-
-func (a *AddrBook) loadFromFile() bool {
-       if _, err := os.Stat(a.filePath); os.IsNotExist(err) {
-               return false
-       }
-
-       r, err := os.Open(a.filePath)
-       if err != nil {
-               cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", a.filePath, err))
-       }
-
-       defer r.Close()
-       aJSON := &addrBookJSON{}
-       if err = json.NewDecoder(r).Decode(aJSON); err != nil {
-               cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", a.filePath, err))
-       }
-
-       a.key = aJSON.Key
-       for _, ka := range aJSON.Addrs {
-               for _, bucketIndex := range ka.Buckets {
-                       bucket := a.getBucket(ka.BucketType, bucketIndex)
-                       bucket[ka.Addr.String()] = ka
-               }
-               a.addrLookup[ka.Addr.String()] = ka
-               if ka.BucketType == bucketTypeNew {
-                       a.nNew++
-               } else {
-                       a.nOld++
-               }
-       }
-       return true
-}
-
-func (a *AddrBook) saveRoutine() {
-       ticker := time.NewTicker(dumpAddressInterval)
-       for {
-               select {
-               case <-ticker.C:
-                       a.Save()
-               case <-a.Quit:
-                       a.Save()
-                       return
-               }
-       }
-}
-
-func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
-       switch bucketType {
-       case bucketTypeNew:
-               return a.bucketsNew[bucketIdx]
-       case bucketTypeOld:
-               return a.bucketsOld[bucketIdx]
-       default:
-               cmn.PanicSanity("Should not happen")
-               return nil
-       }
-}
-
-// Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full.
-// NOTE: currently it always returns true.
-func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
-       if ka.isOld() {
-               log.WithField("know address", ka).Error("cant add old address to new bucket")
-               return false
-       }
-
-       addrStr := ka.Addr.String()
-       bucket := a.getBucket(bucketTypeNew, bucketIdx)
-       if _, ok := bucket[addrStr]; ok {
-               return true
-       }
-
-       if len(bucket) > newBucketSize {
-               log.Debug("addToNewBucket: new bucket is full, expiring new")
-               a.expireNew(bucketIdx)
-       }
-
-       bucket[addrStr] = ka
-       a.addrLookup[addrStr] = ka
-       if ka.addBucketRef(bucketIdx) == 1 {
-               a.nNew++
-       }
-       return true
-}
-
-// Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full.
-func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
-       if ka.isNew() {
-               log.WithField("know address", ka).Error("cannot add old address to new bucket")
-               return false
-       }
-       if len(ka.Buckets) != 0 {
-               log.WithField("know address", ka).Error("cannot add already old address to another old bucket")
-               return false
-       }
-
-       addrStr := ka.Addr.String()
-       bucket := a.getBucket(bucketTypeOld, bucketIdx)
-       if _, ok := bucket[addrStr]; ok {
-               return true
-       }
-
-       if len(bucket) > oldBucketSize {
-               return false
-       }
-
-       // Add to bucket.
-       bucket[addrStr] = ka
-       a.addrLookup[addrStr] = ka
-       if ka.addBucketRef(bucketIdx) == 1 {
-               a.nOld++
-       }
-       return true
-}
-
-func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
-       if ka.BucketType != bucketType {
-               log.WithField("know address", ka).Error("Bucket type mismatch")
-               return
-       }
-
-       bucket := a.getBucket(bucketType, bucketIdx)
-       delete(bucket, ka.Addr.String())
-       if ka.removeBucketRef(bucketIdx) == 0 {
-               if bucketType == bucketTypeNew {
-                       a.nNew--
-               } else {
-                       a.nOld--
-               }
-               delete(a.addrLookup, ka.Addr.String())
-       }
-}
-
-func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
-       for _, bucketIdx := range ka.Buckets {
-               bucket := a.getBucket(ka.BucketType, bucketIdx)
-               delete(bucket, ka.Addr.String())
-       }
-       ka.Buckets = nil
-       if ka.BucketType == bucketTypeNew {
-               a.nNew--
-       } else {
-               a.nOld--
-       }
-       delete(a.addrLookup, ka.Addr.String())
-}
-
-func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
-       bucket := a.getBucket(bucketType, bucketIdx)
-       var oldest *knownAddress
-       for _, ka := range bucket {
-               if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
-                       oldest = ka
-               }
-       }
-       return oldest
-}
-
-func (a *AddrBook) addAddress(addr, src *NetAddress) {
-       if _, ok := a.ourAddrs[addr.String()]; ok {
-               return
-       }
-       if a.routabilityStrict && !addr.Routable() {
-               log.WithField("address", addr).Error("cannot add non-routable address")
-               return
-       }
-
-       ka := a.addrLookup[addr.String()]
-       if ka != nil {
-               if ka.isOld() {
-                       return
-               }
-               if len(ka.Buckets) == maxNewBucketsPerAddress {
-                       return
-               }
-               if factor := int32(2 * len(ka.Buckets)); a.rand.Int31n(factor) != 0 {
-                       return
-               }
-       } else {
-               ka = newKnownAddress(addr, src)
-       }
-
-       bucket := a.calcNewBucket(addr, src)
-       a.addToNewBucket(ka, bucket)
-}
-
-// Make space in the new buckets by expiring the really bad entries.
-// If no bad entries are available we remove the oldest.
-func (a *AddrBook) expireNew(bucketIdx int) {
-       for addrStr, ka := range a.bucketsNew[bucketIdx] {
-               if ka.isBad() {
-                       log.WithField("addr", addrStr).Info("expiring bad address")
-                       a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
-                       return
-               }
-       }
-
-       oldest := a.pickOldest(bucketTypeNew, bucketIdx)
-       a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
-}
-
-// Promotes an address from new to old.
-// TODO: Move to old probabilistically.
-// The better a node is, the less likely it should be evicted from an old bucket.
-func (a *AddrBook) moveToOld(ka *knownAddress) {
-       if ka.isOld() {
-               log.WithField("know address", ka).Error("cannot promote address that is already old")
-               return
-       }
-       if len(ka.Buckets) == 0 {
-               log.WithField("know address", ka).Error("cannot promote address that isn't in any new buckets")
-               return
-       }
-
-       a.removeFromAllBuckets(ka)
-       ka.BucketType = bucketTypeOld
-
-       // Try to add it to its oldBucket destination.
-       oldBucketIdx := a.calcOldBucket(ka.Addr)
-       if ok := a.addToOldBucket(ka, oldBucketIdx); ok {
-               return
-       }
-
-       // No room, must evict something
-       oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
-       a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
-       // Find new bucket to put oldest in
-       newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
-       a.addToNewBucket(oldest, newBucketIdx)
-
-       if ok := a.addToOldBucket(ka, oldBucketIdx); !ok {
-               log.WithField("know address", ka).Error("could not re-add to oldBucketIdx")
-       }
-}
-
-// doublesha256(  key + sourcegroup +
-//                int64(doublesha256(key + group + sourcegroup))%bucket_per_group  ) % num_new_buckets
-func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
-       data1 := []byte{}
-       data1 = append(data1, []byte(a.key)...)
-       data1 = append(data1, []byte(a.groupKey(addr))...)
-       data1 = append(data1, []byte(a.groupKey(src))...)
-       hash1 := doubleSha256(data1)
-       hash64 := binary.BigEndian.Uint64(hash1)
-       hash64 %= newBucketsPerGroup
-       var hashbuf [8]byte
-       binary.BigEndian.PutUint64(hashbuf[:], hash64)
-       data2 := []byte{}
-       data2 = append(data2, []byte(a.key)...)
-       data2 = append(data2, a.groupKey(src)...)
-       data2 = append(data2, hashbuf[:]...)
-
-       hash2 := doubleSha256(data2)
-       return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
-}
-
-// doublesha256(  key + group +
-//                int64(doublesha256(key + addr))%buckets_per_group  ) % num_old_buckets
-func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
-       data1 := []byte{}
-       data1 = append(data1, []byte(a.key)...)
-       data1 = append(data1, []byte(addr.String())...)
-       hash1 := doubleSha256(data1)
-       hash64 := binary.BigEndian.Uint64(hash1)
-       hash64 %= oldBucketsPerGroup
-       var hashbuf [8]byte
-       binary.BigEndian.PutUint64(hashbuf[:], hash64)
-       data2 := []byte{}
-       data2 = append(data2, []byte(a.key)...)
-       data2 = append(data2, a.groupKey(addr)...)
-       data2 = append(data2, hashbuf[:]...)
-
-       hash2 := doubleSha256(data2)
-       return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
-}
-
-// Return a string representing the network group of this address.
-// This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string
-// "local" for a local address and the string "unroutable for an unroutable
-// address.
-func (a *AddrBook) groupKey(na *NetAddress) string {
-       if a.routabilityStrict && na.Local() {
-               return "local"
-       }
-       if a.routabilityStrict && !na.Routable() {
-               return "unroutable"
-       }
-
-       if ipv4 := na.IP.To4(); ipv4 != nil {
-               return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
-       }
-       if na.RFC6145() || na.RFC6052() {
-               // last four bytes are the ip address
-               ip := net.IP(na.IP[12:16])
-               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-       }
-
-       if na.RFC3964() {
-               ip := net.IP(na.IP[2:7])
-               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-
-       }
-       if na.RFC4380() {
-               // teredo tunnels have the last 4 bytes as the v4 address XOR
-               // 0xff.
-               ip := net.IP(make([]byte, 4))
-               for i, byte := range na.IP[12:16] {
-                       ip[i] = byte ^ 0xff
-               }
-               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-       }
-
-       // OK, so now we know ourselves to be a IPv6 address.
-       // bitcoind uses /32 for everything, except for Hurricane Electric's
-       // (he.net) IP range, which it uses /36 for.
-       bits := 32
-       heNet := &net.IPNet{IP: net.ParseIP("2001:470::"),
-               Mask: net.CIDRMask(32, 128)}
-       if heNet.Contains(na.IP) {
-               bits = 36
-       }
-
-       return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
-}
-
-//-----------------------------------------------------------------------------
-
-/*
-   knownAddress
-
-   tracks information about a known network address that is used
-   to determine how viable an address is.
-*/
-type knownAddress struct {
-       Addr        *NetAddress
-       Src         *NetAddress
-       Attempts    int32
-       LastAttempt time.Time
-       LastSuccess time.Time
-       BucketType  byte
-       Buckets     []int
-}
-
-func newKnownAddress(addr, src *NetAddress) *knownAddress {
-       return &knownAddress{
-               Addr:        addr,
-               Src:         src,
-               Attempts:    0,
-               LastAttempt: time.Now(),
-               BucketType:  bucketTypeNew,
-               Buckets:     nil,
-       }
-}
-
-func (ka *knownAddress) isOld() bool {
-       return ka.BucketType == bucketTypeOld
-}
-
-func (ka *knownAddress) isNew() bool {
-       return ka.BucketType == bucketTypeNew
-}
-
-func (ka *knownAddress) markAttempt() {
-       ka.LastAttempt = time.Now()
-       ka.Attempts += 1
-}
-
-func (ka *knownAddress) markGood() {
-       now := time.Now()
-       ka.LastAttempt = now
-       ka.LastSuccess = now
-       ka.Attempts = 0
-}
-
-func (ka *knownAddress) addBucketRef(bucketIdx int) int {
-       for _, bucket := range ka.Buckets {
-               if bucket == bucketIdx {
-                       return -1
-               }
-       }
-       ka.Buckets = append(ka.Buckets, bucketIdx)
-       return len(ka.Buckets)
-}
-
-func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
-       buckets := []int{}
-       for _, bucket := range ka.Buckets {
-               if bucket != bucketIdx {
-                       buckets = append(buckets, bucket)
-               }
-       }
-       if len(buckets) != len(ka.Buckets)-1 {
-               return -1
-       }
-       ka.Buckets = buckets
-       return len(ka.Buckets)
-}
-
-/*
-   An address is bad if the address in question has not been tried in the last
-   minute and meets one of the following criteria:
-
-   1) It claims to be from the future
-   2) It hasn't been seen in over a month
-   3) It has failed at least three times and never succeeded
-   4) It has failed ten times in the last week
-
-   All addresses that meet these criteria are assumed to be worthless and not
-   worth keeping hold of.
-*/
-func (ka *knownAddress) isBad() bool {
-       if ka.BucketType == bucketTypeOld {
-               return false
-       }
-       // Has been attempted in the last minute --> bad
-       if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
-               return true
-       }
-
-       // Over a month old?
-       if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
-               return true
-       }
-
-       // Never succeeded?
-       if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
-               return true
-       }
-
-       // Hasn't succeeded in too long?
-       if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
-               return true
-       }
-
-       return false
-}
diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go
new file mode 100644 (file)
index 0000000..5902cce
--- /dev/null
@@ -0,0 +1,422 @@
+package pex
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+       "math/rand"
+       "net"
+       "sync"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       "github.com/tendermint/go-crypto"
+       cmn "github.com/tendermint/tmlibs/common"
+)
+
+// AddrBook - concurrency safe peer address manager.
+type AddrBook struct {
+       cmn.BaseService
+
+       // immutable after creation
+       filePath          string
+       routabilityStrict bool
+       key               string
+
+       mtx        sync.RWMutex
+       rand       *rand.Rand
+       ourAddrs   map[string]*NetAddress
+       addrLookup map[string]*knownAddress // new & old
+       bucketsNew []map[string]*knownAddress
+       bucketsOld []map[string]*knownAddress
+       nOld       int
+       nNew       int
+}
+
+// NewAddrBook creates a new address book. Use Start to begin processing asynchronous address updates.
+func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
+       a := &AddrBook{
+               filePath:          filePath,
+               routabilityStrict: routabilityStrict,
+               key:               crypto.CRandHex(24),
+               rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
+               ourAddrs:          make(map[string]*NetAddress),
+               addrLookup:        make(map[string]*knownAddress),
+               bucketsNew:        make([]map[string]*knownAddress, newBucketCount),
+               bucketsOld:        make([]map[string]*knownAddress, oldBucketCount),
+       }
+       for i := range a.bucketsNew {
+               a.bucketsNew[i] = make(map[string]*knownAddress)
+       }
+       for i := range a.bucketsOld {
+               a.bucketsOld[i] = make(map[string]*knownAddress)
+       }
+       a.BaseService = *cmn.NewBaseService(nil, "AddrBook", a)
+       return a
+}
+
+// OnStart implements Service.
+func (a *AddrBook) OnStart() error {
+       if err := a.BaseService.OnStart(); err != nil {
+               return err
+       }
+
+       if err := a.loadFromFile(); err != nil {
+               return err
+       }
+
+       go a.saveRoutine()
+       return nil
+}
+
+// OnStop implements Service.
+func (a *AddrBook) OnStop() {
+       a.BaseService.OnStop()
+}
+
+// AddAddress add address to address book
+func (a *AddrBook) AddAddress(addr, src *NetAddress) error {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+       return a.addAddress(addr, src)
+}
+
+// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
+func (a *AddrBook) GetSelection() []*NetAddress {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       bookSize := a.size()
+       if bookSize == 0 {
+               return nil
+       }
+
+       numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100)
+       numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
+       allAddr := []*NetAddress{}
+       for _, ka := range a.addrLookup {
+               allAddr = append(allAddr, ka.Addr)
+       }
+
+       for i := 0; i < numAddresses; i++ {
+               j := rand.Intn(len(allAddr)-i) + i
+               allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
+       }
+       return allAddr[:numAddresses]
+}
+
+// MarkGood marks the peer as good and moves it into an "old" bucket.
+func (a *AddrBook) MarkGood(addr *NetAddress) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
+       if ka := a.addrLookup[addr.String()]; ka != nil {
+               ka.markGood()
+               if ka.isNew() {
+                       a.moveToOld(ka)
+               }
+       }
+}
+
+// MarkAttempt marks that an attempt was made to connect to the address.
+func (a *AddrBook) MarkAttempt(addr *NetAddress) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
+       if ka := a.addrLookup[addr.String()]; ka != nil {
+               ka.markAttempt()
+       }
+}
+
+// NeedMoreAddrs check does the address number meet the threshold
+func (a *AddrBook) NeedMoreAddrs() bool {
+       return a.Size() < needAddressThreshold
+}
+
+// PickAddress picks a random address from random bucket
+func (a *AddrBook) PickAddress(bias int) *NetAddress {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       if a.size() == 0 {
+               return nil
+       }
+
+       // make sure bias is in the range [0, 100]
+       if bias > 100 {
+               bias = 100
+       } else if bias < 0 {
+               bias = 0
+       }
+
+       oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(bias))
+       newCorrelation := math.Sqrt(float64(a.nNew)) * float64(bias)
+       pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
+       if (pickFromOldBucket && a.nOld == 0) || (!pickFromOldBucket && a.nNew == 0) {
+               return nil
+       }
+
+       var bucket map[string]*knownAddress
+       for len(bucket) == 0 {
+               if pickFromOldBucket {
+                       bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
+               } else {
+                       bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
+               }
+       }
+
+       randIndex := a.rand.Intn(len(bucket))
+       for _, ka := range bucket {
+               if randIndex == 0 {
+                       return ka.Addr
+               }
+               randIndex--
+       }
+       return nil
+}
+
+// RemoveAddress removes the address from the book.
+func (a *AddrBook) RemoveAddress(addr *NetAddress) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
+       if ka := a.addrLookup[addr.String()]; ka != nil {
+               a.removeFromAllBuckets(ka)
+       }
+}
+
+// Size count the number of know address
+func (a *AddrBook) Size() int {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+       return a.size()
+}
+
+func (a *AddrBook) addAddress(addr, src *NetAddress) error {
+       if addr == nil || src == nil {
+               return errors.New("can't add nil to address book")
+       }
+       if _, ok := a.ourAddrs[addr.String()]; ok {
+               return errors.New("add ourselves to address book")
+       }
+       if a.routabilityStrict && !addr.Routable() {
+               return errors.New("cannot add non-routable address")
+       }
+
+       ka := a.addrLookup[addr.String()]
+       if ka != nil {
+               if ka.isOld() {
+                       return nil
+               }
+               if len(ka.Buckets) == maxNewBucketsPerAddress {
+                       return nil
+               }
+               if factor := int32(2 * len(ka.Buckets)); a.rand.Int31n(factor) != 0 {
+                       return nil
+               }
+       } else {
+               ka = newKnownAddress(addr, src)
+       }
+
+       bucket := a.calcNewBucket(addr, src)
+       return a.addToNewBucket(ka, bucket)
+}
+
+func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) error {
+       if ka.isOld() {
+               return errors.New("cant add old address to new bucket")
+       }
+
+       addrStr := ka.Addr.String()
+       bucket := a.getBucket(bucketTypeNew, bucketIdx)
+       if _, ok := bucket[addrStr]; ok {
+               return nil
+       }
+
+       if len(bucket) > newBucketSize {
+               a.expireNew(bucketIdx)
+       }
+
+       bucket[addrStr] = ka
+       a.addrLookup[addrStr] = ka
+       if ka.addBucketRef(bucketIdx) == 1 {
+               a.nNew++
+       }
+       return nil
+}
+
+func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) error {
+       if ka.isNew() {
+               return errors.New("cannot add old address to new bucket")
+       }
+       if len(ka.Buckets) != 0 {
+               return errors.New("cannot add already old address to another old bucket")
+       }
+
+       bucket := a.getBucket(bucketTypeOld, bucketIdx)
+       if len(bucket) > oldBucketSize {
+               return errors.New("old bucket is full")
+       }
+
+       addrStr := ka.Addr.String()
+       bucket[addrStr] = ka
+       a.addrLookup[addrStr] = ka
+       if ka.addBucketRef(bucketIdx) == 1 {
+               a.nOld++
+       }
+       return nil
+}
+
+func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
+       data1 := []byte{}
+       data1 = append(data1, []byte(a.key)...)
+       data1 = append(data1, []byte(a.groupKey(addr))...)
+       data1 = append(data1, []byte(a.groupKey(src))...)
+       hash1 := doubleSha256(data1)
+       hash64 := binary.BigEndian.Uint64(hash1)
+       hash64 %= newBucketsPerGroup
+       var hashbuf [8]byte
+       binary.BigEndian.PutUint64(hashbuf[:], hash64)
+       data2 := []byte{}
+       data2 = append(data2, []byte(a.key)...)
+       data2 = append(data2, a.groupKey(src)...)
+       data2 = append(data2, hashbuf[:]...)
+
+       hash2 := doubleSha256(data2)
+       return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
+}
+
+func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
+       data1 := []byte{}
+       data1 = append(data1, []byte(a.key)...)
+       data1 = append(data1, []byte(addr.String())...)
+       hash1 := doubleSha256(data1)
+       hash64 := binary.BigEndian.Uint64(hash1)
+       hash64 %= oldBucketsPerGroup
+       var hashbuf [8]byte
+       binary.BigEndian.PutUint64(hashbuf[:], hash64)
+       data2 := []byte{}
+       data2 = append(data2, []byte(a.key)...)
+       data2 = append(data2, a.groupKey(addr)...)
+       data2 = append(data2, hashbuf[:]...)
+
+       hash2 := doubleSha256(data2)
+       return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
+}
+
+func (a *AddrBook) expireNew(bucketIdx int) {
+       for addrStr, ka := range a.bucketsNew[bucketIdx] {
+               if ka.isBad() {
+                       a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
+                       return
+               }
+       }
+
+       oldest := a.pickOldest(bucketTypeNew, bucketIdx)
+       a.removeFromBucket(oldest, bucketIdx)
+}
+
+func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
+       switch bucketType {
+       case bucketTypeNew:
+               return a.bucketsNew[bucketIdx]
+       case bucketTypeOld:
+               return a.bucketsOld[bucketIdx]
+       default:
+               log.Error("try to access an unknow address book bucket type")
+               return nil
+       }
+}
+
+func (a *AddrBook) groupKey(na *NetAddress) string {
+       if a.routabilityStrict && na.Local() {
+               return "local"
+       }
+       if a.routabilityStrict && !na.Routable() {
+               return "unroutable"
+       }
+       if ipv4 := na.IP.To4(); ipv4 != nil {
+               return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
+       }
+       if na.RFC6145() || na.RFC6052() {
+               // last four bytes are the ip address
+               ip := net.IP(na.IP[12:16])
+               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+       }
+       if na.RFC3964() {
+               ip := net.IP(na.IP[2:7])
+               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+
+       }
+       if na.RFC4380() {
+               // teredo tunnels have the last 4 bytes as the v4 address XOR 0xff.
+               ip := net.IP(make([]byte, 4))
+               for i, byte := range na.IP[12:16] {
+                       ip[i] = byte ^ 0xff
+               }
+               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+       }
+
+       bits := 32
+       heNet := &net.IPNet{IP: net.ParseIP("2001:470::"), Mask: net.CIDRMask(32, 128)}
+       if heNet.Contains(na.IP) {
+               bits = 36
+       }
+       return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
+}
+
+func (a *AddrBook) moveToOld(ka *knownAddress) error {
+       if ka.isOld() {
+               return errors.New("cannot promote address that is already old")
+       }
+       if len(ka.Buckets) == 0 {
+               return errors.New("cannot promote address that isn't in any new buckets")
+       }
+
+       a.removeFromAllBuckets(ka)
+       ka.BucketType = bucketTypeOld
+       oldBucketIdx := a.calcOldBucket(ka.Addr)
+       return a.addToOldBucket(ka, oldBucketIdx)
+}
+
+func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
+       bucket := a.getBucket(bucketType, bucketIdx)
+       var oldest *knownAddress
+       for _, ka := range bucket {
+               if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
+                       oldest = ka
+               }
+       }
+       return oldest
+}
+
+func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
+       delete(a.addrLookup, ka.Addr.String())
+       for _, bucketIdx := range ka.Buckets {
+               bucket := a.getBucket(ka.BucketType, bucketIdx)
+               delete(bucket, ka.Addr.String())
+       }
+       ka.Buckets = nil
+       if ka.BucketType == bucketTypeNew {
+               a.nNew--
+       } else {
+               a.nOld--
+       }
+}
+
+func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketIdx int) {
+       bucket := a.getBucket(ka.BucketType, bucketIdx)
+       delete(bucket, ka.Addr.String())
+       if ka.removeBucketRef(bucketIdx) == 0 {
+               delete(a.addrLookup, ka.Addr.String())
+               if ka.bucketType == bucketTypeNew {
+                       a.nNew--
+               } else {
+                       a.nOld--
+               }
+       }
+}
+
+func (a *AddrBook) size() int {
+       return a.nNew + a.nOld
+}
diff --git a/p2p/pex/file.go b/p2p/pex/file.go
new file mode 100644 (file)
index 0000000..103ba05
--- /dev/null
@@ -0,0 +1,75 @@
+package pex
+
+import (
+       "encoding/json"
+       "os"
+       "time"
+)
+
+type addrBookJSON struct {
+       Key   string
+       Addrs []*knownAddress
+}
+
+func (a *AddrBook) saveToFile() error {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       aJSON := &addrBookJSON{Key: a.key, Addrs: []*knownAddress{}}
+       for _, ka := range a.addrLookup {
+               aJSON.Addrs = append(aJSON.Addrs, ka)
+       }
+
+       rawDats, err := json.MarshalIndent(aJSON, "", "\t")
+       if err != nil {
+               return err
+       }
+       return cmn.WriteFileAtomic(a.filePath, rawDats, 0644)
+}
+
+func (a *AddrBook) loadFromFile() error {
+       if _, err := os.Stat(a.filePath); os.IsNotExist(err) {
+               return nil
+       }
+
+       r, err := os.Open(a.filePath)
+       if err != nil {
+               return error
+       }
+
+       defer r.Close()
+       aJSON := &addrBookJSON{}
+       if err = json.NewDecoder(r).Decode(aJSON); err != nil {
+               return err
+       }
+
+       a.key = aJSON.Key
+       for _, ka := range aJSON.Addrs {
+               a.addrLookup[ka.Addr.String()] = ka
+               for _, bucketIndex := range ka.Buckets {
+                       bucket := a.getBucket(ka.BucketType, bucketIndex)
+                       bucket[ka.Addr.String()] = ka
+               }
+               if ka.BucketType == bucketTypeNew {
+                       a.nNew++
+               } else {
+                       a.nOld++
+               }
+       }
+       return nil
+}
+
+func (a *AddrBook) saveRoutine() {
+       ticker := time.NewTicker(2 * time.Minute)
+       for {
+               select {
+               case <-ticker.C:
+                       if err := a.saveToFile(); err != nil {
+                               log.WithField("err", err).Error("failed to save AddrBook to file")
+                       }
+               case <-a.Quit:
+                       a.saveToFile()
+                       return
+               }
+       }
+}
diff --git a/p2p/pex/know_address.go b/p2p/pex/know_address.go
new file mode 100644 (file)
index 0000000..94c4431
--- /dev/null
@@ -0,0 +1,91 @@
+package pex
+
+import (
+       "time"
+
+       "github.com/bytom/p2p"
+)
+
+type knownAddress struct {
+       Addr        *p2p.NetAddress
+       Src         *p2p.NetAddress
+       Attempts    int32
+       LastAttempt time.Time
+       LastSuccess time.Time
+       BucketType  byte
+       Buckets     []int
+}
+
+func newKnownAddress(addr, src *p2p.NetAddress) *knownAddress {
+       return &knownAddress{
+               Addr:        addr,
+               Src:         src,
+               Attempts:    0,
+               LastAttempt: time.Now(),
+               BucketType:  bucketTypeNew,
+               Buckets:     nil,
+       }
+}
+
+func (ka *knownAddress) isOld() bool {
+       return ka.BucketType == bucketTypeOld
+}
+
+func (ka *knownAddress) isNew() bool {
+       return ka.BucketType == bucketTypeNew
+}
+
+func (ka *knownAddress) markAttempt() {
+       ka.LastAttempt = time.Now()
+       ka.Attempts++
+}
+
+func (ka *knownAddress) markGood() {
+       now := time.Now()
+       ka.LastAttempt = now
+       ka.LastSuccess = now
+       ka.Attempts = 0
+}
+
+func (ka *knownAddress) addBucketRef(bucketIdx int) int {
+       for _, bucket := range ka.Buckets {
+               if bucket == bucketIdx {
+                       return -1
+               }
+       }
+       ka.Buckets = append(ka.Buckets, bucketIdx)
+       return len(ka.Buckets)
+}
+
+func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
+       buckets := []int{}
+       for _, bucket := range ka.Buckets {
+               if bucket != bucketIdx {
+                       buckets = append(buckets, bucket)
+               }
+       }
+       if len(buckets) != len(ka.Buckets)-1 {
+               return -1
+       }
+       ka.Buckets = buckets
+       return len(ka.Buckets)
+}
+
+func (ka *knownAddress) isBad() bool {
+       if ka.BucketType == bucketTypeOld {
+               return false
+       }
+       if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
+               return true
+       }
+       if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
+               return true
+       }
+       if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
+               return true
+       }
+       if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
+               return true
+       }
+       return false
+}
diff --git a/p2p/pex/params.go b/p2p/pex/params.go
new file mode 100644 (file)
index 0000000..9a02065
--- /dev/null
@@ -0,0 +1,24 @@
+package pex
+
+const (
+       bucketTypeNew = 0x01
+       bucketTypeOld = 0x02
+
+       oldBucketSize      = 64
+       oldBucketCount     = 64
+       oldBucketsPerGroup = 4
+       newBucketSize      = 64
+       newBucketCount     = 256
+       newBucketsPerGroup = 32
+
+       getSelectionPercent = 23
+       minGetSelection     = 32
+       maxGetSelection     = 250
+
+       needAddressThreshold    = 1000 // addresses under which the address manager will claim to need more addresses.
+       maxNewBucketsPerAddress = 4    // buckets a frequently seen new address may end up in.
+       numMissingDays          = 30   // days before which we assume an address has vanished
+       numRetries              = 3    // tries without a single success before we assume an address is bad.
+       maxFailures             = 10   // max failures we will accept without a success before considering an address bad.
+       minBadDays              = 7    // days since the last success before we will consider evicting an address.
+)
diff --git a/p2p/pex/pex_message.go b/p2p/pex/pex_message.go
new file mode 100644 (file)
index 0000000..c7cf133
--- /dev/null
@@ -0,0 +1,42 @@
+package pex
+
+import (
+       "bytes"
+       "fmt"
+
+       wire "github.com/tendermint/go-wire"
+)
+
+const (
+       msgTypeRequest = byte(0x01)
+       msgTypeAddrs   = byte(0x02)
+)
+
+// PexMessage is a primary type for PEX messages. Underneath, it could contain
+// either pexRequestMessage, or pexAddrsMessage messages.
+type PexMessage interface{}
+
+var _ = wire.RegisterInterface(
+       struct{ PexMessage }{},
+       wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
+       wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
+)
+
+// DecodeMessage implements interface registered above.
+func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
+       msgType = bz[0]
+       n := new(int)
+       r := bytes.NewReader(bz)
+       msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
+       return
+}
+
+type pexRequestMessage struct{}
+
+func (m *pexRequestMessage) String() string { return "[pexRequest]" }
+
+type pexAddrsMessage struct {
+       Addrs []*NetAddress
+}
+
+func (m *pexAddrsMessage) String() string { return fmt.Sprintf("[pexAddrs %v]", m.Addrs) }
diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go
new file mode 100644 (file)
index 0000000..79e3da0
--- /dev/null
@@ -0,0 +1,250 @@
+package pex
+
+import (
+       "errors"
+       "math/rand"
+       "reflect"
+       "sync"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       cmn "github.com/tendermint/tmlibs/common"
+)
+
+const (
+       // PexChannel is a channel for PEX messages
+       PexChannel = byte(0x00)
+
+       defaultEnsurePeersPeriod    = 120 * time.Second // period to ensure peers connected
+       minNumOutboundPeers         = 5
+       maxPexMessageSize           = 1048576 // 1MB
+       defaultMaxMsgCountByPeer    = uint16(1000)
+       msgCountByPeerFlushInterval = 1 * time.Hour
+)
+
+// PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
+type PEXReactor struct {
+       BaseReactor
+       book           *AddrBook
+       msgCountByPeer *cmn.CMap
+}
+
+// NewPEXReactor creates new PEX reactor.
+func NewPEXReactor(b *AddrBook) *PEXReactor {
+       r := &PEXReactor{
+               book:           b,
+               msgCountByPeer: cmn.NewCMap(),
+       }
+       r.BaseReactor = *NewBaseReactor("PEXReactor", r)
+       return r
+}
+
+// OnStart implements BaseService
+func (r *PEXReactor) OnStart() error {
+       r.BaseReactor.OnStart()
+       r.book.Start()
+       go r.ensurePeersRoutine()
+       go r.flushMsgCountByPeer()
+       return nil
+}
+
+// OnStop implements BaseService
+func (r *PEXReactor) OnStop() {
+       r.BaseReactor.OnStop()
+       r.book.Stop()
+}
+
+// GetChannels implements Reactor
+func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
+       return []*ChannelDescriptor{&ChannelDescriptor{
+               ID:                PexChannel,
+               Priority:          1,
+               SendQueueCapacity: 10,
+       }}
+}
+
+// AddPeer adding peer to the address book
+func (r *PEXReactor) AddPeer(p *Peer) error {
+       if p.IsOutbound() {
+               if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
+                       return errors.New("Send pex message fail")
+               }
+               return nil
+       }
+
+       addr, err := NewNetAddressString(p.ListenAddr)
+       if err != nil {
+               return errors.New("addPeer: invalid peer address")
+       }
+
+       r.book.AddAddress(addr, addr)
+       if r.Switch.peers.Size() >= r.Switch.config.MaxNumPeers {
+               if r.SendAddrs(p, r.book.GetSelection()) {
+                       <-time.After(1 * time.Second)
+                       r.Switch.StopPeerGracefully(p)
+               }
+               return errors.New("addPeer: reach the max peer, exchange then close")
+       }
+       return nil
+}
+
+// Receive implements Reactor by handling incoming PEX messages.
+func (r *PEXReactor) Receive(chID byte, p *Peer, rawMsg []byte) {
+       srcAddr := p.Connection().RemoteAddress
+       srcAddrStr := srcAddr.String()
+       r.incrementMsgCount(srcAddrStr)
+       if r.reachedMaxMsgLimit(srcAddrStr) {
+               log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
+               r.Switch.StopPeerGracefully(p)
+               return
+       }
+
+       _, msg, err := DecodeMessage(rawMsg)
+       if err != nil {
+               log.WithField("error", err).Error("failed to decoding pex message")
+               r.Switch.StopPeerGracefully(p)
+               return
+       }
+
+       switch msg := msg.(type) {
+       case *pexRequestMessage:
+               if !r.SendAddrs(src, r.book.GetSelection()) {
+                       log.Error("failed to send pex address message")
+               }
+
+       case *pexAddrsMessage:
+               for _, addr := range msg.Addrs {
+                       if err := r.book.AddAddress(addr, srcAddr); err != nil {
+                               log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
+                               r.Switch.StopPeerGracefully(p)
+                               return
+                       }
+               }
+
+       default:
+               log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
+       }
+}
+
+// RemovePeer implements Reactor.
+func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {}
+
+// RequestPEX asks peer for more addresses.
+func (r *PEXReactor) RequestAddrs(p *Peer) bool {
+       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
+       if !ok {
+               r.Switch.StopPeerGracefully(p)
+       }
+       return ok
+}
+
+// SendAddrs sends addrs to the peer.
+func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
+       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
+       if !ok {
+               r.Switch.StopPeerGracefully(p)
+       }
+       return ok
+}
+
+func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+       if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
+               r.book.MarkAttempt(a)
+       } else {
+               r.book.MarkGood(a)
+       }
+       wg.Done()
+}
+
+func (r *PEXReactor) ensurePeers() {
+       numOutPeers, _, numDialing := r.Switch.NumPeers()
+       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
+       log.WithFields(log.Fields{
+               "numOutPeers": numOutPeers,
+               "numDialing":  numDialing,
+               "numToDial":   numToDial,
+       }).Debug("ensure peers")
+       if numToDial <= 0 {
+               return
+       }
+
+       newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
+       toDial := make(map[string]*NetAddress)
+       maxAttempts := numToDial * 3
+       for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
+               try := r.book.PickAddress(newBias)
+               if try == nil {
+                       continue
+               }
+               if _, selected := toDial[try.IP.String()]; selected {
+                       continue
+               }
+               if dialling := r.Switch.IsDialing(try); dialling {
+                       continue
+               }
+               if connected := r.Switch.Peers().Has(try.ID); connected {
+                       continue
+               }
+
+               log.Debug("Will dial address addr:", try)
+               toDial[try.IP.String()] = try
+       }
+
+       var wg sync.WaitGroup
+       for _, item := range toDial {
+               wg.Add(1)
+               go r.dialPeerWorker(item, &wg)
+       }
+       wg.Wait()
+
+       if r.book.NeedMoreAddrs() {
+               if peers := r.Switch.Peers().List(); len(peers) > 0 {
+                       peer := peers[rand.Int()%len(peers)]
+                       r.RequestAddrs(peer)
+               }
+       }
+}
+
+func (r *PEXReactor) ensurePeersRoutine() {
+       r.ensurePeers()
+       ticker := time.NewTicker(defaultEnsurePeersPeriod)
+       quickTicker := time.NewTicker(time.Second * 1)
+
+       for {
+               select {
+               case <-ticker.C:
+                       r.ensurePeers()
+               case <-quickTicker.C:
+                       if r.Switch.peers.Size() < 3 {
+                               r.ensurePeers()
+                       }
+               case <-r.Quit:
+                       return
+               }
+       }
+}
+
+func (r *PEXReactor) flushMsgCountByPeer() {
+       ticker := time.NewTicker(msgCountByPeerFlushInterval)
+       for {
+               select {
+               case <-ticker.C:
+                       r.msgCountByPeer.Clear()
+               case <-r.Quit:
+                       return
+               }
+       }
+}
+
+func (r *PEXReactor) incrementMsgCount(addr string) {
+       var count uint16
+       if countI := r.msgCountByPeer.Get(addr); countI != nil {
+               count = countI.(uint16)
+       }
+       count++
+       r.msgCountByPeer.Set(addr, count)
+}
+
+func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
+       return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer
+}
diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go
deleted file mode 100644 (file)
index 2953207..0000000
+++ /dev/null
@@ -1,418 +0,0 @@
-package p2p
-
-import (
-       "bytes"
-       "fmt"
-       "math/rand"
-       "reflect"
-       "strings"
-       "sync"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-       wire "github.com/tendermint/go-wire"
-       cmn "github.com/tendermint/tmlibs/common"
-
-       "github.com/bytom/errors"
-)
-
-const (
-       // PexChannel is a channel for PEX messages
-       PexChannel = byte(0x00)
-
-       // period to ensure peers connected
-       defaultEnsurePeersPeriod = 120 * time.Second
-       minNumOutboundPeers      = 5
-       maxPexMessageSize        = 1048576 // 1MB
-
-       // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
-       defaultMaxMsgCountByPeer    = 1000
-       msgCountByPeerFlushInterval = 1 * time.Hour
-)
-
-var ErrSendPexFail = errors.New("Send pex message fail")
-
-// PEXReactor handles PEX (peer exchange) and ensures that an
-// adequate number of peers are connected to the switch.
-//
-// It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
-//
-// ## Preventing abuse
-//
-// For now, it just limits the number of messages from one peer to
-// `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
-// msg/hour).
-//
-// NOTE [2017-01-17]:
-//   Limiting is fine for now. Maybe down the road we want to keep track of the
-//   quality of peer messages so if peerA keeps telling us about peers we can't
-//   connect to then maybe we should care less about peerA. But I don't think
-//   that kind of complexity is priority right now.
-type PEXReactor struct {
-       BaseReactor
-
-       sw                *Switch
-       book              *AddrBook
-       ensurePeersPeriod time.Duration
-
-       // tracks message count by peer, so we can prevent abuse
-       msgCountByPeer    *cmn.CMap
-       maxMsgCountByPeer uint16
-}
-
-// NewPEXReactor creates new PEX reactor.
-func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
-       r := &PEXReactor{
-               sw:                sw,
-               book:              b,
-               ensurePeersPeriod: defaultEnsurePeersPeriod,
-               msgCountByPeer:    cmn.NewCMap(),
-               maxMsgCountByPeer: defaultMaxMsgCountByPeer,
-       }
-       r.BaseReactor = *NewBaseReactor("PEXReactor", r)
-       return r
-}
-
-// OnStart implements BaseService
-func (r *PEXReactor) OnStart() error {
-       r.BaseReactor.OnStart()
-       r.book.Start()
-       go r.ensurePeersRoutine()
-       go r.flushMsgCountByPeer()
-       return nil
-}
-
-// OnStop implements BaseService
-func (r *PEXReactor) OnStop() {
-       r.BaseReactor.OnStop()
-       r.book.Stop()
-}
-
-// GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
-       return []*ChannelDescriptor{
-               &ChannelDescriptor{
-                       ID:                PexChannel,
-                       Priority:          1,
-                       SendQueueCapacity: 10,
-               },
-       }
-}
-
-// AddPeer implements Reactor by adding peer to the address book (if inbound)
-// or by requesting more addresses (if outbound).
-func (r *PEXReactor) AddPeer(p *Peer) error {
-       if p.IsOutbound() {
-               // For outbound peers, the address is already in the books.
-               // Either it was added in DialSeeds or when we
-               // received the peer's address in r.Receive
-               if r.book.NeedMoreAddrs() {
-                       if ok := r.RequestPEX(p); !ok {
-                               return ErrSendPexFail
-                       }
-               }
-               return nil
-       }
-
-       // For inbound connections, the peer is its own source
-       addr, err := NewNetAddressString(p.ListenAddr)
-       if err != nil {
-               log.WithFields(log.Fields{"addr": p.ListenAddr, "error": err}).Error("Error in AddPeer: Invalid peer address")
-               return errors.New("Error in AddPeer: Invalid peer address")
-       }
-       r.book.AddAddress(addr, addr)
-
-       // close the connect if connect is big than max limit
-       if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
-               if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
-                       <-time.After(1 * time.Second)
-                       r.sw.StopPeerGracefully(p)
-               }
-               return errors.New("Error in AddPeer: reach the max peer, exchange then close")
-       }
-
-       return nil
-}
-
-// RemovePeer implements Reactor.
-func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
-       // If we aren't keeping track of local temp data for each peer here, then we
-       // don't have to do anything.
-}
-
-// Receive implements Reactor by handling incoming PEX messages.
-func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
-       srcAddr := src.Connection().RemoteAddress
-       srcAddrStr := srcAddr.String()
-
-       r.IncrementMsgCountForPeer(srcAddrStr)
-       if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
-               log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
-               // TODO remove src from peers?
-               return
-       }
-
-       _, msg, err := DecodeMessage(msgBytes)
-       if err != nil {
-               log.WithField("error", err).Error("Error decoding message")
-               return
-       }
-       log.WithField("msg", msg).Info("Reveived message")
-
-       switch msg := msg.(type) {
-       case *pexRequestMessage:
-               // src requested some peers.
-               if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
-                       log.Info("Send address message failed. Stop peer.")
-               }
-       case *pexAddrsMessage:
-               // We received some peer addresses from src.
-               // (We don't want to get spammed with bad peers)
-               for _, addr := range msg.Addrs {
-                       if addr != nil {
-                               r.book.AddAddress(addr, srcAddr)
-                       }
-               }
-       default:
-               log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
-       }
-}
-
-// RequestPEX asks peer for more addresses.
-func (r *PEXReactor) RequestPEX(p *Peer) bool {
-       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
-       if !ok {
-               r.sw.StopPeerGracefully(p)
-       }
-       return ok
-}
-
-// SendAddrs sends addrs to the peer.
-func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
-       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
-       if !ok {
-               r.sw.StopPeerGracefully(p)
-       }
-       return ok
-}
-
-// SetEnsurePeersPeriod sets period to ensure peers connected.
-func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
-       r.ensurePeersPeriod = d
-}
-
-// SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
-func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
-       r.maxMsgCountByPeer = v
-}
-
-// ReachedMaxMsgCountForPeer returns true if we received too many
-// messages from peer with address `addr`.
-// NOTE: assumes the value in the CMap is non-nil
-func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
-       return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
-}
-
-// Increment or initialize the msg count for the peer in the CMap
-func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
-       var count uint16
-       countI := r.msgCountByPeer.Get(addr)
-       if countI != nil {
-               count = countI.(uint16)
-       }
-       count++
-       r.msgCountByPeer.Set(addr, count)
-}
-
-// Ensures that sufficient peers are connected. (continuous)
-func (r *PEXReactor) ensurePeersRoutine() {
-       // Randomize when routine starts
-       ensurePeersPeriodMs := int64(10000)
-       time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
-
-       // fire once immediately.
-       r.ensurePeers()
-
-       // fire periodically
-       ticker := time.NewTicker(r.ensurePeersPeriod)
-       quickTicker := time.NewTicker(time.Second * 1)
-
-       for {
-               select {
-               case <-ticker.C:
-                       r.ensurePeers()
-               case <-quickTicker.C:
-                       if r.sw.peers.Size() < 3 {
-                               r.ensurePeers()
-                       }
-               case <-r.Quit:
-                       ticker.Stop()
-                       quickTicker.Stop()
-                       return
-               }
-       }
-}
-
-// ensurePeers ensures that sufficient peers are connected. (once)
-//
-// Old bucket / New bucket are arbitrary categories to denote whether an
-// address is vetted or not, and this needs to be determined over time via a
-// heuristic that we haven't perfected yet, or, perhaps is manually edited by
-// the node operator. It should not be used to compute what addresses are
-// already connected or not.
-//
-// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
-// What we're currently doing in terms of marking good/bad peers is just a
-// placeholder. It should not be the case that an address becomes old/vetted
-// upon a single successful connection.
-func (r *PEXReactor) ensurePeers() {
-       numOutPeers, _, numDialing := r.Switch.NumPeers()
-       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
-       log.WithFields(log.Fields{
-               "numOutPeers": numOutPeers,
-               "numDialing":  numDialing,
-               "numToDial":   numToDial,
-       }).Info("Ensure peers")
-       if numToDial <= 0 {
-               return
-       }
-
-       newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
-       toDial := make(map[string]*NetAddress)
-
-       // Try to pick numToDial addresses to dial.
-       for i := 0; i < numToDial; i++ {
-               // The purpose of newBias is to first prioritize old (more vetted) peers
-               // when we have few connections, but to allow for new (less vetted) peers
-               // if we already have many connections. This algorithm isn't perfect, but
-               // it somewhat ensures that we prioritize connecting to more-vetted
-               // peers.
-
-               var picked *NetAddress
-               // Try to fetch a new peer 3 times.
-               // This caps the maximum number of tries to 3 * numToDial.
-               for j := 0; j < 3; j++ {
-                       try := r.book.PickAddress(newBias)
-                       if try == nil {
-                               break
-                       }
-                       ka := r.book.addrLookup[try.String()]
-                       if ka != nil {
-                               if ka.isBad() {
-                                       continue
-                               }
-                       }
-                       _, alreadySelected := toDial[try.IP.String()]
-                       alreadyDialing := r.Switch.IsDialing(try)
-                       var alreadyConnected bool
-
-                       for _, v := range r.Switch.Peers().list {
-                               if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
-                                       alreadyConnected = true
-                                       break
-                               }
-                       }
-                       if alreadySelected || alreadyDialing || alreadyConnected {
-                               continue
-                       } else {
-                               log.Debug("Will dial address addr:", try)
-                               picked = try
-                               break
-                       }
-               }
-               if picked == nil {
-                       continue
-               }
-               toDial[picked.IP.String()] = picked
-       }
-
-       var wg sync.WaitGroup
-       for _, item := range toDial {
-               wg.Add(1)
-               go r.dialPeerWorker(item, &wg)
-       }
-       wg.Wait()
-
-       // If we need more addresses, pick a random peer and ask for more.
-       if r.book.NeedMoreAddrs() {
-               if peers := r.Switch.Peers().List(); len(peers) > 0 {
-                       i := rand.Int() % len(peers)
-                       peer := peers[i]
-                       log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
-                       if ok := r.RequestPEX(peer); !ok {
-                               log.Info("Send request address message failed. Stop peer.")
-                       }
-               }
-       }
-}
-
-func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
-       if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
-               r.book.MarkAttempt(a)
-       } else {
-               r.book.MarkGood(a)
-       }
-       wg.Done()
-}
-
-func (r *PEXReactor) flushMsgCountByPeer() {
-       ticker := time.NewTicker(msgCountByPeerFlushInterval)
-
-       for {
-               select {
-               case <-ticker.C:
-                       r.msgCountByPeer.Clear()
-               case <-r.Quit:
-                       ticker.Stop()
-                       return
-               }
-       }
-}
-
-//-----------------------------------------------------------------------------
-// Messages
-
-const (
-       msgTypeRequest = byte(0x01)
-       msgTypeAddrs   = byte(0x02)
-)
-
-// PexMessage is a primary type for PEX messages. Underneath, it could contain
-// either pexRequestMessage, or pexAddrsMessage messages.
-type PexMessage interface{}
-
-var _ = wire.RegisterInterface(
-       struct{ PexMessage }{},
-       wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
-       wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
-)
-
-// DecodeMessage implements interface registered above.
-func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
-       msgType = bz[0]
-       n := new(int)
-       r := bytes.NewReader(bz)
-       msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
-       return
-}
-
-/*
-A pexRequestMessage requests additional peer addresses.
-*/
-type pexRequestMessage struct {
-}
-
-func (m *pexRequestMessage) String() string {
-       return "[pexRequest]"
-}
-
-/*
-A message with announced peer addresses.
-*/
-type pexAddrsMessage struct {
-       Addrs []*NetAddress
-}
-
-func (m *pexAddrsMessage) String() string {
-       return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
-}