+++ /dev/null
-// Modified for Tendermint
-// Originally Copyright (c) 2013-2014 Conformal Systems LLC.
-// https://github.com/conformal/btcd/blob/master/LICENSE
-
-package p2p
-
-import (
- "encoding/binary"
- "encoding/json"
- "math"
- "math/rand"
- "net"
- "os"
- "sync"
- "time"
-
- log "github.com/sirupsen/logrus"
- "github.com/tendermint/go-crypto"
- cmn "github.com/tendermint/tmlibs/common"
-)
-
-const (
- // addresses under which the address manager will claim to need more addresses.
- needAddressThreshold = 1000
-
- // interval used to dump the address cache to disk for future use.
- dumpAddressInterval = time.Minute * 2
-
- // max addresses in each old address bucket.
- oldBucketSize = 64
-
- // buckets we split old addresses over.
- oldBucketCount = 64
-
- // max addresses in each new address bucket.
- newBucketSize = 64
-
- // buckets that we spread new addresses over.
- newBucketCount = 256
-
- // old buckets over which an address group will be spread.
- oldBucketsPerGroup = 4
-
- // new buckets over which an source address group will be spread.
- newBucketsPerGroup = 32
-
- // buckets a frequently seen new address may end up in.
- maxNewBucketsPerAddress = 4
-
- // days before which we assume an address has vanished
- // if we have not seen it announced in that long.
- numMissingDays = 30
-
- // tries without a single success before we assume an address is bad.
- numRetries = 3
-
- // max failures we will accept without a success before considering an address bad.
- maxFailures = 10
-
- // days since the last success before we will consider evicting an address.
- minBadDays = 7
-
- // % of total addresses known returned by GetSelection.
- getSelectionPercent = 23
-
- // min addresses that must be returned by GetSelection. Useful for bootstrapping.
- minGetSelection = 32
-
- // max addresses returned by GetSelection
- // NOTE: this must match "maxPexMessageSize"
- maxGetSelection = 250
-)
-
-const (
- bucketTypeNew = 0x01
- bucketTypeOld = 0x02
-)
-
-// AddrBook - concurrency safe peer address manager.
-type AddrBook struct {
- cmn.BaseService
-
- // immutable after creation
- filePath string
- routabilityStrict bool
- key string
-
- mtx sync.RWMutex
- rand *rand.Rand
- ourAddrs map[string]*NetAddress
- addrLookup map[string]*knownAddress // new & old
- bucketsNew []map[string]*knownAddress
- bucketsOld []map[string]*knownAddress
- nOld int
- nNew int
-}
-
-// NewAddrBook creates a new address book. Use Start to begin processing asynchronous address updates.
-func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
- a := &AddrBook{
- filePath: filePath,
- routabilityStrict: routabilityStrict,
- key: crypto.CRandHex(24),
- rand: rand.New(rand.NewSource(time.Now().UnixNano())),
- ourAddrs: make(map[string]*NetAddress),
- addrLookup: make(map[string]*knownAddress),
- bucketsNew: make([]map[string]*knownAddress, newBucketCount),
- bucketsOld: make([]map[string]*knownAddress, oldBucketCount),
- }
- for i := range a.bucketsNew {
- a.bucketsNew[i] = make(map[string]*knownAddress)
- }
- for i := range a.bucketsOld {
- a.bucketsOld[i] = make(map[string]*knownAddress)
- }
- a.BaseService = *cmn.NewBaseService(nil, "AddrBook", a)
- return a
-}
-
-// OnStart implements Service.
-func (a *AddrBook) OnStart() error {
- if err := a.BaseService.OnStart(); err != nil {
- return err
- }
-
- a.loadFromFile()
- go a.saveRoutine()
- return nil
-}
-
-// OnStop implements Service.
-func (a *AddrBook) OnStop() {
- a.BaseService.OnStop()
-}
-
-// AddAddress add address to address book
-func (a *AddrBook) AddAddress(addr, src *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
- log.WithFields(log.Fields{"addr": addr, "src": src}).Debug("add address to address book")
- a.addAddress(addr, src)
-}
-
-func (a *AddrBook) NeedMoreAddrs() bool {
- return a.Size() < needAddressThreshold
-}
-
-func (a *AddrBook) Size() int {
- a.mtx.RLock()
- defer a.mtx.RUnlock()
- return a.size()
-}
-
-func (a *AddrBook) size() int {
- return a.nNew + a.nOld
-}
-
-// PickAddress picks a random address from random bucket
-func (a *AddrBook) PickAddress(bias int) *NetAddress {
- a.mtx.RLock()
- defer a.mtx.RUnlock()
-
- if a.size() == 0 {
- return nil
- }
-
- // make sure bias is in the range [0, 100]
- if bias > 100 {
- bias = 100
- } else if bias < 0 {
- bias = 0
- }
-
- oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(bias))
- newCorrelation := math.Sqrt(float64(a.nNew)) * float64(bias)
- pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
- if (pickFromOldBucket && a.nOld == 0) || (!pickFromOldBucket && a.nNew == 0) {
- return nil
- }
-
- var bucket map[string]*knownAddress
- for len(bucket) == 0 {
- if pickFromOldBucket {
- bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
- } else {
- bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
- }
- }
-
- randIndex := a.rand.Intn(len(bucket))
- for _, ka := range bucket {
- if randIndex == 0 {
- return ka.Addr
- }
- randIndex--
- }
- return nil
-}
-
-// MarkGood marks the peer as good and moves it into an "old" bucket.
-func (a *AddrBook) MarkGood(addr *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- if ka := a.addrLookup[addr.String()]; ka != nil {
- ka.markGood()
- if ka.isNew() {
- a.moveToOld(ka)
- }
- }
-}
-
-// MarkAttempt marks that an attempt was made to connect to the address.
-func (a *AddrBook) MarkAttempt(addr *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- if ka := a.addrLookup[addr.String()]; ka != nil {
- ka.markAttempt()
- }
-}
-
-// RemoveAddress removes the address from the book.
-func (a *AddrBook) RemoveAddress(addr *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- if ka := a.addrLookup[addr.String()]; ka != nil {
- log.WithField("addr", addr).Debug("remove address from address book")
- a.removeFromAllBuckets(ka)
- }
-}
-
-// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
-func (a *AddrBook) GetSelection() []*NetAddress {
- a.mtx.RLock()
- defer a.mtx.RUnlock()
-
- bookSize := a.size()
- if bookSize == 0 {
- return nil
- }
-
- numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100)
- numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
-
- allAddr := make([]*NetAddress, bookSize)
- i := 0
- for _, ka := range a.addrLookup {
- allAddr[i] = ka.Addr
- i++
- }
-
- // Fisher-Yates shuffle the array. We only need to do the first
- // `numAddresses' since we are throwing the rest.
- for i := 0; i < numAddresses; i++ {
- // pick a number between current index and the end
- j := rand.Intn(len(allAddr)-i) + i
- allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
- }
-
- // slice off the limit we are willing to share.
- return allAddr[:numAddresses]
-}
-
-type addrBookJSON struct {
- Key string
- Addrs []*knownAddress
-}
-
-func (a *AddrBook) Save() {
- a.mtx.RLock()
- defer a.mtx.RUnlock()
-
- log.WithField("size", a.Size()).Debug("saving address book to file")
- aJSON := &addrBookJSON{
- Key: a.key,
- Addrs: []*knownAddress{},
- }
- for _, ka := range a.addrLookup {
- aJSON.Addrs = append(aJSON.Addrs, ka)
- }
-
- jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
- if err != nil {
- log.WithField("err", err).Error("failed to save AddrBook to file")
- return
- }
-
- if err = cmn.WriteFileAtomic(a.filePath, jsonBytes, 0644); err != nil {
- log.WithFields(log.Fields{"file": a.filePath, "err": err}).Error("Failed to save AddrBook to file")
- }
-}
-
-func (a *AddrBook) loadFromFile() bool {
- if _, err := os.Stat(a.filePath); os.IsNotExist(err) {
- return false
- }
-
- r, err := os.Open(a.filePath)
- if err != nil {
- cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", a.filePath, err))
- }
-
- defer r.Close()
- aJSON := &addrBookJSON{}
- if err = json.NewDecoder(r).Decode(aJSON); err != nil {
- cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", a.filePath, err))
- }
-
- a.key = aJSON.Key
- for _, ka := range aJSON.Addrs {
- for _, bucketIndex := range ka.Buckets {
- bucket := a.getBucket(ka.BucketType, bucketIndex)
- bucket[ka.Addr.String()] = ka
- }
- a.addrLookup[ka.Addr.String()] = ka
- if ka.BucketType == bucketTypeNew {
- a.nNew++
- } else {
- a.nOld++
- }
- }
- return true
-}
-
-func (a *AddrBook) saveRoutine() {
- ticker := time.NewTicker(dumpAddressInterval)
- for {
- select {
- case <-ticker.C:
- a.Save()
- case <-a.Quit:
- a.Save()
- return
- }
- }
-}
-
-func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
- switch bucketType {
- case bucketTypeNew:
- return a.bucketsNew[bucketIdx]
- case bucketTypeOld:
- return a.bucketsOld[bucketIdx]
- default:
- cmn.PanicSanity("Should not happen")
- return nil
- }
-}
-
-// Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full.
-// NOTE: currently it always returns true.
-func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
- if ka.isOld() {
- log.WithField("know address", ka).Error("cant add old address to new bucket")
- return false
- }
-
- addrStr := ka.Addr.String()
- bucket := a.getBucket(bucketTypeNew, bucketIdx)
- if _, ok := bucket[addrStr]; ok {
- return true
- }
-
- if len(bucket) > newBucketSize {
- log.Debug("addToNewBucket: new bucket is full, expiring new")
- a.expireNew(bucketIdx)
- }
-
- bucket[addrStr] = ka
- a.addrLookup[addrStr] = ka
- if ka.addBucketRef(bucketIdx) == 1 {
- a.nNew++
- }
- return true
-}
-
-// Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full.
-func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
- if ka.isNew() {
- log.WithField("know address", ka).Error("cannot add old address to new bucket")
- return false
- }
- if len(ka.Buckets) != 0 {
- log.WithField("know address", ka).Error("cannot add already old address to another old bucket")
- return false
- }
-
- addrStr := ka.Addr.String()
- bucket := a.getBucket(bucketTypeOld, bucketIdx)
- if _, ok := bucket[addrStr]; ok {
- return true
- }
-
- if len(bucket) > oldBucketSize {
- return false
- }
-
- // Add to bucket.
- bucket[addrStr] = ka
- a.addrLookup[addrStr] = ka
- if ka.addBucketRef(bucketIdx) == 1 {
- a.nOld++
- }
- return true
-}
-
-func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
- if ka.BucketType != bucketType {
- log.WithField("know address", ka).Error("Bucket type mismatch")
- return
- }
-
- bucket := a.getBucket(bucketType, bucketIdx)
- delete(bucket, ka.Addr.String())
- if ka.removeBucketRef(bucketIdx) == 0 {
- if bucketType == bucketTypeNew {
- a.nNew--
- } else {
- a.nOld--
- }
- delete(a.addrLookup, ka.Addr.String())
- }
-}
-
-func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
- for _, bucketIdx := range ka.Buckets {
- bucket := a.getBucket(ka.BucketType, bucketIdx)
- delete(bucket, ka.Addr.String())
- }
- ka.Buckets = nil
- if ka.BucketType == bucketTypeNew {
- a.nNew--
- } else {
- a.nOld--
- }
- delete(a.addrLookup, ka.Addr.String())
-}
-
-func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
- bucket := a.getBucket(bucketType, bucketIdx)
- var oldest *knownAddress
- for _, ka := range bucket {
- if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
- oldest = ka
- }
- }
- return oldest
-}
-
-func (a *AddrBook) addAddress(addr, src *NetAddress) {
- if _, ok := a.ourAddrs[addr.String()]; ok {
- return
- }
- if a.routabilityStrict && !addr.Routable() {
- log.WithField("address", addr).Error("cannot add non-routable address")
- return
- }
-
- ka := a.addrLookup[addr.String()]
- if ka != nil {
- if ka.isOld() {
- return
- }
- if len(ka.Buckets) == maxNewBucketsPerAddress {
- return
- }
- if factor := int32(2 * len(ka.Buckets)); a.rand.Int31n(factor) != 0 {
- return
- }
- } else {
- ka = newKnownAddress(addr, src)
- }
-
- bucket := a.calcNewBucket(addr, src)
- a.addToNewBucket(ka, bucket)
-}
-
-// Make space in the new buckets by expiring the really bad entries.
-// If no bad entries are available we remove the oldest.
-func (a *AddrBook) expireNew(bucketIdx int) {
- for addrStr, ka := range a.bucketsNew[bucketIdx] {
- if ka.isBad() {
- log.WithField("addr", addrStr).Info("expiring bad address")
- a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
- return
- }
- }
-
- oldest := a.pickOldest(bucketTypeNew, bucketIdx)
- a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
-}
-
-// Promotes an address from new to old.
-// TODO: Move to old probabilistically.
-// The better a node is, the less likely it should be evicted from an old bucket.
-func (a *AddrBook) moveToOld(ka *knownAddress) {
- if ka.isOld() {
- log.WithField("know address", ka).Error("cannot promote address that is already old")
- return
- }
- if len(ka.Buckets) == 0 {
- log.WithField("know address", ka).Error("cannot promote address that isn't in any new buckets")
- return
- }
-
- a.removeFromAllBuckets(ka)
- ka.BucketType = bucketTypeOld
-
- // Try to add it to its oldBucket destination.
- oldBucketIdx := a.calcOldBucket(ka.Addr)
- if ok := a.addToOldBucket(ka, oldBucketIdx); ok {
- return
- }
-
- // No room, must evict something
- oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
- a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
- // Find new bucket to put oldest in
- newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
- a.addToNewBucket(oldest, newBucketIdx)
-
- if ok := a.addToOldBucket(ka, oldBucketIdx); !ok {
- log.WithField("know address", ka).Error("could not re-add to oldBucketIdx")
- }
-}
-
-// doublesha256( key + sourcegroup +
-// int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets
-func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
- data1 := []byte{}
- data1 = append(data1, []byte(a.key)...)
- data1 = append(data1, []byte(a.groupKey(addr))...)
- data1 = append(data1, []byte(a.groupKey(src))...)
- hash1 := doubleSha256(data1)
- hash64 := binary.BigEndian.Uint64(hash1)
- hash64 %= newBucketsPerGroup
- var hashbuf [8]byte
- binary.BigEndian.PutUint64(hashbuf[:], hash64)
- data2 := []byte{}
- data2 = append(data2, []byte(a.key)...)
- data2 = append(data2, a.groupKey(src)...)
- data2 = append(data2, hashbuf[:]...)
-
- hash2 := doubleSha256(data2)
- return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
-}
-
-// doublesha256( key + group +
-// int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets
-func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
- data1 := []byte{}
- data1 = append(data1, []byte(a.key)...)
- data1 = append(data1, []byte(addr.String())...)
- hash1 := doubleSha256(data1)
- hash64 := binary.BigEndian.Uint64(hash1)
- hash64 %= oldBucketsPerGroup
- var hashbuf [8]byte
- binary.BigEndian.PutUint64(hashbuf[:], hash64)
- data2 := []byte{}
- data2 = append(data2, []byte(a.key)...)
- data2 = append(data2, a.groupKey(addr)...)
- data2 = append(data2, hashbuf[:]...)
-
- hash2 := doubleSha256(data2)
- return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
-}
-
-// Return a string representing the network group of this address.
-// This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string
-// "local" for a local address and the string "unroutable for an unroutable
-// address.
-func (a *AddrBook) groupKey(na *NetAddress) string {
- if a.routabilityStrict && na.Local() {
- return "local"
- }
- if a.routabilityStrict && !na.Routable() {
- return "unroutable"
- }
-
- if ipv4 := na.IP.To4(); ipv4 != nil {
- return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
- }
- if na.RFC6145() || na.RFC6052() {
- // last four bytes are the ip address
- ip := net.IP(na.IP[12:16])
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
- }
-
- if na.RFC3964() {
- ip := net.IP(na.IP[2:7])
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-
- }
- if na.RFC4380() {
- // teredo tunnels have the last 4 bytes as the v4 address XOR
- // 0xff.
- ip := net.IP(make([]byte, 4))
- for i, byte := range na.IP[12:16] {
- ip[i] = byte ^ 0xff
- }
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
- }
-
- // OK, so now we know ourselves to be a IPv6 address.
- // bitcoind uses /32 for everything, except for Hurricane Electric's
- // (he.net) IP range, which it uses /36 for.
- bits := 32
- heNet := &net.IPNet{IP: net.ParseIP("2001:470::"),
- Mask: net.CIDRMask(32, 128)}
- if heNet.Contains(na.IP) {
- bits = 36
- }
-
- return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
-}
-
-//-----------------------------------------------------------------------------
-
-/*
- knownAddress
-
- tracks information about a known network address that is used
- to determine how viable an address is.
-*/
-type knownAddress struct {
- Addr *NetAddress
- Src *NetAddress
- Attempts int32
- LastAttempt time.Time
- LastSuccess time.Time
- BucketType byte
- Buckets []int
-}
-
-func newKnownAddress(addr, src *NetAddress) *knownAddress {
- return &knownAddress{
- Addr: addr,
- Src: src,
- Attempts: 0,
- LastAttempt: time.Now(),
- BucketType: bucketTypeNew,
- Buckets: nil,
- }
-}
-
-func (ka *knownAddress) isOld() bool {
- return ka.BucketType == bucketTypeOld
-}
-
-func (ka *knownAddress) isNew() bool {
- return ka.BucketType == bucketTypeNew
-}
-
-func (ka *knownAddress) markAttempt() {
- ka.LastAttempt = time.Now()
- ka.Attempts += 1
-}
-
-func (ka *knownAddress) markGood() {
- now := time.Now()
- ka.LastAttempt = now
- ka.LastSuccess = now
- ka.Attempts = 0
-}
-
-func (ka *knownAddress) addBucketRef(bucketIdx int) int {
- for _, bucket := range ka.Buckets {
- if bucket == bucketIdx {
- return -1
- }
- }
- ka.Buckets = append(ka.Buckets, bucketIdx)
- return len(ka.Buckets)
-}
-
-func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
- buckets := []int{}
- for _, bucket := range ka.Buckets {
- if bucket != bucketIdx {
- buckets = append(buckets, bucket)
- }
- }
- if len(buckets) != len(ka.Buckets)-1 {
- return -1
- }
- ka.Buckets = buckets
- return len(ka.Buckets)
-}
-
-/*
- An address is bad if the address in question has not been tried in the last
- minute and meets one of the following criteria:
-
- 1) It claims to be from the future
- 2) It hasn't been seen in over a month
- 3) It has failed at least three times and never succeeded
- 4) It has failed ten times in the last week
-
- All addresses that meet these criteria are assumed to be worthless and not
- worth keeping hold of.
-*/
-func (ka *knownAddress) isBad() bool {
- if ka.BucketType == bucketTypeOld {
- return false
- }
- // Has been attempted in the last minute --> bad
- if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
- return true
- }
-
- // Over a month old?
- if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
- return true
- }
-
- // Never succeeded?
- if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
- return true
- }
-
- // Hasn't succeeded in too long?
- if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
- return true
- }
-
- return false
-}
--- /dev/null
+package pex
+
+import (
+ "encoding/binary"
+ "errors"
+ "math"
+ "math/rand"
+ "net"
+ "sync"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/tendermint/go-crypto"
+ cmn "github.com/tendermint/tmlibs/common"
+)
+
+// AddrBook - concurrency safe peer address manager.
+type AddrBook struct {
+ cmn.BaseService
+
+ // immutable after creation
+ filePath string
+ routabilityStrict bool
+ key string
+
+ mtx sync.RWMutex
+ rand *rand.Rand
+ ourAddrs map[string]*NetAddress
+ addrLookup map[string]*knownAddress // new & old
+ bucketsNew []map[string]*knownAddress
+ bucketsOld []map[string]*knownAddress
+ nOld int
+ nNew int
+}
+
+// NewAddrBook creates a new address book. Use Start to begin processing asynchronous address updates.
+func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
+ a := &AddrBook{
+ filePath: filePath,
+ routabilityStrict: routabilityStrict,
+ key: crypto.CRandHex(24),
+ rand: rand.New(rand.NewSource(time.Now().UnixNano())),
+ ourAddrs: make(map[string]*NetAddress),
+ addrLookup: make(map[string]*knownAddress),
+ bucketsNew: make([]map[string]*knownAddress, newBucketCount),
+ bucketsOld: make([]map[string]*knownAddress, oldBucketCount),
+ }
+ for i := range a.bucketsNew {
+ a.bucketsNew[i] = make(map[string]*knownAddress)
+ }
+ for i := range a.bucketsOld {
+ a.bucketsOld[i] = make(map[string]*knownAddress)
+ }
+ a.BaseService = *cmn.NewBaseService(nil, "AddrBook", a)
+ return a
+}
+
+// OnStart implements Service.
+func (a *AddrBook) OnStart() error {
+ if err := a.BaseService.OnStart(); err != nil {
+ return err
+ }
+
+ if err := a.loadFromFile(); err != nil {
+ return err
+ }
+
+ go a.saveRoutine()
+ return nil
+}
+
+// OnStop implements Service.
+func (a *AddrBook) OnStop() {
+ a.BaseService.OnStop()
+}
+
+// AddAddress add address to address book
+func (a *AddrBook) AddAddress(addr, src *NetAddress) error {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+ return a.addAddress(addr, src)
+}
+
+// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
+func (a *AddrBook) GetSelection() []*NetAddress {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ bookSize := a.size()
+ if bookSize == 0 {
+ return nil
+ }
+
+ numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100)
+ numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
+ allAddr := []*NetAddress{}
+ for _, ka := range a.addrLookup {
+ allAddr = append(allAddr, ka.Addr)
+ }
+
+ for i := 0; i < numAddresses; i++ {
+ j := rand.Intn(len(allAddr)-i) + i
+ allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
+ }
+ return allAddr[:numAddresses]
+}
+
+// MarkGood marks the peer as good and moves it into an "old" bucket.
+func (a *AddrBook) MarkGood(addr *NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ if ka := a.addrLookup[addr.String()]; ka != nil {
+ ka.markGood()
+ if ka.isNew() {
+ a.moveToOld(ka)
+ }
+ }
+}
+
+// MarkAttempt marks that an attempt was made to connect to the address.
+func (a *AddrBook) MarkAttempt(addr *NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ if ka := a.addrLookup[addr.String()]; ka != nil {
+ ka.markAttempt()
+ }
+}
+
+// NeedMoreAddrs check does the address number meet the threshold
+func (a *AddrBook) NeedMoreAddrs() bool {
+ return a.Size() < needAddressThreshold
+}
+
+// PickAddress picks a random address from random bucket
+func (a *AddrBook) PickAddress(bias int) *NetAddress {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ if a.size() == 0 {
+ return nil
+ }
+
+ // make sure bias is in the range [0, 100]
+ if bias > 100 {
+ bias = 100
+ } else if bias < 0 {
+ bias = 0
+ }
+
+ oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(bias))
+ newCorrelation := math.Sqrt(float64(a.nNew)) * float64(bias)
+ pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
+ if (pickFromOldBucket && a.nOld == 0) || (!pickFromOldBucket && a.nNew == 0) {
+ return nil
+ }
+
+ var bucket map[string]*knownAddress
+ for len(bucket) == 0 {
+ if pickFromOldBucket {
+ bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
+ } else {
+ bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
+ }
+ }
+
+ randIndex := a.rand.Intn(len(bucket))
+ for _, ka := range bucket {
+ if randIndex == 0 {
+ return ka.Addr
+ }
+ randIndex--
+ }
+ return nil
+}
+
+// RemoveAddress removes the address from the book.
+func (a *AddrBook) RemoveAddress(addr *NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ if ka := a.addrLookup[addr.String()]; ka != nil {
+ a.removeFromAllBuckets(ka)
+ }
+}
+
+// Size count the number of know address
+func (a *AddrBook) Size() int {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+ return a.size()
+}
+
+func (a *AddrBook) addAddress(addr, src *NetAddress) error {
+ if addr == nil || src == nil {
+ return errors.New("can't add nil to address book")
+ }
+ if _, ok := a.ourAddrs[addr.String()]; ok {
+ return errors.New("add ourselves to address book")
+ }
+ if a.routabilityStrict && !addr.Routable() {
+ return errors.New("cannot add non-routable address")
+ }
+
+ ka := a.addrLookup[addr.String()]
+ if ka != nil {
+ if ka.isOld() {
+ return nil
+ }
+ if len(ka.Buckets) == maxNewBucketsPerAddress {
+ return nil
+ }
+ if factor := int32(2 * len(ka.Buckets)); a.rand.Int31n(factor) != 0 {
+ return nil
+ }
+ } else {
+ ka = newKnownAddress(addr, src)
+ }
+
+ bucket := a.calcNewBucket(addr, src)
+ return a.addToNewBucket(ka, bucket)
+}
+
+func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) error {
+ if ka.isOld() {
+ return errors.New("cant add old address to new bucket")
+ }
+
+ addrStr := ka.Addr.String()
+ bucket := a.getBucket(bucketTypeNew, bucketIdx)
+ if _, ok := bucket[addrStr]; ok {
+ return nil
+ }
+
+ if len(bucket) > newBucketSize {
+ a.expireNew(bucketIdx)
+ }
+
+ bucket[addrStr] = ka
+ a.addrLookup[addrStr] = ka
+ if ka.addBucketRef(bucketIdx) == 1 {
+ a.nNew++
+ }
+ return nil
+}
+
+func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) error {
+ if ka.isNew() {
+ return errors.New("cannot add old address to new bucket")
+ }
+ if len(ka.Buckets) != 0 {
+ return errors.New("cannot add already old address to another old bucket")
+ }
+
+ bucket := a.getBucket(bucketTypeOld, bucketIdx)
+ if len(bucket) > oldBucketSize {
+ return errors.New("old bucket is full")
+ }
+
+ addrStr := ka.Addr.String()
+ bucket[addrStr] = ka
+ a.addrLookup[addrStr] = ka
+ if ka.addBucketRef(bucketIdx) == 1 {
+ a.nOld++
+ }
+ return nil
+}
+
+func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
+ data1 := []byte{}
+ data1 = append(data1, []byte(a.key)...)
+ data1 = append(data1, []byte(a.groupKey(addr))...)
+ data1 = append(data1, []byte(a.groupKey(src))...)
+ hash1 := doubleSha256(data1)
+ hash64 := binary.BigEndian.Uint64(hash1)
+ hash64 %= newBucketsPerGroup
+ var hashbuf [8]byte
+ binary.BigEndian.PutUint64(hashbuf[:], hash64)
+ data2 := []byte{}
+ data2 = append(data2, []byte(a.key)...)
+ data2 = append(data2, a.groupKey(src)...)
+ data2 = append(data2, hashbuf[:]...)
+
+ hash2 := doubleSha256(data2)
+ return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
+}
+
+func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
+ data1 := []byte{}
+ data1 = append(data1, []byte(a.key)...)
+ data1 = append(data1, []byte(addr.String())...)
+ hash1 := doubleSha256(data1)
+ hash64 := binary.BigEndian.Uint64(hash1)
+ hash64 %= oldBucketsPerGroup
+ var hashbuf [8]byte
+ binary.BigEndian.PutUint64(hashbuf[:], hash64)
+ data2 := []byte{}
+ data2 = append(data2, []byte(a.key)...)
+ data2 = append(data2, a.groupKey(addr)...)
+ data2 = append(data2, hashbuf[:]...)
+
+ hash2 := doubleSha256(data2)
+ return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
+}
+
+func (a *AddrBook) expireNew(bucketIdx int) {
+ for addrStr, ka := range a.bucketsNew[bucketIdx] {
+ if ka.isBad() {
+ a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
+ return
+ }
+ }
+
+ oldest := a.pickOldest(bucketTypeNew, bucketIdx)
+ a.removeFromBucket(oldest, bucketIdx)
+}
+
+func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
+ switch bucketType {
+ case bucketTypeNew:
+ return a.bucketsNew[bucketIdx]
+ case bucketTypeOld:
+ return a.bucketsOld[bucketIdx]
+ default:
+ log.Error("try to access an unknow address book bucket type")
+ return nil
+ }
+}
+
+func (a *AddrBook) groupKey(na *NetAddress) string {
+ if a.routabilityStrict && na.Local() {
+ return "local"
+ }
+ if a.routabilityStrict && !na.Routable() {
+ return "unroutable"
+ }
+ if ipv4 := na.IP.To4(); ipv4 != nil {
+ return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
+ }
+ if na.RFC6145() || na.RFC6052() {
+ // last four bytes are the ip address
+ ip := net.IP(na.IP[12:16])
+ return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+ }
+ if na.RFC3964() {
+ ip := net.IP(na.IP[2:7])
+ return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+
+ }
+ if na.RFC4380() {
+ // teredo tunnels have the last 4 bytes as the v4 address XOR 0xff.
+ ip := net.IP(make([]byte, 4))
+ for i, byte := range na.IP[12:16] {
+ ip[i] = byte ^ 0xff
+ }
+ return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+ }
+
+ bits := 32
+ heNet := &net.IPNet{IP: net.ParseIP("2001:470::"), Mask: net.CIDRMask(32, 128)}
+ if heNet.Contains(na.IP) {
+ bits = 36
+ }
+ return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
+}
+
+func (a *AddrBook) moveToOld(ka *knownAddress) error {
+ if ka.isOld() {
+ return errors.New("cannot promote address that is already old")
+ }
+ if len(ka.Buckets) == 0 {
+ return errors.New("cannot promote address that isn't in any new buckets")
+ }
+
+ a.removeFromAllBuckets(ka)
+ ka.BucketType = bucketTypeOld
+ oldBucketIdx := a.calcOldBucket(ka.Addr)
+ return a.addToOldBucket(ka, oldBucketIdx)
+}
+
+func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
+ bucket := a.getBucket(bucketType, bucketIdx)
+ var oldest *knownAddress
+ for _, ka := range bucket {
+ if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
+ oldest = ka
+ }
+ }
+ return oldest
+}
+
+func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
+ delete(a.addrLookup, ka.Addr.String())
+ for _, bucketIdx := range ka.Buckets {
+ bucket := a.getBucket(ka.BucketType, bucketIdx)
+ delete(bucket, ka.Addr.String())
+ }
+ ka.Buckets = nil
+ if ka.BucketType == bucketTypeNew {
+ a.nNew--
+ } else {
+ a.nOld--
+ }
+}
+
+func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketIdx int) {
+ bucket := a.getBucket(ka.BucketType, bucketIdx)
+ delete(bucket, ka.Addr.String())
+ if ka.removeBucketRef(bucketIdx) == 0 {
+ delete(a.addrLookup, ka.Addr.String())
+ if ka.bucketType == bucketTypeNew {
+ a.nNew--
+ } else {
+ a.nOld--
+ }
+ }
+}
+
+func (a *AddrBook) size() int {
+ return a.nNew + a.nOld
+}
--- /dev/null
+package pex
+
+import (
+ "encoding/json"
+ "os"
+ "time"
+)
+
+type addrBookJSON struct {
+ Key string
+ Addrs []*knownAddress
+}
+
+func (a *AddrBook) saveToFile() error {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ aJSON := &addrBookJSON{Key: a.key, Addrs: []*knownAddress{}}
+ for _, ka := range a.addrLookup {
+ aJSON.Addrs = append(aJSON.Addrs, ka)
+ }
+
+ rawDats, err := json.MarshalIndent(aJSON, "", "\t")
+ if err != nil {
+ return err
+ }
+ return cmn.WriteFileAtomic(a.filePath, rawDats, 0644)
+}
+
+func (a *AddrBook) loadFromFile() error {
+ if _, err := os.Stat(a.filePath); os.IsNotExist(err) {
+ return nil
+ }
+
+ r, err := os.Open(a.filePath)
+ if err != nil {
+ return error
+ }
+
+ defer r.Close()
+ aJSON := &addrBookJSON{}
+ if err = json.NewDecoder(r).Decode(aJSON); err != nil {
+ return err
+ }
+
+ a.key = aJSON.Key
+ for _, ka := range aJSON.Addrs {
+ a.addrLookup[ka.Addr.String()] = ka
+ for _, bucketIndex := range ka.Buckets {
+ bucket := a.getBucket(ka.BucketType, bucketIndex)
+ bucket[ka.Addr.String()] = ka
+ }
+ if ka.BucketType == bucketTypeNew {
+ a.nNew++
+ } else {
+ a.nOld++
+ }
+ }
+ return nil
+}
+
+func (a *AddrBook) saveRoutine() {
+ ticker := time.NewTicker(2 * time.Minute)
+ for {
+ select {
+ case <-ticker.C:
+ if err := a.saveToFile(); err != nil {
+ log.WithField("err", err).Error("failed to save AddrBook to file")
+ }
+ case <-a.Quit:
+ a.saveToFile()
+ return
+ }
+ }
+}
--- /dev/null
+package pex
+
+import (
+ "time"
+
+ "github.com/bytom/p2p"
+)
+
+type knownAddress struct {
+ Addr *p2p.NetAddress
+ Src *p2p.NetAddress
+ Attempts int32
+ LastAttempt time.Time
+ LastSuccess time.Time
+ BucketType byte
+ Buckets []int
+}
+
+func newKnownAddress(addr, src *p2p.NetAddress) *knownAddress {
+ return &knownAddress{
+ Addr: addr,
+ Src: src,
+ Attempts: 0,
+ LastAttempt: time.Now(),
+ BucketType: bucketTypeNew,
+ Buckets: nil,
+ }
+}
+
+func (ka *knownAddress) isOld() bool {
+ return ka.BucketType == bucketTypeOld
+}
+
+func (ka *knownAddress) isNew() bool {
+ return ka.BucketType == bucketTypeNew
+}
+
+func (ka *knownAddress) markAttempt() {
+ ka.LastAttempt = time.Now()
+ ka.Attempts++
+}
+
+func (ka *knownAddress) markGood() {
+ now := time.Now()
+ ka.LastAttempt = now
+ ka.LastSuccess = now
+ ka.Attempts = 0
+}
+
+func (ka *knownAddress) addBucketRef(bucketIdx int) int {
+ for _, bucket := range ka.Buckets {
+ if bucket == bucketIdx {
+ return -1
+ }
+ }
+ ka.Buckets = append(ka.Buckets, bucketIdx)
+ return len(ka.Buckets)
+}
+
+func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
+ buckets := []int{}
+ for _, bucket := range ka.Buckets {
+ if bucket != bucketIdx {
+ buckets = append(buckets, bucket)
+ }
+ }
+ if len(buckets) != len(ka.Buckets)-1 {
+ return -1
+ }
+ ka.Buckets = buckets
+ return len(ka.Buckets)
+}
+
+func (ka *knownAddress) isBad() bool {
+ if ka.BucketType == bucketTypeOld {
+ return false
+ }
+ if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
+ return true
+ }
+ if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
+ return true
+ }
+ if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
+ return true
+ }
+ if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
+ return true
+ }
+ return false
+}
--- /dev/null
+package pex
+
+const (
+ bucketTypeNew = 0x01
+ bucketTypeOld = 0x02
+
+ oldBucketSize = 64
+ oldBucketCount = 64
+ oldBucketsPerGroup = 4
+ newBucketSize = 64
+ newBucketCount = 256
+ newBucketsPerGroup = 32
+
+ getSelectionPercent = 23
+ minGetSelection = 32
+ maxGetSelection = 250
+
+ needAddressThreshold = 1000 // addresses under which the address manager will claim to need more addresses.
+ maxNewBucketsPerAddress = 4 // buckets a frequently seen new address may end up in.
+ numMissingDays = 30 // days before which we assume an address has vanished
+ numRetries = 3 // tries without a single success before we assume an address is bad.
+ maxFailures = 10 // max failures we will accept without a success before considering an address bad.
+ minBadDays = 7 // days since the last success before we will consider evicting an address.
+)
--- /dev/null
+package pex
+
+import (
+ "bytes"
+ "fmt"
+
+ wire "github.com/tendermint/go-wire"
+)
+
+const (
+ msgTypeRequest = byte(0x01)
+ msgTypeAddrs = byte(0x02)
+)
+
+// PexMessage is a primary type for PEX messages. Underneath, it could contain
+// either pexRequestMessage, or pexAddrsMessage messages.
+type PexMessage interface{}
+
+var _ = wire.RegisterInterface(
+ struct{ PexMessage }{},
+ wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
+ wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
+)
+
+// DecodeMessage implements interface registered above.
+func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
+ msgType = bz[0]
+ n := new(int)
+ r := bytes.NewReader(bz)
+ msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
+ return
+}
+
+type pexRequestMessage struct{}
+
+func (m *pexRequestMessage) String() string { return "[pexRequest]" }
+
+type pexAddrsMessage struct {
+ Addrs []*NetAddress
+}
+
+func (m *pexAddrsMessage) String() string { return fmt.Sprintf("[pexAddrs %v]", m.Addrs) }
--- /dev/null
+package pex
+
+import (
+ "errors"
+ "math/rand"
+ "reflect"
+ "sync"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ cmn "github.com/tendermint/tmlibs/common"
+)
+
+const (
+ // PexChannel is a channel for PEX messages
+ PexChannel = byte(0x00)
+
+ defaultEnsurePeersPeriod = 120 * time.Second // period to ensure peers connected
+ minNumOutboundPeers = 5
+ maxPexMessageSize = 1048576 // 1MB
+ defaultMaxMsgCountByPeer = uint16(1000)
+ msgCountByPeerFlushInterval = 1 * time.Hour
+)
+
+// PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
+type PEXReactor struct {
+ BaseReactor
+ book *AddrBook
+ msgCountByPeer *cmn.CMap
+}
+
+// NewPEXReactor creates new PEX reactor.
+func NewPEXReactor(b *AddrBook) *PEXReactor {
+ r := &PEXReactor{
+ book: b,
+ msgCountByPeer: cmn.NewCMap(),
+ }
+ r.BaseReactor = *NewBaseReactor("PEXReactor", r)
+ return r
+}
+
+// OnStart implements BaseService
+func (r *PEXReactor) OnStart() error {
+ r.BaseReactor.OnStart()
+ r.book.Start()
+ go r.ensurePeersRoutine()
+ go r.flushMsgCountByPeer()
+ return nil
+}
+
+// OnStop implements BaseService
+func (r *PEXReactor) OnStop() {
+ r.BaseReactor.OnStop()
+ r.book.Stop()
+}
+
+// GetChannels implements Reactor
+func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
+ return []*ChannelDescriptor{&ChannelDescriptor{
+ ID: PexChannel,
+ Priority: 1,
+ SendQueueCapacity: 10,
+ }}
+}
+
+// AddPeer adding peer to the address book
+func (r *PEXReactor) AddPeer(p *Peer) error {
+ if p.IsOutbound() {
+ if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
+ return errors.New("Send pex message fail")
+ }
+ return nil
+ }
+
+ addr, err := NewNetAddressString(p.ListenAddr)
+ if err != nil {
+ return errors.New("addPeer: invalid peer address")
+ }
+
+ r.book.AddAddress(addr, addr)
+ if r.Switch.peers.Size() >= r.Switch.config.MaxNumPeers {
+ if r.SendAddrs(p, r.book.GetSelection()) {
+ <-time.After(1 * time.Second)
+ r.Switch.StopPeerGracefully(p)
+ }
+ return errors.New("addPeer: reach the max peer, exchange then close")
+ }
+ return nil
+}
+
+// Receive implements Reactor by handling incoming PEX messages.
+func (r *PEXReactor) Receive(chID byte, p *Peer, rawMsg []byte) {
+ srcAddr := p.Connection().RemoteAddress
+ srcAddrStr := srcAddr.String()
+ r.incrementMsgCount(srcAddrStr)
+ if r.reachedMaxMsgLimit(srcAddrStr) {
+ log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
+ r.Switch.StopPeerGracefully(p)
+ return
+ }
+
+ _, msg, err := DecodeMessage(rawMsg)
+ if err != nil {
+ log.WithField("error", err).Error("failed to decoding pex message")
+ r.Switch.StopPeerGracefully(p)
+ return
+ }
+
+ switch msg := msg.(type) {
+ case *pexRequestMessage:
+ if !r.SendAddrs(src, r.book.GetSelection()) {
+ log.Error("failed to send pex address message")
+ }
+
+ case *pexAddrsMessage:
+ for _, addr := range msg.Addrs {
+ if err := r.book.AddAddress(addr, srcAddr); err != nil {
+ log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
+ r.Switch.StopPeerGracefully(p)
+ return
+ }
+ }
+
+ default:
+ log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
+ }
+}
+
+// RemovePeer implements Reactor.
+func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {}
+
+// RequestPEX asks peer for more addresses.
+func (r *PEXReactor) RequestAddrs(p *Peer) bool {
+ ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
+ if !ok {
+ r.Switch.StopPeerGracefully(p)
+ }
+ return ok
+}
+
+// SendAddrs sends addrs to the peer.
+func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
+ ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
+ if !ok {
+ r.Switch.StopPeerGracefully(p)
+ }
+ return ok
+}
+
+func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+ if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
+ r.book.MarkAttempt(a)
+ } else {
+ r.book.MarkGood(a)
+ }
+ wg.Done()
+}
+
+func (r *PEXReactor) ensurePeers() {
+ numOutPeers, _, numDialing := r.Switch.NumPeers()
+ numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
+ log.WithFields(log.Fields{
+ "numOutPeers": numOutPeers,
+ "numDialing": numDialing,
+ "numToDial": numToDial,
+ }).Debug("ensure peers")
+ if numToDial <= 0 {
+ return
+ }
+
+ newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
+ toDial := make(map[string]*NetAddress)
+ maxAttempts := numToDial * 3
+ for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
+ try := r.book.PickAddress(newBias)
+ if try == nil {
+ continue
+ }
+ if _, selected := toDial[try.IP.String()]; selected {
+ continue
+ }
+ if dialling := r.Switch.IsDialing(try); dialling {
+ continue
+ }
+ if connected := r.Switch.Peers().Has(try.ID); connected {
+ continue
+ }
+
+ log.Debug("Will dial address addr:", try)
+ toDial[try.IP.String()] = try
+ }
+
+ var wg sync.WaitGroup
+ for _, item := range toDial {
+ wg.Add(1)
+ go r.dialPeerWorker(item, &wg)
+ }
+ wg.Wait()
+
+ if r.book.NeedMoreAddrs() {
+ if peers := r.Switch.Peers().List(); len(peers) > 0 {
+ peer := peers[rand.Int()%len(peers)]
+ r.RequestAddrs(peer)
+ }
+ }
+}
+
+func (r *PEXReactor) ensurePeersRoutine() {
+ r.ensurePeers()
+ ticker := time.NewTicker(defaultEnsurePeersPeriod)
+ quickTicker := time.NewTicker(time.Second * 1)
+
+ for {
+ select {
+ case <-ticker.C:
+ r.ensurePeers()
+ case <-quickTicker.C:
+ if r.Switch.peers.Size() < 3 {
+ r.ensurePeers()
+ }
+ case <-r.Quit:
+ return
+ }
+ }
+}
+
+func (r *PEXReactor) flushMsgCountByPeer() {
+ ticker := time.NewTicker(msgCountByPeerFlushInterval)
+ for {
+ select {
+ case <-ticker.C:
+ r.msgCountByPeer.Clear()
+ case <-r.Quit:
+ return
+ }
+ }
+}
+
+func (r *PEXReactor) incrementMsgCount(addr string) {
+ var count uint16
+ if countI := r.msgCountByPeer.Get(addr); countI != nil {
+ count = countI.(uint16)
+ }
+ count++
+ r.msgCountByPeer.Set(addr, count)
+}
+
+func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
+ return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer
+}
+++ /dev/null
-package p2p
-
-import (
- "bytes"
- "fmt"
- "math/rand"
- "reflect"
- "strings"
- "sync"
- "time"
-
- log "github.com/sirupsen/logrus"
- wire "github.com/tendermint/go-wire"
- cmn "github.com/tendermint/tmlibs/common"
-
- "github.com/bytom/errors"
-)
-
-const (
- // PexChannel is a channel for PEX messages
- PexChannel = byte(0x00)
-
- // period to ensure peers connected
- defaultEnsurePeersPeriod = 120 * time.Second
- minNumOutboundPeers = 5
- maxPexMessageSize = 1048576 // 1MB
-
- // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
- defaultMaxMsgCountByPeer = 1000
- msgCountByPeerFlushInterval = 1 * time.Hour
-)
-
-var ErrSendPexFail = errors.New("Send pex message fail")
-
-// PEXReactor handles PEX (peer exchange) and ensures that an
-// adequate number of peers are connected to the switch.
-//
-// It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
-//
-// ## Preventing abuse
-//
-// For now, it just limits the number of messages from one peer to
-// `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
-// msg/hour).
-//
-// NOTE [2017-01-17]:
-// Limiting is fine for now. Maybe down the road we want to keep track of the
-// quality of peer messages so if peerA keeps telling us about peers we can't
-// connect to then maybe we should care less about peerA. But I don't think
-// that kind of complexity is priority right now.
-type PEXReactor struct {
- BaseReactor
-
- sw *Switch
- book *AddrBook
- ensurePeersPeriod time.Duration
-
- // tracks message count by peer, so we can prevent abuse
- msgCountByPeer *cmn.CMap
- maxMsgCountByPeer uint16
-}
-
-// NewPEXReactor creates new PEX reactor.
-func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
- r := &PEXReactor{
- sw: sw,
- book: b,
- ensurePeersPeriod: defaultEnsurePeersPeriod,
- msgCountByPeer: cmn.NewCMap(),
- maxMsgCountByPeer: defaultMaxMsgCountByPeer,
- }
- r.BaseReactor = *NewBaseReactor("PEXReactor", r)
- return r
-}
-
-// OnStart implements BaseService
-func (r *PEXReactor) OnStart() error {
- r.BaseReactor.OnStart()
- r.book.Start()
- go r.ensurePeersRoutine()
- go r.flushMsgCountByPeer()
- return nil
-}
-
-// OnStop implements BaseService
-func (r *PEXReactor) OnStop() {
- r.BaseReactor.OnStop()
- r.book.Stop()
-}
-
-// GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
- return []*ChannelDescriptor{
- &ChannelDescriptor{
- ID: PexChannel,
- Priority: 1,
- SendQueueCapacity: 10,
- },
- }
-}
-
-// AddPeer implements Reactor by adding peer to the address book (if inbound)
-// or by requesting more addresses (if outbound).
-func (r *PEXReactor) AddPeer(p *Peer) error {
- if p.IsOutbound() {
- // For outbound peers, the address is already in the books.
- // Either it was added in DialSeeds or when we
- // received the peer's address in r.Receive
- if r.book.NeedMoreAddrs() {
- if ok := r.RequestPEX(p); !ok {
- return ErrSendPexFail
- }
- }
- return nil
- }
-
- // For inbound connections, the peer is its own source
- addr, err := NewNetAddressString(p.ListenAddr)
- if err != nil {
- log.WithFields(log.Fields{"addr": p.ListenAddr, "error": err}).Error("Error in AddPeer: Invalid peer address")
- return errors.New("Error in AddPeer: Invalid peer address")
- }
- r.book.AddAddress(addr, addr)
-
- // close the connect if connect is big than max limit
- if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
- if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
- <-time.After(1 * time.Second)
- r.sw.StopPeerGracefully(p)
- }
- return errors.New("Error in AddPeer: reach the max peer, exchange then close")
- }
-
- return nil
-}
-
-// RemovePeer implements Reactor.
-func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
- // If we aren't keeping track of local temp data for each peer here, then we
- // don't have to do anything.
-}
-
-// Receive implements Reactor by handling incoming PEX messages.
-func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
- srcAddr := src.Connection().RemoteAddress
- srcAddrStr := srcAddr.String()
-
- r.IncrementMsgCountForPeer(srcAddrStr)
- if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
- log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
- // TODO remove src from peers?
- return
- }
-
- _, msg, err := DecodeMessage(msgBytes)
- if err != nil {
- log.WithField("error", err).Error("Error decoding message")
- return
- }
- log.WithField("msg", msg).Info("Reveived message")
-
- switch msg := msg.(type) {
- case *pexRequestMessage:
- // src requested some peers.
- if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
- log.Info("Send address message failed. Stop peer.")
- }
- case *pexAddrsMessage:
- // We received some peer addresses from src.
- // (We don't want to get spammed with bad peers)
- for _, addr := range msg.Addrs {
- if addr != nil {
- r.book.AddAddress(addr, srcAddr)
- }
- }
- default:
- log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
- }
-}
-
-// RequestPEX asks peer for more addresses.
-func (r *PEXReactor) RequestPEX(p *Peer) bool {
- ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
- if !ok {
- r.sw.StopPeerGracefully(p)
- }
- return ok
-}
-
-// SendAddrs sends addrs to the peer.
-func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
- ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
- if !ok {
- r.sw.StopPeerGracefully(p)
- }
- return ok
-}
-
-// SetEnsurePeersPeriod sets period to ensure peers connected.
-func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
- r.ensurePeersPeriod = d
-}
-
-// SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
-func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
- r.maxMsgCountByPeer = v
-}
-
-// ReachedMaxMsgCountForPeer returns true if we received too many
-// messages from peer with address `addr`.
-// NOTE: assumes the value in the CMap is non-nil
-func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
- return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
-}
-
-// Increment or initialize the msg count for the peer in the CMap
-func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
- var count uint16
- countI := r.msgCountByPeer.Get(addr)
- if countI != nil {
- count = countI.(uint16)
- }
- count++
- r.msgCountByPeer.Set(addr, count)
-}
-
-// Ensures that sufficient peers are connected. (continuous)
-func (r *PEXReactor) ensurePeersRoutine() {
- // Randomize when routine starts
- ensurePeersPeriodMs := int64(10000)
- time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
-
- // fire once immediately.
- r.ensurePeers()
-
- // fire periodically
- ticker := time.NewTicker(r.ensurePeersPeriod)
- quickTicker := time.NewTicker(time.Second * 1)
-
- for {
- select {
- case <-ticker.C:
- r.ensurePeers()
- case <-quickTicker.C:
- if r.sw.peers.Size() < 3 {
- r.ensurePeers()
- }
- case <-r.Quit:
- ticker.Stop()
- quickTicker.Stop()
- return
- }
- }
-}
-
-// ensurePeers ensures that sufficient peers are connected. (once)
-//
-// Old bucket / New bucket are arbitrary categories to denote whether an
-// address is vetted or not, and this needs to be determined over time via a
-// heuristic that we haven't perfected yet, or, perhaps is manually edited by
-// the node operator. It should not be used to compute what addresses are
-// already connected or not.
-//
-// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
-// What we're currently doing in terms of marking good/bad peers is just a
-// placeholder. It should not be the case that an address becomes old/vetted
-// upon a single successful connection.
-func (r *PEXReactor) ensurePeers() {
- numOutPeers, _, numDialing := r.Switch.NumPeers()
- numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
- log.WithFields(log.Fields{
- "numOutPeers": numOutPeers,
- "numDialing": numDialing,
- "numToDial": numToDial,
- }).Info("Ensure peers")
- if numToDial <= 0 {
- return
- }
-
- newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
- toDial := make(map[string]*NetAddress)
-
- // Try to pick numToDial addresses to dial.
- for i := 0; i < numToDial; i++ {
- // The purpose of newBias is to first prioritize old (more vetted) peers
- // when we have few connections, but to allow for new (less vetted) peers
- // if we already have many connections. This algorithm isn't perfect, but
- // it somewhat ensures that we prioritize connecting to more-vetted
- // peers.
-
- var picked *NetAddress
- // Try to fetch a new peer 3 times.
- // This caps the maximum number of tries to 3 * numToDial.
- for j := 0; j < 3; j++ {
- try := r.book.PickAddress(newBias)
- if try == nil {
- break
- }
- ka := r.book.addrLookup[try.String()]
- if ka != nil {
- if ka.isBad() {
- continue
- }
- }
- _, alreadySelected := toDial[try.IP.String()]
- alreadyDialing := r.Switch.IsDialing(try)
- var alreadyConnected bool
-
- for _, v := range r.Switch.Peers().list {
- if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
- alreadyConnected = true
- break
- }
- }
- if alreadySelected || alreadyDialing || alreadyConnected {
- continue
- } else {
- log.Debug("Will dial address addr:", try)
- picked = try
- break
- }
- }
- if picked == nil {
- continue
- }
- toDial[picked.IP.String()] = picked
- }
-
- var wg sync.WaitGroup
- for _, item := range toDial {
- wg.Add(1)
- go r.dialPeerWorker(item, &wg)
- }
- wg.Wait()
-
- // If we need more addresses, pick a random peer and ask for more.
- if r.book.NeedMoreAddrs() {
- if peers := r.Switch.Peers().List(); len(peers) > 0 {
- i := rand.Int() % len(peers)
- peer := peers[i]
- log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
- if ok := r.RequestPEX(peer); !ok {
- log.Info("Send request address message failed. Stop peer.")
- }
- }
- }
-}
-
-func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
- if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
- r.book.MarkAttempt(a)
- } else {
- r.book.MarkGood(a)
- }
- wg.Done()
-}
-
-func (r *PEXReactor) flushMsgCountByPeer() {
- ticker := time.NewTicker(msgCountByPeerFlushInterval)
-
- for {
- select {
- case <-ticker.C:
- r.msgCountByPeer.Clear()
- case <-r.Quit:
- ticker.Stop()
- return
- }
- }
-}
-
-//-----------------------------------------------------------------------------
-// Messages
-
-const (
- msgTypeRequest = byte(0x01)
- msgTypeAddrs = byte(0x02)
-)
-
-// PexMessage is a primary type for PEX messages. Underneath, it could contain
-// either pexRequestMessage, or pexAddrsMessage messages.
-type PexMessage interface{}
-
-var _ = wire.RegisterInterface(
- struct{ PexMessage }{},
- wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
- wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
-)
-
-// DecodeMessage implements interface registered above.
-func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
- msgType = bz[0]
- n := new(int)
- r := bytes.NewReader(bz)
- msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
- return
-}
-
-/*
-A pexRequestMessage requests additional peer addresses.
-*/
-type pexRequestMessage struct {
-}
-
-func (m *pexRequestMessage) String() string {
- return "[pexRequest]"
-}
-
-/*
-A message with announced peer addresses.
-*/
-type pexAddrsMessage struct {
- Addrs []*NetAddress
-}
-
-func (m *pexAddrsMessage) String() string {
- return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
-}