OSDN Git Service

new repo
[bytom/vapor.git] / p2p / discover / ticket.go
1 package discover
2
3 import (
4         "bytes"
5         "encoding/binary"
6         "fmt"
7         "math"
8         "math/rand"
9         "sort"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13
14         "github.com/vapor/common"
15         "github.com/vapor/crypto"
16 )
17
18 const (
19         ticketTimeBucketLen = time.Minute
20         timeWindow          = 10 // * ticketTimeBucketLen
21         wantTicketsInWindow = 10
22         collectFrequency    = time.Second * 30
23         registerFrequency   = time.Second * 60
24         maxCollectDebt      = 10
25         maxRegisterDebt     = 5
26         keepTicketConst     = time.Minute * 10
27         keepTicketExp       = time.Minute * 5
28         targetWaitTime      = time.Minute * 10
29         topicQueryTimeout   = time.Second * 5
30         topicQueryResend    = time.Minute
31         // topic radius detection
32         maxRadius           = 0xffffffffffffffff
33         radiusTC            = time.Minute * 20
34         radiusBucketsPerBit = 8
35         minSlope            = 1
36         minPeakSize         = 40
37         maxNoAdjust         = 20
38         lookupWidth         = 8
39         minRightSum         = 20
40         searchForceQuery    = 4
41 )
42
43 // timeBucket represents absolute monotonic time in minutes.
44 // It is used as the index into the per-topic ticket buckets.
45 type timeBucket int
46
47 type ticket struct {
48         topics  []Topic
49         regTime []AbsTime // Per-topic local absolute time when the ticket can be used.
50
51         // The serial number that was issued by the server.
52         serial uint32
53         // Used by registrar, tracks absolute time when the ticket was created.
54         issueTime AbsTime
55
56         // Fields used only by registrants
57         node   *Node  // the registrar node that signed this ticket
58         refCnt int    // tracks number of topics that will be registered using this ticket
59         pong   []byte // encoded pong packet signed by the registrar
60 }
61
62 // ticketRef refers to a single topic in a ticket.
63 type ticketRef struct {
64         t   *ticket
65         idx int // index of the topic in t.topics and t.regTime
66 }
67
68 func (ref ticketRef) topic() Topic {
69         return ref.t.topics[ref.idx]
70 }
71
72 func (ref ticketRef) topicRegTime() AbsTime {
73         return ref.t.regTime[ref.idx]
74 }
75
76 func pongToTicket(localTime AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
77         wps := p.data.(*pong).WaitPeriods
78         if len(topics) != len(wps) {
79                 return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps))
80         }
81         hash, _, err := wireHash(topics)
82         if err != nil {
83                 return nil, err
84         }
85         if hash != p.data.(*pong).TopicHash {
86                 return nil, fmt.Errorf("bad topic hash")
87         }
88         t := &ticket{
89                 issueTime: localTime,
90                 node:      node,
91                 topics:    topics,
92                 pong:      p.rawData,
93                 regTime:   make([]AbsTime, len(wps)),
94         }
95         // Convert wait periods to local absolute time.
96         for i, wp := range wps {
97                 t.regTime[i] = localTime + AbsTime(time.Second*time.Duration(wp))
98         }
99         return t, nil
100 }
101
102 func ticketToPong(t *ticket, pong *pong) {
103         var err error
104         pong.Expiration = uint64(t.issueTime / AbsTime(time.Second))
105         pong.TopicHash, _, err = wireHash(t.topics)
106         if err != nil {
107                 log.Error("wireHash err:", err)
108         }
109         pong.TicketSerial = t.serial
110         pong.WaitPeriods = make([]uint32, len(t.regTime))
111         for i, regTime := range t.regTime {
112                 pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second)
113         }
114 }
115
116 type ticketStore struct {
117         // radius detector and target address generator
118         // exists for both searched and registered topics
119         radius map[Topic]*topicRadius
120
121         // Contains buckets (for each absolute minute) of tickets
122         // that can be used in that minute.
123         // This is only set if the topic is being registered.
124         tickets map[Topic]*topicTickets
125
126         regQueue []Topic            // Topic registration queue for round robin attempts
127         regSet   map[Topic]struct{} // Topic registration queue contents for fast filling
128
129         nodes       map[*Node]*ticket
130         nodeLastReq map[*Node]reqInfo
131
132         lastBucketFetched timeBucket
133         nextTicketCached  *ticketRef
134         nextTicketReg     AbsTime
135
136         searchTopicMap        map[Topic]searchTopic
137         nextTopicQueryCleanup AbsTime
138         queriesSent           map[*Node]map[common.Hash]sentQuery
139 }
140
141 type searchTopic struct {
142         foundChn chan<- *Node
143 }
144
145 type sentQuery struct {
146         sent   AbsTime
147         lookup lookupInfo
148 }
149
150 type topicTickets struct {
151         buckets    map[timeBucket][]ticketRef
152         nextLookup AbsTime
153         nextReg    AbsTime
154 }
155
156 func newTicketStore() *ticketStore {
157         return &ticketStore{
158                 radius:         make(map[Topic]*topicRadius),
159                 tickets:        make(map[Topic]*topicTickets),
160                 regSet:         make(map[Topic]struct{}),
161                 nodes:          make(map[*Node]*ticket),
162                 nodeLastReq:    make(map[*Node]reqInfo),
163                 searchTopicMap: make(map[Topic]searchTopic),
164                 queriesSent:    make(map[*Node]map[common.Hash]sentQuery),
165         }
166 }
167
168 // addTopic starts tracking a topic. If register is true,
169 // the local node will register the topic and tickets will be collected.
170 func (s *ticketStore) addTopic(topic Topic, register bool) {
171         log.Debug("Adding discovery topic", "topic", topic, "register", register)
172         if s.radius[topic] == nil {
173                 s.radius[topic] = newTopicRadius(topic)
174         }
175         if register && s.tickets[topic] == nil {
176                 s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)}
177         }
178 }
179
180 func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
181         s.addTopic(t, false)
182         if s.searchTopicMap[t].foundChn == nil {
183                 s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
184         }
185 }
186
187 func (s *ticketStore) removeSearchTopic(t Topic) {
188         if st := s.searchTopicMap[t]; st.foundChn != nil {
189                 delete(s.searchTopicMap, t)
190         }
191 }
192
193 // removeRegisterTopic deletes all tickets for the given topic.
194 func (s *ticketStore) removeRegisterTopic(topic Topic) {
195         log.Debug("Removing discovery topic", "topic", topic)
196         if s.tickets[topic] == nil {
197                 log.Warn("Removing non-existent discovery topic", "topic", topic)
198                 return
199         }
200         for _, list := range s.tickets[topic].buckets {
201                 for _, ref := range list {
202                         ref.t.refCnt--
203                         if ref.t.refCnt == 0 {
204                                 delete(s.nodes, ref.t.node)
205                                 delete(s.nodeLastReq, ref.t.node)
206                         }
207                 }
208         }
209         delete(s.tickets, topic)
210 }
211
212 func (s *ticketStore) regTopicSet() []Topic {
213         topics := make([]Topic, 0, len(s.tickets))
214         for topic := range s.tickets {
215                 topics = append(topics, topic)
216         }
217         return topics
218 }
219
220 // nextRegisterLookup returns the target of the next lookup for ticket collection.
221 func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
222         // Queue up any new topics (or discarded ones), preserving iteration order
223         for topic := range s.tickets {
224                 if _, ok := s.regSet[topic]; !ok {
225                         s.regQueue = append(s.regQueue, topic)
226                         s.regSet[topic] = struct{}{}
227                 }
228         }
229         // Iterate over the set of all topics and look up the next suitable one
230         for len(s.regQueue) > 0 {
231                 // Fetch the next topic from the queue, and ensure it still exists
232                 topic := s.regQueue[0]
233                 s.regQueue = s.regQueue[1:]
234                 delete(s.regSet, topic)
235
236                 if s.tickets[topic] == nil {
237                         continue
238                 }
239                 // If the topic needs more tickets, return it
240                 if s.tickets[topic].nextLookup < Now() {
241                         next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond
242                         log.Debug("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay)
243                         return next, delay
244                 }
245         }
246         // No registration topics found or all exhausted, sleep
247         delay := 40 * time.Second
248         log.Debug("No topic found to register", "delay", delay)
249         return lookupInfo{}, delay
250 }
251
252 func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
253         tr := s.radius[topic]
254         target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
255         if target.radiusLookup {
256                 tr.radiusLookupCnt++
257         } else {
258                 tr.radiusLookupCnt = 0
259         }
260         return target
261 }
262
263 // ticketsInWindow returns the tickets of a given topic in the registration window.
264 func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
265         // Sanity check that the topic still exists before operating on it
266         if s.tickets[topic] == nil {
267                 log.Warn("Listing non-existing discovery tickets", "topic", topic)
268                 return nil
269         }
270         // Gather all the tickers in the next time window
271         var tickets []ticketRef
272
273         buckets := s.tickets[topic].buckets
274         for idx := timeBucket(0); idx < timeWindow; idx++ {
275                 tickets = append(tickets, buckets[s.lastBucketFetched+idx]...)
276         }
277         log.Debug("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(tickets))
278         return tickets
279 }
280
281 func (s *ticketStore) removeExcessTickets(t Topic) {
282         tickets := s.ticketsInWindow(t)
283         if len(tickets) <= wantTicketsInWindow {
284                 return
285         }
286         sort.Sort(ticketRefByWaitTime(tickets))
287         for _, r := range tickets[wantTicketsInWindow:] {
288                 s.removeTicketRef(r)
289         }
290 }
291
292 type ticketRefByWaitTime []ticketRef
293
294 // Len is the number of elements in the collection.
295 func (s ticketRefByWaitTime) Len() int {
296         return len(s)
297 }
298
299 func (r ticketRef) waitTime() AbsTime {
300         return r.t.regTime[r.idx] - r.t.issueTime
301 }
302
303 // Less reports whether the element with
304 // index i should sort before the element with index j.
305 func (s ticketRefByWaitTime) Less(i, j int) bool {
306         return s[i].waitTime() < s[j].waitTime()
307 }
308
309 // Swap swaps the elements with indexes i and j.
310 func (s ticketRefByWaitTime) Swap(i, j int) {
311         s[i], s[j] = s[j], s[i]
312 }
313
314 func (s *ticketStore) addTicketRef(r ticketRef) {
315         topic := r.t.topics[r.idx]
316         tickets := s.tickets[topic]
317         if tickets == nil {
318                 log.Warn("Adding ticket to non-existent topic", "topic", topic)
319                 return
320         }
321         bucket := timeBucket(r.t.regTime[r.idx] / AbsTime(ticketTimeBucketLen))
322         tickets.buckets[bucket] = append(tickets.buckets[bucket], r)
323         r.t.refCnt++
324
325         min := Now() - AbsTime(collectFrequency)*maxCollectDebt
326         if tickets.nextLookup < min {
327                 tickets.nextLookup = min
328         }
329         tickets.nextLookup += AbsTime(collectFrequency)
330
331         //s.removeExcessTickets(topic)
332 }
333
334 func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
335         now := Now()
336         for {
337                 ticket, wait := s.nextRegisterableTicket()
338                 if ticket == nil {
339                         return ticket, wait
340                 }
341
342                 regTime := now + AbsTime(wait)
343                 topic := ticket.t.topics[ticket.idx]
344                 if s.tickets[topic] != nil && regTime >= s.tickets[topic].nextReg {
345                         return ticket, wait
346                 }
347                 s.removeTicketRef(*ticket)
348         }
349 }
350
351 func (s *ticketStore) ticketRegistered(ref ticketRef) {
352         now := Now()
353
354         topic := ref.t.topics[ref.idx]
355         tickets := s.tickets[topic]
356         min := now - AbsTime(registerFrequency)*maxRegisterDebt
357         if min > tickets.nextReg {
358                 tickets.nextReg = min
359         }
360         tickets.nextReg += AbsTime(registerFrequency)
361         s.tickets[topic] = tickets
362
363         s.removeTicketRef(ref)
364 }
365
366 // nextRegisterableTicket returns the next ticket that can be used
367 // to register.
368 //
369 // If the returned wait time <= zero the ticket can be used. For a positive
370 // wait time, the caller should requery the next ticket later.
371 //
372 // A ticket can be returned more than once with <= zero wait time in case
373 // the ticket contains multiple topics.
374 func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
375         now := Now()
376         if s.nextTicketCached != nil {
377                 return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
378         }
379
380         for bucket := s.lastBucketFetched; ; bucket++ {
381                 var (
382                         empty      = true    // true if there are no tickets
383                         nextTicket ticketRef // uninitialized if this bucket is empty
384                 )
385                 for _, tickets := range s.tickets {
386                         //s.removeExcessTickets(topic)
387                         if len(tickets.buckets) != 0 {
388                                 empty = false
389
390                                 list := tickets.buckets[bucket]
391                                 for _, ref := range list {
392                                         //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
393                                         if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
394                                                 nextTicket = ref
395                                         }
396                                 }
397                         }
398                 }
399                 if empty {
400                         return nil, 0
401                 }
402                 if nextTicket.t != nil {
403                         s.nextTicketCached = &nextTicket
404                         return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
405                 }
406                 s.lastBucketFetched = bucket
407         }
408 }
409
410 // removeTicket removes a ticket from the ticket store
411 func (s *ticketStore) removeTicketRef(ref ticketRef) {
412         log.Debug("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial)
413
414         // Make nextRegisterableTicket return the next available ticket.
415         s.nextTicketCached = nil
416
417         topic := ref.topic()
418         tickets := s.tickets[topic]
419
420         if tickets == nil {
421                 log.Debug("Removing tickets from unknown topic", "topic", topic)
422                 return
423         }
424         bucket := timeBucket(ref.t.regTime[ref.idx] / AbsTime(ticketTimeBucketLen))
425         list := tickets.buckets[bucket]
426         idx := -1
427         for i, bt := range list {
428                 if bt.t == ref.t {
429                         idx = i
430                         break
431                 }
432         }
433         if idx == -1 {
434                 panic(nil)
435         }
436         list = append(list[:idx], list[idx+1:]...)
437         if len(list) != 0 {
438                 tickets.buckets[bucket] = list
439         } else {
440                 delete(tickets.buckets, bucket)
441         }
442         ref.t.refCnt--
443         if ref.t.refCnt == 0 {
444                 delete(s.nodes, ref.t.node)
445                 delete(s.nodeLastReq, ref.t.node)
446         }
447 }
448
449 type lookupInfo struct {
450         target       common.Hash
451         topic        Topic
452         radiusLookup bool
453 }
454
455 type reqInfo struct {
456         pingHash []byte
457         lookup   lookupInfo
458         time     AbsTime
459 }
460
461 // returns -1 if not found
462 func (t *ticket) findIdx(topic Topic) int {
463         for i, tt := range t.topics {
464                 if tt == topic {
465                         return i
466                 }
467         }
468         return -1
469 }
470
471 func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
472         now := Now()
473         for i, n := range nodes {
474                 if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
475                         if lookup.radiusLookup {
476                                 if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
477                                         s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
478                                 }
479                         } else {
480                                 if s.nodes[n] == nil {
481                                         s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
482                                 }
483                         }
484                 }
485         }
486 }
487
488 func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) {
489         now := Now()
490         for i, n := range nodes {
491                 if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
492                         if lookup.radiusLookup {
493                                 if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
494                                         s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now}
495                                 }
496                         } // else {
497                         if s.canQueryTopic(n, lookup.topic) {
498                                 hash := query(n, lookup.topic)
499                                 if hash != nil {
500                                         s.addTopicQuery(common.BytesToHash(hash), n, lookup)
501                                 }
502                         }
503                         //}
504                 }
505         }
506 }
507
508 func (s *ticketStore) adjustWithTicket(now AbsTime, targetHash common.Hash, t *ticket) {
509         for i, topic := range t.topics {
510                 if tt, ok := s.radius[topic]; ok {
511                         tt.adjustWithTicket(now, targetHash, ticketRef{t, i})
512                 }
513         }
514 }
515
516 func (s *ticketStore) addTicket(localTime AbsTime, pingHash []byte, ticket *ticket) {
517         log.Debug("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial)
518
519         lastReq, ok := s.nodeLastReq[ticket.node]
520         if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
521                 return
522         }
523         s.adjustWithTicket(localTime, lastReq.lookup.target, ticket)
524
525         if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil {
526                 return
527         }
528
529         topic := lastReq.lookup.topic
530         topicIdx := ticket.findIdx(topic)
531         if topicIdx == -1 {
532                 return
533         }
534
535         bucket := timeBucket(localTime / AbsTime(ticketTimeBucketLen))
536         if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched {
537                 s.lastBucketFetched = bucket
538         }
539
540         if _, ok := s.tickets[topic]; ok {
541                 wait := ticket.regTime[topicIdx] - localTime
542                 rnd := rand.ExpFloat64()
543                 if rnd > 10 {
544                         rnd = 10
545                 }
546                 if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
547                         // use the ticket to register this topic
548                         //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong)
549                         s.addTicketRef(ticketRef{ticket, topicIdx})
550                 }
551         }
552
553         if ticket.refCnt > 0 {
554                 s.nextTicketCached = nil
555                 s.nodes[ticket.node] = ticket
556         }
557 }
558
559 func (s *ticketStore) getNodeTicket(node *Node) *ticket {
560         if s.nodes[node] == nil {
561                 log.Debug("Retrieving node ticket", "node", node.ID, "serial", nil)
562         } else {
563                 log.Debug("Retrieving node ticket", "node", node.ID, "serial", s.nodes[node].serial)
564         }
565         return s.nodes[node]
566 }
567
568 func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
569         qq := s.queriesSent[node]
570         if qq != nil {
571                 now := Now()
572                 for _, sq := range qq {
573                         if sq.lookup.topic == topic && sq.sent > now-AbsTime(topicQueryResend) {
574                                 return false
575                         }
576                 }
577         }
578         return true
579 }
580
581 func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
582         now := Now()
583         qq := s.queriesSent[node]
584         if qq == nil {
585                 qq = make(map[common.Hash]sentQuery)
586                 s.queriesSent[node] = qq
587         }
588         qq[hash] = sentQuery{sent: now, lookup: lookup}
589         s.cleanupTopicQueries(now)
590 }
591
592 func (s *ticketStore) cleanupTopicQueries(now AbsTime) {
593         if s.nextTopicQueryCleanup > now {
594                 return
595         }
596         exp := now - AbsTime(topicQueryResend)
597         for n, qq := range s.queriesSent {
598                 for h, q := range qq {
599                         if q.sent < exp {
600                                 delete(qq, h)
601                         }
602                 }
603                 if len(qq) == 0 {
604                         delete(s.queriesSent, n)
605                 }
606         }
607         s.nextTopicQueryCleanup = now + AbsTime(topicQueryTimeout)
608 }
609
610 func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
611         now := Now()
612         //fmt.Println("got", from.addr().String(), hash, len(nodes))
613         qq := s.queriesSent[from]
614         if qq == nil {
615                 return true
616         }
617         q, ok := qq[hash]
618         if !ok || now > q.sent+AbsTime(topicQueryTimeout) {
619                 return true
620         }
621         inside := float64(0)
622         if len(nodes) > 0 {
623                 inside = 1
624         }
625         s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)
626         chn := s.searchTopicMap[q.lookup.topic].foundChn
627         if chn == nil {
628                 //fmt.Println("no channel")
629                 return false
630         }
631         for _, node := range nodes {
632                 ip := node.IP
633                 if ip.IsUnspecified() || ip.IsLoopback() {
634                         ip = from.IP
635                 }
636                 n := NewNode(node.ID, ip, node.UDP, node.TCP)
637                 select {
638                 case chn <- n:
639                 default:
640                         return false
641                 }
642         }
643         return false
644 }
645
646 type topicRadius struct {
647         topic             Topic
648         topicHashPrefix   uint64
649         radius, minRadius uint64
650         buckets           []topicRadiusBucket
651         converged         bool
652         radiusLookupCnt   int
653 }
654
655 type topicRadiusEvent int
656
657 const (
658         trOutside topicRadiusEvent = iota
659         trInside
660         trNoAdjust
661         trCount
662 )
663
664 type topicRadiusBucket struct {
665         weights    [trCount]float64
666         lastTime   AbsTime
667         value      float64
668         lookupSent map[common.Hash]AbsTime
669 }
670
671 func (b *topicRadiusBucket) update(now AbsTime) {
672         if now == b.lastTime {
673                 return
674         }
675         exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
676         for i, w := range b.weights {
677                 b.weights[i] = w * exp
678         }
679         b.lastTime = now
680
681         for target, tm := range b.lookupSent {
682                 if now-tm > AbsTime(respTimeout) {
683                         b.weights[trNoAdjust] += 1
684                         delete(b.lookupSent, target)
685                 }
686         }
687 }
688
689 func (b *topicRadiusBucket) adjust(now AbsTime, inside float64) {
690         b.update(now)
691         if inside <= 0 {
692                 b.weights[trOutside] += 1
693         } else {
694                 if inside >= 1 {
695                         b.weights[trInside] += 1
696                 } else {
697                         b.weights[trInside] += inside
698                         b.weights[trOutside] += 1 - inside
699                 }
700         }
701 }
702
703 func newTopicRadius(t Topic) *topicRadius {
704         topicHash := crypto.Sha256Hash([]byte(t))
705         topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8])
706
707         return &topicRadius{
708                 topic:           t,
709                 topicHashPrefix: topicHashPrefix,
710                 radius:          maxRadius,
711                 minRadius:       maxRadius,
712         }
713 }
714
715 func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
716         prefix := binary.BigEndian.Uint64(addrHash[0:8])
717         var log2 float64
718         if prefix != r.topicHashPrefix {
719                 log2 = math.Log2(float64(prefix ^ r.topicHashPrefix))
720         }
721         bucket := int((64 - log2) * radiusBucketsPerBit)
722         max := 64*radiusBucketsPerBit - 1
723         if bucket > max {
724                 return max
725         }
726         if bucket < 0 {
727                 return 0
728         }
729         return bucket
730 }
731
732 func (r *topicRadius) targetForBucket(bucket int) common.Hash {
733         min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
734         max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)
735         a := uint64(min)
736         b := randUint64n(uint64(max - min))
737         xor := a + b
738         if xor < a {
739                 xor = ^uint64(0)
740         }
741         prefix := r.topicHashPrefix ^ xor
742         var target common.Hash
743         binary.BigEndian.PutUint64(target[0:8], prefix)
744         globalRandRead(target[8:])
745         return target
746 }
747
748 // package rand provides a Read function in Go 1.6 and later, but
749 // we can't use it yet because we still support Go 1.5.
750 func globalRandRead(b []byte) {
751         pos := 0
752         val := 0
753         for n := 0; n < len(b); n++ {
754                 if pos == 0 {
755                         val = rand.Int()
756                         pos = 7
757                 }
758                 b[n] = byte(val)
759                 val >>= 8
760                 pos--
761         }
762 }
763
764 func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
765         nodePrefix := binary.BigEndian.Uint64(addrHash[0:8])
766         dist := nodePrefix ^ r.topicHashPrefix
767         return dist < r.radius
768 }
769
770 func (r *topicRadius) chooseLookupBucket(a, b int) int {
771         if a < 0 {
772                 a = 0
773         }
774         if a > b {
775                 return -1
776         }
777         c := 0
778         for i := a; i <= b; i++ {
779                 if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
780                         c++
781                 }
782         }
783         if c == 0 {
784                 return -1
785         }
786         rnd := randUint(uint32(c))
787         for i := a; i <= b; i++ {
788                 if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
789                         if rnd == 0 {
790                                 return i
791                         }
792                         rnd--
793                 }
794         }
795         panic(nil) // should never happen
796 }
797
798 func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
799         var max float64
800         if a < 0 {
801                 a = 0
802         }
803         if b >= len(r.buckets) {
804                 b = len(r.buckets) - 1
805                 if r.buckets[b].value > max {
806                         max = r.buckets[b].value
807                 }
808         }
809         if b >= a {
810                 for i := a; i <= b; i++ {
811                         if r.buckets[i].value > max {
812                                 max = r.buckets[i].value
813                         }
814                 }
815         }
816         return maxValue-max < minPeakSize
817 }
818
819 func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
820         maxBucket := 0
821         maxValue := float64(0)
822         now := Now()
823         v := float64(0)
824         for i := range r.buckets {
825                 r.buckets[i].update(now)
826                 v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
827                 r.buckets[i].value = v
828                 //fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
829         }
830         //fmt.Println()
831         slopeCross := -1
832         for i, b := range r.buckets {
833                 v := b.value
834                 if v < float64(i)*minSlope {
835                         slopeCross = i
836                         break
837                 }
838                 if v > maxValue {
839                         maxValue = v
840                         maxBucket = i + 1
841                 }
842         }
843
844         minRadBucket := len(r.buckets)
845         sum := float64(0)
846         for minRadBucket > 0 && sum < minRightSum {
847                 minRadBucket--
848                 b := r.buckets[minRadBucket]
849                 sum += b.weights[trInside] + b.weights[trOutside]
850         }
851         r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
852
853         lookupLeft := -1
854         if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
855                 lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
856         }
857         lookupRight := -1
858         if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
859                 for len(r.buckets) <= maxBucket+lookupWidth {
860                         r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]AbsTime)})
861                 }
862                 lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
863         }
864         if lookupLeft == -1 {
865                 radiusLookup = lookupRight
866         } else {
867                 if lookupRight == -1 {
868                         radiusLookup = lookupLeft
869                 } else {
870                         if randUint(2) == 0 {
871                                 radiusLookup = lookupLeft
872                         } else {
873                                 radiusLookup = lookupRight
874                         }
875                 }
876         }
877
878         //fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
879
880         if radiusLookup == -1 {
881                 // no more radius lookups needed at the moment, return a radius
882                 r.converged = true
883                 rad := maxBucket
884                 if minRadBucket < rad {
885                         rad = minRadBucket
886                 }
887                 radius = ^uint64(0)
888                 if rad > 0 {
889                         radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
890                 }
891                 r.radius = radius
892         }
893
894         return
895 }
896
897 func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
898         if !forceRegular {
899                 _, radiusLookup := r.recalcRadius()
900                 if radiusLookup != -1 {
901                         target := r.targetForBucket(radiusLookup)
902                         r.buckets[radiusLookup].lookupSent[target] = Now()
903                         return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
904                 }
905         }
906
907         radExt := r.radius / 2
908         if radExt > maxRadius-r.radius {
909                 radExt = maxRadius - r.radius
910         }
911         rnd := randUint64n(r.radius) + randUint64n(2*radExt)
912         if rnd > radExt {
913                 rnd -= radExt
914         } else {
915                 rnd = radExt - rnd
916         }
917
918         prefix := r.topicHashPrefix ^ rnd
919         var target common.Hash
920         binary.BigEndian.PutUint64(target[0:8], prefix)
921         globalRandRead(target[8:])
922         return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
923 }
924
925 func (r *topicRadius) adjustWithTicket(now AbsTime, targetHash common.Hash, t ticketRef) {
926         wait := t.t.regTime[t.idx] - t.t.issueTime
927         inside := float64(wait)/float64(targetWaitTime) - 0.5
928         if inside > 1 {
929                 inside = 1
930         }
931         if inside < 0 {
932                 inside = 0
933         }
934         r.adjust(now, targetHash, t.t.node.sha, inside)
935 }
936
937 func (r *topicRadius) adjust(now AbsTime, targetHash, addrHash common.Hash, inside float64) {
938         bucket := r.getBucketIdx(addrHash)
939         //fmt.Println("adjust", bucket, len(r.buckets), inside)
940         if bucket >= len(r.buckets) {
941                 return
942         }
943         r.buckets[bucket].adjust(now, inside)
944         delete(r.buckets[bucket].lookupSent, targetHash)
945 }