12 log "github.com/sirupsen/logrus"
14 "github.com/bytom/common"
15 "github.com/bytom/crypto"
19 ticketTimeBucketLen = time.Minute
20 timeWindow = 10 // * ticketTimeBucketLen
21 wantTicketsInWindow = 10
22 collectFrequency = time.Second * 30
23 registerFrequency = time.Second * 60
26 keepTicketConst = time.Minute * 10
27 keepTicketExp = time.Minute * 5
28 targetWaitTime = time.Minute * 10
29 topicQueryTimeout = time.Second * 5
30 topicQueryResend = time.Minute
31 // topic radius detection
32 maxRadius = 0xffffffffffffffff
33 radiusTC = time.Minute * 20
34 radiusBucketsPerBit = 8
43 // timeBucket represents absolute monotonic time in minutes.
44 // It is used as the index into the per-topic ticket buckets.
49 regTime []AbsTime // Per-topic local absolute time when the ticket can be used.
51 // The serial number that was issued by the server.
53 // Used by registrar, tracks absolute time when the ticket was created.
56 // Fields used only by registrants
57 node *Node // the registrar node that signed this ticket
58 refCnt int // tracks number of topics that will be registered using this ticket
59 pong []byte // encoded pong packet signed by the registrar
62 // ticketRef refers to a single topic in a ticket.
63 type ticketRef struct {
65 idx int // index of the topic in t.topics and t.regTime
68 func (ref ticketRef) topic() Topic {
69 return ref.t.topics[ref.idx]
72 func (ref ticketRef) topicRegTime() AbsTime {
73 return ref.t.regTime[ref.idx]
76 func pongToTicket(localTime AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
77 wps := p.data.(*pong).WaitPeriods
78 if len(topics) != len(wps) {
79 return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps))
81 hash, _, err := wireHash(topics)
85 if hash != p.data.(*pong).TopicHash {
86 return nil, fmt.Errorf("bad topic hash")
93 regTime: make([]AbsTime, len(wps)),
95 // Convert wait periods to local absolute time.
96 for i, wp := range wps {
97 t.regTime[i] = localTime + AbsTime(time.Second*time.Duration(wp))
102 func ticketToPong(t *ticket, pong *pong) {
104 pong.Expiration = uint64(t.issueTime / AbsTime(time.Second))
105 pong.TopicHash, _, err = wireHash(t.topics)
107 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("wireHash err")
109 pong.TicketSerial = t.serial
110 pong.WaitPeriods = make([]uint32, len(t.regTime))
111 for i, regTime := range t.regTime {
112 pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second)
116 type ticketStore struct {
117 // radius detector and target address generator
118 // exists for both searched and registered topics
119 radius map[Topic]*topicRadius
121 // Contains buckets (for each absolute minute) of tickets
122 // that can be used in that minute.
123 // This is only set if the topic is being registered.
124 tickets map[Topic]*topicTickets
126 regQueue []Topic // Topic registration queue for round robin attempts
127 regSet map[Topic]struct{} // Topic registration queue contents for fast filling
129 nodes map[*Node]*ticket
130 nodeLastReq map[*Node]reqInfo
132 lastBucketFetched timeBucket
133 nextTicketCached *ticketRef
134 nextTicketReg AbsTime
136 searchTopicMap map[Topic]searchTopic
137 nextTopicQueryCleanup AbsTime
138 queriesSent map[*Node]map[common.Hash]sentQuery
141 type searchTopic struct {
142 foundChn chan<- *Node
145 type sentQuery struct {
150 type topicTickets struct {
151 buckets map[timeBucket][]ticketRef
156 func newTicketStore() *ticketStore {
158 radius: make(map[Topic]*topicRadius),
159 tickets: make(map[Topic]*topicTickets),
160 regSet: make(map[Topic]struct{}),
161 nodes: make(map[*Node]*ticket),
162 nodeLastReq: make(map[*Node]reqInfo),
163 searchTopicMap: make(map[Topic]searchTopic),
164 queriesSent: make(map[*Node]map[common.Hash]sentQuery),
168 // addTopic starts tracking a topic. If register is true,
169 // the local node will register the topic and tickets will be collected.
170 func (s *ticketStore) addTopic(topic Topic, register bool) {
171 log.WithFields(log.Fields{"module": logModule, "topic": topic, "register": register}).Debug("Adding discovery topic")
172 if s.radius[topic] == nil {
173 s.radius[topic] = newTopicRadius(topic)
175 if register && s.tickets[topic] == nil {
176 s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)}
180 func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
182 if s.searchTopicMap[t].foundChn == nil {
183 s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
187 func (s *ticketStore) removeSearchTopic(t Topic) {
188 if st := s.searchTopicMap[t]; st.foundChn != nil {
189 delete(s.searchTopicMap, t)
193 // removeRegisterTopic deletes all tickets for the given topic.
194 func (s *ticketStore) removeRegisterTopic(topic Topic) {
195 log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("Removing discovery topic")
196 if s.tickets[topic] == nil {
197 log.WithFields(log.Fields{"module": logModule, "topic": topic}).Warn("Removing non-existent discovery topic")
200 for _, list := range s.tickets[topic].buckets {
201 for _, ref := range list {
203 if ref.t.refCnt == 0 {
204 delete(s.nodes, ref.t.node)
205 delete(s.nodeLastReq, ref.t.node)
209 delete(s.tickets, topic)
212 func (s *ticketStore) regTopicSet() []Topic {
213 topics := make([]Topic, 0, len(s.tickets))
214 for topic := range s.tickets {
215 topics = append(topics, topic)
220 // nextRegisterLookup returns the target of the next lookup for ticket collection.
221 func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
222 // Queue up any new topics (or discarded ones), preserving iteration order
223 for topic := range s.tickets {
224 if _, ok := s.regSet[topic]; !ok {
225 s.regQueue = append(s.regQueue, topic)
226 s.regSet[topic] = struct{}{}
229 // Iterate over the set of all topics and look up the next suitable one
230 for len(s.regQueue) > 0 {
231 // Fetch the next topic from the queue, and ensure it still exists
232 topic := s.regQueue[0]
233 s.regQueue = s.regQueue[1:]
234 delete(s.regSet, topic)
236 if s.tickets[topic] == nil {
239 // If the topic needs more tickets, return it
240 if s.tickets[topic].nextLookup < Now() {
241 next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond
242 log.WithFields(log.Fields{"module": logModule, "topic": topic, "target": next.target, "delay": delay}).Debug("Found discovery topic to register")
246 // No registration topics found or all exhausted, sleep
247 delay := 40 * time.Second
248 log.WithFields(log.Fields{"module": logModule, "delay": delay}).Debug("No topic found to register")
249 return lookupInfo{}, delay
252 func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
253 tr := s.radius[topic]
254 target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
255 if target.radiusLookup {
258 tr.radiusLookupCnt = 0
263 // ticketsInWindow returns the tickets of a given topic in the registration window.
264 func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
265 // Sanity check that the topic still exists before operating on it
266 if s.tickets[topic] == nil {
267 log.WithFields(log.Fields{"module": logModule, "topic": topic}).Warn("Listing non-existing discovery tickets")
270 // Gather all the tickets in the next time window
271 var tickets []ticketRef
273 buckets := s.tickets[topic].buckets
274 for idx := timeBucket(0); idx < timeWindow; idx++ {
275 tickets = append(tickets, buckets[s.lastBucketFetched+idx]...)
277 log.WithFields(log.Fields{"module": logModule, "topic": topic, "from": s.lastBucketFetched, "tickets": len(tickets)}).Debug("Retrieved discovery registration tickets")
281 func (s *ticketStore) removeExcessTickets(t Topic) {
282 tickets := s.ticketsInWindow(t)
283 if len(tickets) <= wantTicketsInWindow {
286 sort.Sort(ticketRefByWaitTime(tickets))
287 for _, r := range tickets[wantTicketsInWindow:] {
292 type ticketRefByWaitTime []ticketRef
294 // Len is the number of elements in the collection.
295 func (s ticketRefByWaitTime) Len() int {
299 func (r ticketRef) waitTime() AbsTime {
300 return r.t.regTime[r.idx] - r.t.issueTime
303 // Less reports whether the element with
304 // index i should sort before the element with index j.
305 func (s ticketRefByWaitTime) Less(i, j int) bool {
306 return s[i].waitTime() < s[j].waitTime()
309 // Swap swaps the elements with indexes i and j.
310 func (s ticketRefByWaitTime) Swap(i, j int) {
311 s[i], s[j] = s[j], s[i]
314 func (s *ticketStore) addTicketRef(r ticketRef) {
315 topic := r.t.topics[r.idx]
316 tickets := s.tickets[topic]
318 log.WithFields(log.Fields{"module": logModule, "topic": topic}).Warn("Adding ticket to non-existent topic")
321 bucket := timeBucket(r.t.regTime[r.idx] / AbsTime(ticketTimeBucketLen))
322 tickets.buckets[bucket] = append(tickets.buckets[bucket], r)
325 min := Now() - AbsTime(collectFrequency)*maxCollectDebt
326 if tickets.nextLookup < min {
327 tickets.nextLookup = min
329 tickets.nextLookup += AbsTime(collectFrequency)
331 //s.removeExcessTickets(topic)
334 func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
337 ticket, wait := s.nextRegisterableTicket()
342 regTime := now + AbsTime(wait)
343 topic := ticket.t.topics[ticket.idx]
344 if s.tickets[topic] != nil && regTime >= s.tickets[topic].nextReg {
347 s.removeTicketRef(*ticket)
351 func (s *ticketStore) ticketRegistered(ref ticketRef) {
354 topic := ref.t.topics[ref.idx]
355 tickets := s.tickets[topic]
356 min := now - AbsTime(registerFrequency)*maxRegisterDebt
357 if min > tickets.nextReg {
358 tickets.nextReg = min
360 tickets.nextReg += AbsTime(registerFrequency)
361 s.tickets[topic] = tickets
363 s.removeTicketRef(ref)
366 // nextRegisterableTicket returns the next ticket that can be used
369 // If the returned wait time <= zero the ticket can be used. For a positive
370 // wait time, the caller should requery the next ticket later.
372 // A ticket can be returned more than once with <= zero wait time in case
373 // the ticket contains multiple topics.
374 func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
376 if s.nextTicketCached != nil {
377 return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
380 for bucket := s.lastBucketFetched; ; bucket++ {
382 empty = true // true if there are no tickets
383 nextTicket ticketRef // uninitialized if this bucket is empty
385 for _, tickets := range s.tickets {
386 //s.removeExcessTickets(topic)
387 if len(tickets.buckets) != 0 {
390 list := tickets.buckets[bucket]
391 for _, ref := range list {
392 //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
393 if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
402 if nextTicket.t != nil {
403 s.nextTicketCached = &nextTicket
404 return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
406 s.lastBucketFetched = bucket
410 // removeTicketRef removes a ticket reference from the ticket store
411 func (s *ticketStore) removeTicketRef(ref ticketRef) {
412 log.WithFields(log.Fields{"module": logModule, "node": ref.t.node.ID, "serial": ref.t.serial}).Debug("Removing discovery ticket reference")
414 // Make nextRegisterableTicket return the next available ticket.
415 s.nextTicketCached = nil
418 tickets := s.tickets[topic]
421 log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("Removing tickets from unknown topic")
424 bucket := timeBucket(ref.t.regTime[ref.idx] / AbsTime(ticketTimeBucketLen))
425 list := tickets.buckets[bucket]
427 for i, bt := range list {
436 list = append(list[:idx], list[idx+1:]...)
438 tickets.buckets[bucket] = list
440 delete(tickets.buckets, bucket)
443 if ref.t.refCnt == 0 {
444 delete(s.nodes, ref.t.node)
445 delete(s.nodeLastReq, ref.t.node)
449 type lookupInfo struct {
455 type reqInfo struct {
461 // returns -1 if not found
462 func (t *ticket) findIdx(topic Topic) int {
463 for i, tt := range t.topics {
471 func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
473 for i, n := range nodes {
474 if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
475 if lookup.radiusLookup {
476 if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
477 s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
480 if s.nodes[n] == nil {
481 s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
488 func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) {
490 for i, n := range nodes {
491 if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
492 if lookup.radiusLookup {
493 if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
494 s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now}
497 if s.canQueryTopic(n, lookup.topic) {
498 hash := query(n, lookup.topic)
500 s.addTopicQuery(common.BytesToHash(hash), n, lookup)
508 func (s *ticketStore) adjustWithTicket(now AbsTime, targetHash common.Hash, t *ticket) {
509 for i, topic := range t.topics {
510 if tt, ok := s.radius[topic]; ok {
511 tt.adjustWithTicket(now, targetHash, ticketRef{t, i})
516 func (s *ticketStore) addTicket(localTime AbsTime, pingHash []byte, ticket *ticket) {
517 log.WithFields(log.Fields{"module": logModule, "node": ticket.node.ID, "serial": ticket.serial}).Debug("Adding discovery ticket")
519 lastReq, ok := s.nodeLastReq[ticket.node]
520 if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
523 s.adjustWithTicket(localTime, lastReq.lookup.target, ticket)
525 if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil {
529 topic := lastReq.lookup.topic
530 topicIdx := ticket.findIdx(topic)
535 bucket := timeBucket(localTime / AbsTime(ticketTimeBucketLen))
536 if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched {
537 s.lastBucketFetched = bucket
540 if _, ok := s.tickets[topic]; ok {
541 wait := ticket.regTime[topicIdx] - localTime
542 rnd := rand.New(rand.NewSource(time.Now().UnixNano())).ExpFloat64()
546 if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
547 // use the ticket to register this topic
548 //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong)
549 s.addTicketRef(ticketRef{ticket, topicIdx})
553 if ticket.refCnt > 0 {
554 s.nextTicketCached = nil
555 s.nodes[ticket.node] = ticket
559 func (s *ticketStore) getNodeTicket(node *Node) *ticket {
560 if s.nodes[node] == nil {
561 log.WithFields(log.Fields{"module": logModule, "node": node.ID, "serial": nil}).Debug("Retrieving node ticket")
563 log.WithFields(log.Fields{"module": logModule, "node": node.ID, "serial": s.nodes[node].serial}).Debug("Retrieving node ticket")
568 func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
569 qq := s.queriesSent[node]
572 for _, sq := range qq {
573 if sq.lookup.topic == topic && sq.sent > now-AbsTime(topicQueryResend) {
581 func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
583 qq := s.queriesSent[node]
585 qq = make(map[common.Hash]sentQuery)
586 s.queriesSent[node] = qq
588 qq[hash] = sentQuery{sent: now, lookup: lookup}
589 s.cleanupTopicQueries(now)
592 func (s *ticketStore) cleanupTopicQueries(now AbsTime) {
593 if s.nextTopicQueryCleanup > now {
596 exp := now - AbsTime(topicQueryResend)
597 for n, qq := range s.queriesSent {
598 for h, q := range qq {
604 delete(s.queriesSent, n)
607 s.nextTopicQueryCleanup = now + AbsTime(topicQueryTimeout)
610 func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
612 //fmt.Println("got", from.addr().String(), hash, len(nodes))
613 qq := s.queriesSent[from]
618 if !ok || now > q.sent+AbsTime(topicQueryTimeout) {
625 s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)
626 chn := s.searchTopicMap[q.lookup.topic].foundChn
628 //fmt.Println("no channel")
631 for _, node := range nodes {
633 if ip.IsUnspecified() || ip.IsLoopback() {
636 n := NewNode(node.ID, ip, node.UDP, node.TCP)
646 type topicRadius struct {
648 topicHashPrefix uint64
649 radius, minRadius uint64
650 buckets []topicRadiusBucket
655 type topicRadiusEvent int
658 trOutside topicRadiusEvent = iota
664 type topicRadiusBucket struct {
665 weights [trCount]float64
668 lookupSent map[common.Hash]AbsTime
671 func (b *topicRadiusBucket) update(now AbsTime) {
672 if now == b.lastTime {
675 exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
676 for i, w := range b.weights {
677 b.weights[i] = w * exp
681 for target, tm := range b.lookupSent {
682 if now-tm > AbsTime(respTimeout) {
683 b.weights[trNoAdjust] += 1
684 delete(b.lookupSent, target)
689 func (b *topicRadiusBucket) adjust(now AbsTime, inside float64) {
692 b.weights[trOutside] += 1
695 b.weights[trInside] += 1
697 b.weights[trInside] += inside
698 b.weights[trOutside] += 1 - inside
703 func newTopicRadius(t Topic) *topicRadius {
704 topicHash := crypto.Sha256Hash([]byte(t))
705 topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8])
709 topicHashPrefix: topicHashPrefix,
711 minRadius: maxRadius,
715 func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
716 prefix := binary.BigEndian.Uint64(addrHash[0:8])
718 if prefix != r.topicHashPrefix {
719 log2 = math.Log2(float64(prefix ^ r.topicHashPrefix))
721 bucket := int((64 - log2) * radiusBucketsPerBit)
722 max := 64*radiusBucketsPerBit - 1
732 func (r *topicRadius) targetForBucket(bucket int) common.Hash {
733 min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
734 max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)
736 b := randUint64n(uint64(max - min))
741 prefix := r.topicHashPrefix ^ xor
742 var target common.Hash
743 binary.BigEndian.PutUint64(target[0:8], prefix)
744 globalRandRead(target[8:])
748 // package rand provides a Read function in Go 1.6 and later, but
749 // we can't use it yet because we still support Go 1.5.
750 func globalRandRead(b []byte) {
753 for n := 0; n < len(b); n++ {
755 val = rand.New(rand.NewSource(time.Now().UnixNano())).Int()
764 func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
765 nodePrefix := binary.BigEndian.Uint64(addrHash[0:8])
766 dist := nodePrefix ^ r.topicHashPrefix
767 return dist < r.radius
770 func (r *topicRadius) chooseLookupBucket(a, b int) int {
778 for i := a; i <= b; i++ {
779 if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
786 rnd := randUint(uint32(c))
787 for i := a; i <= b; i++ {
788 if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
795 panic(nil) // should never happen
798 func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
803 if b >= len(r.buckets) {
804 b = len(r.buckets) - 1
805 if r.buckets[b].value > max {
806 max = r.buckets[b].value
810 for i := a; i <= b; i++ {
811 if r.buckets[i].value > max {
812 max = r.buckets[i].value
816 return maxValue-max < minPeakSize
819 func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
821 maxValue := float64(0)
824 for i := range r.buckets {
825 r.buckets[i].update(now)
826 v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
827 r.buckets[i].value = v
828 //fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
832 for i, b := range r.buckets {
834 if v < float64(i)*minSlope {
844 minRadBucket := len(r.buckets)
846 for minRadBucket > 0 && sum < minRightSum {
848 b := r.buckets[minRadBucket]
849 sum += b.weights[trInside] + b.weights[trOutside]
851 r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
854 if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
855 lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
858 if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
859 for len(r.buckets) <= maxBucket+lookupWidth {
860 r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]AbsTime)})
862 lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
864 if lookupLeft == -1 {
865 radiusLookup = lookupRight
867 if lookupRight == -1 {
868 radiusLookup = lookupLeft
870 if randUint(2) == 0 {
871 radiusLookup = lookupLeft
873 radiusLookup = lookupRight
878 //fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
880 if radiusLookup == -1 {
881 // no more radius lookups needed at the moment, return a radius
884 if minRadBucket < rad {
889 radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
897 func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
899 _, radiusLookup := r.recalcRadius()
900 if radiusLookup != -1 {
901 target := r.targetForBucket(radiusLookup)
902 r.buckets[radiusLookup].lookupSent[target] = Now()
903 return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
907 radExt := r.radius / 2
908 if radExt > maxRadius-r.radius {
909 radExt = maxRadius - r.radius
911 rnd := randUint64n(r.radius) + randUint64n(2*radExt)
918 prefix := r.topicHashPrefix ^ rnd
919 var target common.Hash
920 binary.BigEndian.PutUint64(target[0:8], prefix)
921 globalRandRead(target[8:])
922 return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
925 func (r *topicRadius) adjustWithTicket(now AbsTime, targetHash common.Hash, t ticketRef) {
926 wait := t.t.regTime[t.idx] - t.t.issueTime
927 inside := float64(wait)/float64(targetWaitTime) - 0.5
934 r.adjust(now, targetHash, t.t.node.sha, inside)
937 func (r *topicRadius) adjust(now AbsTime, targetHash, addrHash common.Hash, inside float64) {
938 bucket := r.getBucketIdx(addrHash)
939 //fmt.Println("adjust", bucket, len(r.buckets), inside)
940 if bucket >= len(r.buckets) {
943 r.buckets[bucket].adjust(now, inside)
944 delete(r.buckets[bucket].lookupSent, targetHash)