From 6b451b5501c459edac42aeff0771e3e4c3dc515a Mon Sep 17 00:00:00 2001 From: paladz <453256728@qq.com> Date: Sun, 20 May 2018 10:02:10 +0800 Subject: [PATCH] edit the addressBook --- p2p/addrbook.go | 414 ++++++++++++++++++----------------------------------- p2p/pex_reactor.go | 6 +- 2 files changed, 142 insertions(+), 278 deletions(-) diff --git a/p2p/addrbook.go b/p2p/addrbook.go index ed82a75e..4657f089 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -85,53 +85,45 @@ type AddrBook struct { routabilityStrict bool key string - mtx sync.Mutex + mtx sync.RWMutex rand *rand.Rand ourAddrs map[string]*NetAddress addrLookup map[string]*knownAddress // new & old - addrNew []map[string]*knownAddress - addrOld []map[string]*knownAddress + bucketsNew []map[string]*knownAddress + bucketsOld []map[string]*knownAddress nOld int nNew int - - wg sync.WaitGroup } -// NewAddrBook creates a new address book. -// Use Start to begin processing asynchronous address updates. +// NewAddrBook creates a new address book. Use Start to begin processing asynchronous address updates. func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook { - am := &AddrBook{ + a := &AddrBook{ + filePath: filePath, + routabilityStrict: routabilityStrict, + key: crypto.CRandHex(24), rand: rand.New(rand.NewSource(time.Now().UnixNano())), ourAddrs: make(map[string]*NetAddress), addrLookup: make(map[string]*knownAddress), - filePath: filePath, - routabilityStrict: routabilityStrict, + bucketsNew: make([]map[string]*knownAddress, newBucketCount), + bucketsOld: make([]map[string]*knownAddress, oldBucketCount), } - am.init() - am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am) - return am -} - -// When modifying this, don't forget to update loadFromFile() -func (a *AddrBook) init() { - a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits - // New addr buckets - a.addrNew = make([]map[string]*knownAddress, newBucketCount) - for i := range a.addrNew { - a.addrNew[i] = make(map[string]*knownAddress) + for i := range a.bucketsNew { + a.bucketsNew[i] = make(map[string]*knownAddress) } - // Old addr buckets - a.addrOld = make([]map[string]*knownAddress, oldBucketCount) - for i := range a.addrOld { - a.addrOld[i] = make(map[string]*knownAddress) + for i := range a.bucketsOld { + a.bucketsOld[i] = make(map[string]*knownAddress) } + a.BaseService = *cmn.NewBaseService(nil, "AddrBook", a) + return a } // OnStart implements Service. func (a *AddrBook) OnStart() error { - a.BaseService.OnStart() - a.loadFromFile(a.filePath) - a.wg.Add(1) + if err := a.BaseService.OnStart(); err != nil { + return err + } + + a.loadFromFile() go a.saveRoutine() return nil } @@ -141,37 +133,11 @@ func (a *AddrBook) OnStop() { a.BaseService.OnStop() } -func (a *AddrBook) Wait() { - a.wg.Wait() -} - -func (a *AddrBook) AddOurAddress(addr *NetAddress) { +// AddAddress add address to address book +func (a *AddrBook) AddAddress(addr, src *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - log.WithField("addr", addr).Info("Add our address to book") - - a.ourAddrs[addr.String()] = addr -} - -func (a *AddrBook) OurAddresses() []*NetAddress { - addrs := []*NetAddress{} - a.mtx.Lock() - defer a.mtx.Unlock() - - for _, addr := range a.ourAddrs { - addrs = append(addrs, addr) - } - return addrs -} - -// NOTE: addr must not be nil -func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) { - a.mtx.Lock() - defer a.mtx.Unlock() - log.WithFields(log.Fields{ - "addr": addr, - "src": src, - }).Debug("Add address to book") + log.WithFields(log.Fields{"addr": addr, "src": src}).Debug("add address to address book") a.addAddress(addr, src) } @@ -180,8 +146,8 @@ func (a *AddrBook) NeedMoreAddrs() bool { } func (a *AddrBook) Size() int { - a.mtx.Lock() - defer a.mtx.Unlock() + a.mtx.RLock() + defer a.mtx.RUnlock() return a.size() } @@ -189,133 +155,102 @@ func (a *AddrBook) size() int { return a.nNew + a.nOld } -// Pick an address to connect to with new/old bias. -func (a *AddrBook) PickAddress(newBias int) *NetAddress { - a.mtx.Lock() - defer a.mtx.Unlock() +// PickAddress picks a random address from random bucket +func (a *AddrBook) PickAddress(bias int) *NetAddress { + a.mtx.RLock() + defer a.mtx.RUnlock() if a.size() == 0 { return nil } - if newBias > 100 { - newBias = 100 - } - if newBias < 0 { - newBias = 0 + + // make sure bias is in the range [0, 100] + if bias > 100 { + bias = 100 + } else if bias < 0 { + bias = 0 } - // Bias between new and old addresses. - oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias)) - newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias) + oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(bias)) + newCorrelation := math.Sqrt(float64(a.nNew)) * float64(bias) + pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation + if (pickFromOldBucket && a.nOld == 0) || (!pickFromOldBucket && a.nNew == 0) { + return nil + } - if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation { - // pick random Old bucket. - var bucket map[string]*knownAddress = nil - num := 0 - for len(bucket) == 0 && num < oldBucketCount { - bucket = a.addrOld[a.rand.Intn(len(a.addrOld))] - num++ - } - if num == oldBucketCount { - return nil - } - // pick a random ka from bucket. - randIndex := a.rand.Intn(len(bucket)) - for _, ka := range bucket { - if randIndex == 0 { - return ka.Addr - } - randIndex-- - } - cmn.PanicSanity("Should not happen") - } else { - // pick random New bucket. - var bucket map[string]*knownAddress = nil - num := 0 - for len(bucket) == 0 && num < newBucketCount { - bucket = a.addrNew[a.rand.Intn(len(a.addrNew))] - num++ - } - if num == newBucketCount { - return nil + var bucket map[string]*knownAddress + for len(bucket) == 0 { + if pickFromOldBucket { + bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))] + } else { + bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))] } - // pick a random ka from bucket. - randIndex := a.rand.Intn(len(bucket)) - for _, ka := range bucket { - if randIndex == 0 { - return ka.Addr - } - randIndex-- + } + + randIndex := a.rand.Intn(len(bucket)) + for _, ka := range bucket { + if randIndex == 0 { + return ka.Addr } - cmn.PanicSanity("Should not happen") + randIndex-- } return nil } +// MarkGood marks the peer as good and moves it into an "old" bucket. func (a *AddrBook) MarkGood(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrLookup[addr.String()] - if ka == nil { - return - } - ka.markGood() - if ka.isNew() { - a.moveToOld(ka) + + if ka := a.addrLookup[addr.String()]; ka != nil { + ka.markGood() + if ka.isNew() { + a.moveToOld(ka) + } } } +// MarkAttempt marks that an attempt was made to connect to the address. func (a *AddrBook) MarkAttempt(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrLookup[addr.String()] - if ka == nil { - return - } - ka.markAttempt() -} -// MarkBad currently just ejects the address. In the future, consider -// blacklisting. -func (a *AddrBook) MarkBad(addr *NetAddress) { - a.RemoveAddress(addr) + if ka := a.addrLookup[addr.String()]; ka != nil { + ka.markAttempt() + } } // RemoveAddress removes the address from the book. func (a *AddrBook) RemoveAddress(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrLookup[addr.String()] - if ka == nil { - return + + if ka := a.addrLookup[addr.String()]; ka != nil { + log.WithField("addr", addr).Debug("remove address from address book") + a.removeFromAllBuckets(ka) } - log.WithField("addr", addr).Info("Remove address from book") - a.removeFromAllBuckets(ka) } -/* Peer exchange */ - // GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols. func (a *AddrBook) GetSelection() []*NetAddress { - a.mtx.Lock() - defer a.mtx.Unlock() + a.mtx.RLock() + defer a.mtx.RUnlock() - if a.size() == 0 { + bookSize := a.size() + if bookSize == 0 { return nil } - allAddr := make([]*NetAddress, a.size()) + numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100) + numAddresses = cmn.MinInt(maxGetSelection, numAddresses) + + allAddr := make([]*NetAddress, bookSize) i := 0 - for _, v := range a.addrLookup { - allAddr[i] = v.Addr + for _, ka := range a.addrLookup { + allAddr[i] = ka.Addr i++ } - numAddresses := cmn.MaxInt( - cmn.MinInt(minGetSelection, len(allAddr)), - len(allAddr)*getSelectionPercent/100) - numAddresses = cmn.MinInt(maxGetSelection, numAddresses) - // Fisher-Yates shuffle the array. We only need to do the first // `numAddresses' since we are throwing the rest. for i := 0; i < numAddresses; i++ { @@ -328,69 +263,52 @@ func (a *AddrBook) GetSelection() []*NetAddress { return allAddr[:numAddresses] } -/* Loading & Saving */ - type addrBookJSON struct { Key string Addrs []*knownAddress } -func (a *AddrBook) saveToFile(filePath string) { - log.WithField("size", a.Size()).Info("Saving AddrBook to file") - - a.mtx.Lock() - defer a.mtx.Unlock() - // Compile Addrs - addrs := []*knownAddress{} - for _, ka := range a.addrLookup { - addrs = append(addrs, ka) - } +func (a *AddrBook) Save() { + a.mtx.RLock() + defer a.mtx.RUnlock() + log.WithField("size", a.Size()).Debug("saving address book to file") aJSON := &addrBookJSON{ Key: a.key, - Addrs: addrs, + Addrs: []*knownAddress{}, + } + for _, ka := range a.addrLookup { + aJSON.Addrs = append(aJSON.Addrs, ka) } jsonBytes, err := json.MarshalIndent(aJSON, "", "\t") if err != nil { - log.WithField("err", err).Error("Failed to save AddrBook to file") + log.WithField("err", err).Error("failed to save AddrBook to file") return } - err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644) - if err != nil { - log.WithFields(log.Fields{ - "file": filePath, - "err": err, - }).Error("Failed to save AddrBook to file") + + if err = cmn.WriteFileAtomic(a.filePath, jsonBytes, 0644); err != nil { + log.WithFields(log.Fields{"file": a.filePath, "err": err}).Error("Failed to save AddrBook to file") } } -// Returns false if file does not exist. -// cmn.Panics if file is corrupt. -func (a *AddrBook) loadFromFile(filePath string) bool { - // If doesn't exist, do nothing. - _, err := os.Stat(filePath) - if os.IsNotExist(err) { +func (a *AddrBook) loadFromFile() bool { + if _, err := os.Stat(a.filePath); os.IsNotExist(err) { return false } - // Load addrBookJSON{} - r, err := os.Open(filePath) + r, err := os.Open(a.filePath) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err)) + cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", a.filePath, err)) } + defer r.Close() aJSON := &addrBookJSON{} - dec := json.NewDecoder(r) - err = dec.Decode(aJSON) - if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err)) + if err = json.NewDecoder(r).Decode(aJSON); err != nil { + cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", a.filePath, err)) } - // Restore all the fields... - // Restore the key a.key = aJSON.Key - // Restore .addrNew & .addrOld for _, ka := range aJSON.Addrs { for _, bucketIndex := range ka.Buckets { bucket := a.getBucket(ka.BucketType, bucketIndex) @@ -406,37 +324,25 @@ func (a *AddrBook) loadFromFile(filePath string) bool { return true } -// Save saves the book. -func (a *AddrBook) Save() { - log.WithField("size", a.Size()).Info("Saving AddrBook to file") - a.saveToFile(a.filePath) -} - -/* Private methods */ - func (a *AddrBook) saveRoutine() { - dumpAddressTicker := time.NewTicker(dumpAddressInterval) -out: + ticker := time.NewTicker(dumpAddressInterval) for { select { - case <-dumpAddressTicker.C: - a.saveToFile(a.filePath) + case <-ticker.C: + a.Save() case <-a.Quit: - break out + a.Save() + return } } - dumpAddressTicker.Stop() - a.saveToFile(a.filePath) - a.wg.Done() - log.Info("Address handler done") } func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { switch bucketType { case bucketTypeNew: - return a.addrNew[bucketIdx] + return a.bucketsNew[bucketIdx] case bucketTypeOld: - return a.addrOld[bucketIdx] + return a.bucketsOld[bucketIdx] default: cmn.PanicSanity("Should not happen") return nil @@ -446,80 +352,66 @@ func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAd // Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full. // NOTE: currently it always returns true. func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { - // Sanity check if ka.isOld() { - log.Error(cmn.Fmt("Cannot add address already in old bucket to a new bucket: %v", ka)) + log.WithField("know address", ka).Error("cant add old address to new bucket") return false } addrStr := ka.Addr.String() bucket := a.getBucket(bucketTypeNew, bucketIdx) - - // Already exists? if _, ok := bucket[addrStr]; ok { return true } - // Enforce max addresses. if len(bucket) > newBucketSize { - log.Info("new bucket is full, expiring old ") + log.Debug("addToNewBucket: new bucket is full, expiring new") a.expireNew(bucketIdx) } - // Add to bucket. bucket[addrStr] = ka + a.addrLookup[addrStr] = ka if ka.addBucketRef(bucketIdx) == 1 { a.nNew++ } - - // Ensure in addrLookup - a.addrLookup[addrStr] = ka - return true } // Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full. func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { - // Sanity check if ka.isNew() { - log.Error(cmn.Fmt("Cannot add new address to old bucket: %v", ka)) + log.WithField("know address", ka).Error("cannot add old address to new bucket") return false } if len(ka.Buckets) != 0 { - log.Error(cmn.Fmt("Cannot add already old address to another old bucket: %v", ka)) + log.WithField("know address", ka).Error("cannot add already old address to another old bucket") return false } addrStr := ka.Addr.String() bucket := a.getBucket(bucketTypeOld, bucketIdx) - - // Already exists? if _, ok := bucket[addrStr]; ok { return true } - // Enforce max addresses. if len(bucket) > oldBucketSize { return false } // Add to bucket. bucket[addrStr] = ka + a.addrLookup[addrStr] = ka if ka.addBucketRef(bucketIdx) == 1 { a.nOld++ } - - // Ensure in addrLookup - a.addrLookup[addrStr] = ka - return true } func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) { if ka.BucketType != bucketType { - log.Error(cmn.Fmt("Bucket type mismatch: %v", ka)) + log.WithField("know address", ka).Error("Bucket type mismatch") return } + bucket := a.getBucket(bucketType, bucketIdx) delete(bucket, ka.Addr.String()) if ka.removeBucketRef(bucketIdx) == 0 { @@ -558,29 +450,23 @@ func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { } func (a *AddrBook) addAddress(addr, src *NetAddress) { - if a.routabilityStrict && !addr.Routable() { - log.Error(cmn.Fmt("Cannot add non-routable address %v", addr)) + if _, ok := a.ourAddrs[addr.String()]; ok { return } - if _, ok := a.ourAddrs[addr.String()]; ok { - // Ignore our own listener address. + if a.routabilityStrict && !addr.Routable() { + log.WithField("address", addr).Error("cannot add non-routable address") return } ka := a.addrLookup[addr.String()] - if ka != nil { - // Already old. if ka.isOld() { return } - // Already in max new buckets. if len(ka.Buckets) == maxNewBucketsPerAddress { return } - // The more entries we have, the less likely we are to add more. - factor := int32(2 * len(ka.Buckets)) - if a.rand.Int31n(factor) != 0 { + if factor := int32(2 * len(ka.Buckets)); a.rand.Int31n(factor) != 0 { return } } else { @@ -589,23 +475,19 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { bucket := a.calcNewBucket(addr, src) a.addToNewBucket(ka, bucket) - - log.Debug("Added new address ", "address:", addr, " total:", a.size()) } // Make space in the new buckets by expiring the really bad entries. // If no bad entries are available we remove the oldest. func (a *AddrBook) expireNew(bucketIdx int) { - for addrStr, ka := range a.addrNew[bucketIdx] { - // If an entry is bad, throw it away + for addrStr, ka := range a.bucketsNew[bucketIdx] { if ka.isBad() { - log.Info(cmn.Fmt("expiring bad address %v", addrStr)) + log.WithField("addr", addrStr).Info("expiring bad address") a.removeFromBucket(ka, bucketTypeNew, bucketIdx) return } } - // If we haven't thrown out a bad entry, throw out the oldest entry oldest := a.pickOldest(bucketTypeNew, bucketIdx) a.removeFromBucket(oldest, bucketTypeNew, bucketIdx) } @@ -614,45 +496,33 @@ func (a *AddrBook) expireNew(bucketIdx int) { // TODO: Move to old probabilistically. // The better a node is, the less likely it should be evicted from an old bucket. func (a *AddrBook) moveToOld(ka *knownAddress) { - // Sanity check if ka.isOld() { - log.Error(cmn.Fmt("Cannot promote address that is already old %v", ka)) + log.WithField("know address", ka).Error("cannot promote address that is already old") return } if len(ka.Buckets) == 0 { - log.Error(cmn.Fmt("Cannot promote address that isn't in any new buckets %v", ka)) + log.WithField("know address", ka).Error("cannot promote address that isn't in any new buckets") return } - // Remember one of the buckets in which ka is in. - freedBucket := ka.Buckets[0] - // Remove from all (new) buckets. a.removeFromAllBuckets(ka) - // It's officially old now. ka.BucketType = bucketTypeOld // Try to add it to its oldBucket destination. oldBucketIdx := a.calcOldBucket(ka.Addr) - added := a.addToOldBucket(ka, oldBucketIdx) - if !added { - // No room, must evict something - oldest := a.pickOldest(bucketTypeOld, oldBucketIdx) - a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx) - // Find new bucket to put oldest in - newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src) - added := a.addToNewBucket(oldest, newBucketIdx) - // No space in newBucket either, just put it in freedBucket from above. - if !added { - added := a.addToNewBucket(oldest, freedBucket) - if !added { - log.Error(cmn.Fmt("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket)) - } - } - // Finally, add to bucket again. - added = a.addToOldBucket(ka, oldBucketIdx) - if !added { - log.Error(cmn.Fmt("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx)) - } + if ok := a.addToOldBucket(ka, oldBucketIdx); ok { + return + } + + // No room, must evict something + oldest := a.pickOldest(bucketTypeOld, oldBucketIdx) + a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx) + // Find new bucket to put oldest in + newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src) + a.addToNewBucket(oldest, newBucketIdx) + + if ok := a.addToOldBucket(ka, oldBucketIdx); !ok { + log.WithField("know address", ka).Error("could not re-add to oldBucketIdx") } } @@ -764,7 +634,7 @@ type knownAddress struct { Buckets []int } -func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { +func newKnownAddress(addr, src *NetAddress) *knownAddress { return &knownAddress{ Addr: addr, Src: src, @@ -784,23 +654,20 @@ func (ka *knownAddress) isNew() bool { } func (ka *knownAddress) markAttempt() { - now := time.Now() - ka.LastAttempt = now + ka.LastAttempt = time.Now() ka.Attempts += 1 } func (ka *knownAddress) markGood() { now := time.Now() ka.LastAttempt = now - ka.Attempts = 0 ka.LastSuccess = now + ka.Attempts = 0 } func (ka *knownAddress) addBucketRef(bucketIdx int) int { for _, bucket := range ka.Buckets { if bucket == bucketIdx { - // TODO refactor to return error? - // log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka)) return -1 } } @@ -816,8 +683,6 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int { } } if len(buckets) != len(ka.Buckets)-1 { - // TODO refactor to return error? - // log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka)) return -1 } ka.Buckets = buckets @@ -837,6 +702,9 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int { worth keeping hold of. */ func (ka *knownAddress) isBad() bool { + if ka.BucketType == bucketTypeOld { + return false + } // Has been attempted in the last minute --> bad if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 { return true diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 8206e5ff..29532071 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -117,11 +117,7 @@ func (r *PEXReactor) AddPeer(p *Peer) error { // For inbound connections, the peer is its own source addr, err := NewNetAddressString(p.ListenAddr) if err != nil { - // this should never happen - log.WithFields(log.Fields{ - "addr": p.ListenAddr, - "error": err, - }).Error("Error in AddPeer: Invalid peer address") + log.WithFields(log.Fields{"addr": p.ListenAddr, "error": err}).Error("Error in AddPeer: Invalid peer address") return errors.New("Error in AddPeer: Invalid peer address") } r.book.AddAddress(addr, addr) -- 2.11.0