OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / p2p / discover / dht / ticket.go
diff --git a/p2p/discover/dht/ticket.go b/p2p/discover/dht/ticket.go
new file mode 100644 (file)
index 0000000..a3046cc
--- /dev/null
@@ -0,0 +1,945 @@
+package dht
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "math"
+       "math/rand"
+       "sort"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/common"
+       "github.com/vapor/crypto"
+)
+
+const (
+       ticketTimeBucketLen = time.Minute
+       timeWindow          = 10 // * ticketTimeBucketLen
+       wantTicketsInWindow = 10
+       collectFrequency    = time.Second * 30
+       registerFrequency   = time.Second * 60
+       maxCollectDebt      = 10
+       maxRegisterDebt     = 5
+       keepTicketConst     = time.Minute * 10
+       keepTicketExp       = time.Minute * 5
+       targetWaitTime      = time.Minute * 10
+       topicQueryTimeout   = time.Second * 5
+       topicQueryResend    = time.Minute
+       // topic radius detection
+       maxRadius           = 0xffffffffffffffff
+       radiusTC            = time.Minute * 20
+       radiusBucketsPerBit = 8
+       minSlope            = 1
+       minPeakSize         = 40
+       maxNoAdjust         = 20
+       lookupWidth         = 8
+       minRightSum         = 20
+       searchForceQuery    = 4
+)
+
+// timeBucket represents absolute monotonic time in minutes.
+// It is used as the index into the per-topic ticket buckets.
+type timeBucket int
+
+type ticket struct {
+       topics  []Topic
+       regTime []AbsTime // Per-topic local absolute time when the ticket can be used.
+
+       // The serial number that was issued by the server.
+       serial uint32
+       // Used by registrar, tracks absolute time when the ticket was created.
+       issueTime AbsTime
+
+       // Fields used only by registrants
+       node   *Node  // the registrar node that signed this ticket
+       refCnt int    // tracks number of topics that will be registered using this ticket
+       pong   []byte // encoded pong packet signed by the registrar
+}
+
+// ticketRef refers to a single topic in a ticket.
+type ticketRef struct {
+       t   *ticket
+       idx int // index of the topic in t.topics and t.regTime
+}
+
+func (ref ticketRef) topic() Topic {
+       return ref.t.topics[ref.idx]
+}
+
+func (ref ticketRef) topicRegTime() AbsTime {
+       return ref.t.regTime[ref.idx]
+}
+
+func pongToTicket(localTime AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
+       wps := p.data.(*pong).WaitPeriods
+       if len(topics) != len(wps) {
+               return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps))
+       }
+       hash, _, err := wireHash(topics)
+       if err != nil {
+               return nil, err
+       }
+       if hash != p.data.(*pong).TopicHash {
+               return nil, fmt.Errorf("bad topic hash")
+       }
+       t := &ticket{
+               issueTime: localTime,
+               node:      node,
+               topics:    topics,
+               pong:      p.rawData,
+               regTime:   make([]AbsTime, len(wps)),
+       }
+       // Convert wait periods to local absolute time.
+       for i, wp := range wps {
+               t.regTime[i] = localTime + AbsTime(time.Second*time.Duration(wp))
+       }
+       return t, nil
+}
+
+func ticketToPong(t *ticket, pong *pong) {
+       var err error
+       pong.Expiration = uint64(t.issueTime / AbsTime(time.Second))
+       pong.TopicHash, _, err = wireHash(t.topics)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("wireHash err")
+       }
+       pong.TicketSerial = t.serial
+       pong.WaitPeriods = make([]uint32, len(t.regTime))
+       for i, regTime := range t.regTime {
+               pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second)
+       }
+}
+
+type ticketStore struct {
+       // radius detector and target address generator
+       // exists for both searched and registered topics
+       radius map[Topic]*topicRadius
+
+       // Contains buckets (for each absolute minute) of tickets
+       // that can be used in that minute.
+       // This is only set if the topic is being registered.
+       tickets map[Topic]*topicTickets
+
+       regQueue []Topic            // Topic registration queue for round robin attempts
+       regSet   map[Topic]struct{} // Topic registration queue contents for fast filling
+
+       nodes       map[*Node]*ticket
+       nodeLastReq map[*Node]reqInfo
+
+       lastBucketFetched timeBucket
+       nextTicketCached  *ticketRef
+       nextTicketReg     AbsTime
+
+       searchTopicMap        map[Topic]searchTopic
+       nextTopicQueryCleanup AbsTime
+       queriesSent           map[*Node]map[common.Hash]sentQuery
+}
+
+type searchTopic struct {
+       foundChn chan<- *Node
+}
+
+type sentQuery struct {
+       sent   AbsTime
+       lookup lookupInfo
+}
+
+type topicTickets struct {
+       buckets    map[timeBucket][]ticketRef
+       nextLookup AbsTime
+       nextReg    AbsTime
+}
+
+func newTicketStore() *ticketStore {
+       return &ticketStore{
+               radius:         make(map[Topic]*topicRadius),
+               tickets:        make(map[Topic]*topicTickets),
+               regSet:         make(map[Topic]struct{}),
+               nodes:          make(map[*Node]*ticket),
+               nodeLastReq:    make(map[*Node]reqInfo),
+               searchTopicMap: make(map[Topic]searchTopic),
+               queriesSent:    make(map[*Node]map[common.Hash]sentQuery),
+       }
+}
+
+// addTopic starts tracking a topic. If register is true,
+// the local node will register the topic and tickets will be collected.
+func (s *ticketStore) addTopic(topic Topic, register bool) {
+       log.WithFields(log.Fields{"module": logModule, "topic": topic, "register": register}).Debug("Adding discovery topic")
+       if s.radius[topic] == nil {
+               s.radius[topic] = newTopicRadius(topic)
+       }
+       if register && s.tickets[topic] == nil {
+               s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)}
+       }
+}
+
+func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
+       s.addTopic(t, false)
+       if s.searchTopicMap[t].foundChn == nil {
+               s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
+       }
+}
+
+func (s *ticketStore) removeSearchTopic(t Topic) {
+       if st := s.searchTopicMap[t]; st.foundChn != nil {
+               delete(s.searchTopicMap, t)
+       }
+}
+
+// removeRegisterTopic deletes all tickets for the given topic.
+func (s *ticketStore) removeRegisterTopic(topic Topic) {
+       log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("Removing discovery topic")
+       if s.tickets[topic] == nil {
+               log.WithFields(log.Fields{"module": logModule, "topic": topic}).Warn("Removing non-existent discovery topic")
+               return
+       }
+       for _, list := range s.tickets[topic].buckets {
+               for _, ref := range list {
+                       ref.t.refCnt--
+                       if ref.t.refCnt == 0 {
+                               delete(s.nodes, ref.t.node)
+                               delete(s.nodeLastReq, ref.t.node)
+                       }
+               }
+       }
+       delete(s.tickets, topic)
+}
+
+func (s *ticketStore) regTopicSet() []Topic {
+       topics := make([]Topic, 0, len(s.tickets))
+       for topic := range s.tickets {
+               topics = append(topics, topic)
+       }
+       return topics
+}
+
+// nextRegisterLookup returns the target of the next lookup for ticket collection.
+func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
+       // Queue up any new topics (or discarded ones), preserving iteration order
+       for topic := range s.tickets {
+               if _, ok := s.regSet[topic]; !ok {
+                       s.regQueue = append(s.regQueue, topic)
+                       s.regSet[topic] = struct{}{}
+               }
+       }
+       // Iterate over the set of all topics and look up the next suitable one
+       for len(s.regQueue) > 0 {
+               // Fetch the next topic from the queue, and ensure it still exists
+               topic := s.regQueue[0]
+               s.regQueue = s.regQueue[1:]
+               delete(s.regSet, topic)
+
+               if s.tickets[topic] == nil {
+                       continue
+               }
+               // If the topic needs more tickets, return it
+               if s.tickets[topic].nextLookup < Now() {
+                       next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond
+                       log.WithFields(log.Fields{"module": logModule, "topic": topic, "target": next.target, "delay": delay}).Debug("Found discovery topic to register")
+                       return next, delay
+               }
+       }
+       // No registration topics found or all exhausted, sleep
+       delay := 40 * time.Second
+       log.WithFields(log.Fields{"module": logModule, "delay": delay}).Debug("No topic found to register")
+       return lookupInfo{}, delay
+}
+
+func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
+       tr := s.radius[topic]
+       target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
+       if target.radiusLookup {
+               tr.radiusLookupCnt++
+       } else {
+               tr.radiusLookupCnt = 0
+       }
+       return target
+}
+
+// ticketsInWindow returns the tickets of a given topic in the registration window.
+func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
+       // Sanity check that the topic still exists before operating on it
+       if s.tickets[topic] == nil {
+               log.WithFields(log.Fields{"module": logModule, "topic": topic}).Warn("Listing non-existing discovery tickets")
+               return nil
+       }
+       // Gather all the tickers in the next time window
+       var tickets []ticketRef
+
+       buckets := s.tickets[topic].buckets
+       for idx := timeBucket(0); idx < timeWindow; idx++ {
+               tickets = append(tickets, buckets[s.lastBucketFetched+idx]...)
+       }
+       log.WithFields(log.Fields{"module": logModule, "topic": topic, "from": s.lastBucketFetched, "tickets": len(tickets)}).Debug("Retrieved discovery registration tickets")
+       return tickets
+}
+
+func (s *ticketStore) removeExcessTickets(t Topic) {
+       tickets := s.ticketsInWindow(t)
+       if len(tickets) <= wantTicketsInWindow {
+               return
+       }
+       sort.Sort(ticketRefByWaitTime(tickets))
+       for _, r := range tickets[wantTicketsInWindow:] {
+               s.removeTicketRef(r)
+       }
+}
+
+type ticketRefByWaitTime []ticketRef
+
+// Len is the number of elements in the collection.
+func (s ticketRefByWaitTime) Len() int {
+       return len(s)
+}
+
+func (r ticketRef) waitTime() AbsTime {
+       return r.t.regTime[r.idx] - r.t.issueTime
+}
+
+// Less reports whether the element with
+// index i should sort before the element with index j.
+func (s ticketRefByWaitTime) Less(i, j int) bool {
+       return s[i].waitTime() < s[j].waitTime()
+}
+
+// Swap swaps the elements with indexes i and j.
+func (s ticketRefByWaitTime) Swap(i, j int) {
+       s[i], s[j] = s[j], s[i]
+}
+
+func (s *ticketStore) addTicketRef(r ticketRef) {
+       topic := r.t.topics[r.idx]
+       tickets := s.tickets[topic]
+       if tickets == nil {
+               log.WithFields(log.Fields{"module": logModule, "topic": topic}).Warn("Adding ticket to non-existent topic")
+               return
+       }
+       bucket := timeBucket(r.t.regTime[r.idx] / AbsTime(ticketTimeBucketLen))
+       tickets.buckets[bucket] = append(tickets.buckets[bucket], r)
+       r.t.refCnt++
+
+       min := Now() - AbsTime(collectFrequency)*maxCollectDebt
+       if tickets.nextLookup < min {
+               tickets.nextLookup = min
+       }
+       tickets.nextLookup += AbsTime(collectFrequency)
+
+       //s.removeExcessTickets(topic)
+}
+
+func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
+       now := Now()
+       for {
+               ticket, wait := s.nextRegisterableTicket()
+               if ticket == nil {
+                       return ticket, wait
+               }
+
+               regTime := now + AbsTime(wait)
+               topic := ticket.t.topics[ticket.idx]
+               if s.tickets[topic] != nil && regTime >= s.tickets[topic].nextReg {
+                       return ticket, wait
+               }
+               s.removeTicketRef(*ticket)
+       }
+}
+
+func (s *ticketStore) ticketRegistered(ref ticketRef) {
+       now := Now()
+
+       topic := ref.t.topics[ref.idx]
+       tickets := s.tickets[topic]
+       min := now - AbsTime(registerFrequency)*maxRegisterDebt
+       if min > tickets.nextReg {
+               tickets.nextReg = min
+       }
+       tickets.nextReg += AbsTime(registerFrequency)
+       s.tickets[topic] = tickets
+
+       s.removeTicketRef(ref)
+}
+
+// nextRegisterableTicket returns the next ticket that can be used
+// to register.
+//
+// If the returned wait time <= zero the ticket can be used. For a positive
+// wait time, the caller should requery the next ticket later.
+//
+// A ticket can be returned more than once with <= zero wait time in case
+// the ticket contains multiple topics.
+func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
+       now := Now()
+       if s.nextTicketCached != nil {
+               return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
+       }
+
+       for bucket := s.lastBucketFetched; ; bucket++ {
+               var (
+                       empty      = true    // true if there are no tickets
+                       nextTicket ticketRef // uninitialized if this bucket is empty
+               )
+               for _, tickets := range s.tickets {
+                       //s.removeExcessTickets(topic)
+                       if len(tickets.buckets) != 0 {
+                               empty = false
+
+                               list := tickets.buckets[bucket]
+                               for _, ref := range list {
+                                       //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
+                                       if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
+                                               nextTicket = ref
+                                       }
+                               }
+                       }
+               }
+               if empty {
+                       return nil, 0
+               }
+               if nextTicket.t != nil {
+                       s.nextTicketCached = &nextTicket
+                       return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
+               }
+               s.lastBucketFetched = bucket
+       }
+}
+
+// removeTicket removes a ticket from the ticket store
+func (s *ticketStore) removeTicketRef(ref ticketRef) {
+       log.WithFields(log.Fields{"module": logModule, "node": ref.t.node.ID, "serial": ref.t.serial}).Debug("Removing discovery ticket reference")
+
+       // Make nextRegisterableTicket return the next available ticket.
+       s.nextTicketCached = nil
+
+       topic := ref.topic()
+       tickets := s.tickets[topic]
+
+       if tickets == nil {
+               log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("Removing tickets from unknown topic")
+               return
+       }
+       bucket := timeBucket(ref.t.regTime[ref.idx] / AbsTime(ticketTimeBucketLen))
+       list := tickets.buckets[bucket]
+       idx := -1
+       for i, bt := range list {
+               if bt.t == ref.t {
+                       idx = i
+                       break
+               }
+       }
+       if idx == -1 {
+               panic(nil)
+       }
+       list = append(list[:idx], list[idx+1:]...)
+       if len(list) != 0 {
+               tickets.buckets[bucket] = list
+       } else {
+               delete(tickets.buckets, bucket)
+       }
+       ref.t.refCnt--
+       if ref.t.refCnt == 0 {
+               delete(s.nodes, ref.t.node)
+               delete(s.nodeLastReq, ref.t.node)
+       }
+}
+
+type lookupInfo struct {
+       target       common.Hash
+       topic        Topic
+       radiusLookup bool
+}
+
+type reqInfo struct {
+       pingHash []byte
+       lookup   lookupInfo
+       time     AbsTime
+}
+
+// returns -1 if not found
+func (t *ticket) findIdx(topic Topic) int {
+       for i, tt := range t.topics {
+               if tt == topic {
+                       return i
+               }
+       }
+       return -1
+}
+
+func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
+       now := Now()
+       for i, n := range nodes {
+               if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
+                       if lookup.radiusLookup {
+                               if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
+                                       s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
+                               }
+                       } else {
+                               if s.nodes[n] == nil {
+                                       s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
+                               }
+                       }
+               }
+       }
+}
+
+func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) {
+       now := Now()
+       for i, n := range nodes {
+               if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
+                       if lookup.radiusLookup {
+                               if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
+                                       s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now}
+                               }
+                       } // else {
+                       if s.canQueryTopic(n, lookup.topic) {
+                               hash := query(n, lookup.topic)
+                               if hash != nil {
+                                       s.addTopicQuery(common.BytesToHash(hash), n, lookup)
+                               }
+                       }
+                       //}
+               }
+       }
+}
+
+func (s *ticketStore) adjustWithTicket(now AbsTime, targetHash common.Hash, t *ticket) {
+       for i, topic := range t.topics {
+               if tt, ok := s.radius[topic]; ok {
+                       tt.adjustWithTicket(now, targetHash, ticketRef{t, i})
+               }
+       }
+}
+
+func (s *ticketStore) addTicket(localTime AbsTime, pingHash []byte, ticket *ticket) {
+       log.WithFields(log.Fields{"module": logModule, "node": ticket.node.ID, "serial": ticket.serial}).Debug("Adding discovery ticket")
+
+       lastReq, ok := s.nodeLastReq[ticket.node]
+       if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
+               return
+       }
+       s.adjustWithTicket(localTime, lastReq.lookup.target, ticket)
+
+       if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil {
+               return
+       }
+
+       topic := lastReq.lookup.topic
+       topicIdx := ticket.findIdx(topic)
+       if topicIdx == -1 {
+               return
+       }
+
+       bucket := timeBucket(localTime / AbsTime(ticketTimeBucketLen))
+       if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched {
+               s.lastBucketFetched = bucket
+       }
+
+       if _, ok := s.tickets[topic]; ok {
+               wait := ticket.regTime[topicIdx] - localTime
+               rnd := rand.New(rand.NewSource(time.Now().UnixNano())).ExpFloat64()
+               if rnd > 10 {
+                       rnd = 10
+               }
+               if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
+                       // use the ticket to register this topic
+                       //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong)
+                       s.addTicketRef(ticketRef{ticket, topicIdx})
+               }
+       }
+
+       if ticket.refCnt > 0 {
+               s.nextTicketCached = nil
+               s.nodes[ticket.node] = ticket
+       }
+}
+
+func (s *ticketStore) getNodeTicket(node *Node) *ticket {
+       if s.nodes[node] == nil {
+               log.WithFields(log.Fields{"module": logModule, "node": node.ID, "serial": nil}).Debug("Retrieving node ticket")
+       } else {
+               log.WithFields(log.Fields{"module": logModule, "node": node.ID, "serial": s.nodes[node].serial}).Debug("Retrieving node ticket")
+       }
+       return s.nodes[node]
+}
+
+func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
+       qq := s.queriesSent[node]
+       if qq != nil {
+               now := Now()
+               for _, sq := range qq {
+                       if sq.lookup.topic == topic && sq.sent > now-AbsTime(topicQueryResend) {
+                               return false
+                       }
+               }
+       }
+       return true
+}
+
+func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
+       now := Now()
+       qq := s.queriesSent[node]
+       if qq == nil {
+               qq = make(map[common.Hash]sentQuery)
+               s.queriesSent[node] = qq
+       }
+       qq[hash] = sentQuery{sent: now, lookup: lookup}
+       s.cleanupTopicQueries(now)
+}
+
+func (s *ticketStore) cleanupTopicQueries(now AbsTime) {
+       if s.nextTopicQueryCleanup > now {
+               return
+       }
+       exp := now - AbsTime(topicQueryResend)
+       for n, qq := range s.queriesSent {
+               for h, q := range qq {
+                       if q.sent < exp {
+                               delete(qq, h)
+                       }
+               }
+               if len(qq) == 0 {
+                       delete(s.queriesSent, n)
+               }
+       }
+       s.nextTopicQueryCleanup = now + AbsTime(topicQueryTimeout)
+}
+
+func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
+       now := Now()
+       //fmt.Println("got", from.addr().String(), hash, len(nodes))
+       qq := s.queriesSent[from]
+       if qq == nil {
+               return true
+       }
+       q, ok := qq[hash]
+       if !ok || now > q.sent+AbsTime(topicQueryTimeout) {
+               return true
+       }
+       inside := float64(0)
+       if len(nodes) > 0 {
+               inside = 1
+       }
+       s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)
+       chn := s.searchTopicMap[q.lookup.topic].foundChn
+       if chn == nil {
+               //fmt.Println("no channel")
+               return false
+       }
+       for _, node := range nodes {
+               ip := node.IP
+               if ip.IsUnspecified() || ip.IsLoopback() {
+                       ip = from.IP
+               }
+               n := NewNode(node.ID, ip, node.UDP, node.TCP)
+               select {
+               case chn <- n:
+               default:
+                       return false
+               }
+       }
+       return false
+}
+
+type topicRadius struct {
+       topic             Topic
+       topicHashPrefix   uint64
+       radius, minRadius uint64
+       buckets           []topicRadiusBucket
+       converged         bool
+       radiusLookupCnt   int
+}
+
+type topicRadiusEvent int
+
+const (
+       trOutside topicRadiusEvent = iota
+       trInside
+       trNoAdjust
+       trCount
+)
+
+type topicRadiusBucket struct {
+       weights    [trCount]float64
+       lastTime   AbsTime
+       value      float64
+       lookupSent map[common.Hash]AbsTime
+}
+
+func (b *topicRadiusBucket) update(now AbsTime) {
+       if now == b.lastTime {
+               return
+       }
+       exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
+       for i, w := range b.weights {
+               b.weights[i] = w * exp
+       }
+       b.lastTime = now
+
+       for target, tm := range b.lookupSent {
+               if now-tm > AbsTime(respTimeout) {
+                       b.weights[trNoAdjust] += 1
+                       delete(b.lookupSent, target)
+               }
+       }
+}
+
+func (b *topicRadiusBucket) adjust(now AbsTime, inside float64) {
+       b.update(now)
+       if inside <= 0 {
+               b.weights[trOutside] += 1
+       } else {
+               if inside >= 1 {
+                       b.weights[trInside] += 1
+               } else {
+                       b.weights[trInside] += inside
+                       b.weights[trOutside] += 1 - inside
+               }
+       }
+}
+
+func newTopicRadius(t Topic) *topicRadius {
+       topicHash := crypto.Sha256Hash([]byte(t))
+       topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8])
+
+       return &topicRadius{
+               topic:           t,
+               topicHashPrefix: topicHashPrefix,
+               radius:          maxRadius,
+               minRadius:       maxRadius,
+       }
+}
+
+func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
+       prefix := binary.BigEndian.Uint64(addrHash[0:8])
+       var log2 float64
+       if prefix != r.topicHashPrefix {
+               log2 = math.Log2(float64(prefix ^ r.topicHashPrefix))
+       }
+       bucket := int((64 - log2) * radiusBucketsPerBit)
+       max := 64*radiusBucketsPerBit - 1
+       if bucket > max {
+               return max
+       }
+       if bucket < 0 {
+               return 0
+       }
+       return bucket
+}
+
+func (r *topicRadius) targetForBucket(bucket int) common.Hash {
+       min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
+       max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)
+       a := uint64(min)
+       b := randUint64n(uint64(max - min))
+       xor := a + b
+       if xor < a {
+               xor = ^uint64(0)
+       }
+       prefix := r.topicHashPrefix ^ xor
+       var target common.Hash
+       binary.BigEndian.PutUint64(target[0:8], prefix)
+       globalRandRead(target[8:])
+       return target
+}
+
+// package rand provides a Read function in Go 1.6 and later, but
+// we can't use it yet because we still support Go 1.5.
+func globalRandRead(b []byte) {
+       pos := 0
+       val := 0
+       for n := 0; n < len(b); n++ {
+               if pos == 0 {
+                       val = rand.New(rand.NewSource(time.Now().UnixNano())).Int()
+                       pos = 7
+               }
+               b[n] = byte(val)
+               val >>= 8
+               pos--
+       }
+}
+
+func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
+       nodePrefix := binary.BigEndian.Uint64(addrHash[0:8])
+       dist := nodePrefix ^ r.topicHashPrefix
+       return dist < r.radius
+}
+
+func (r *topicRadius) chooseLookupBucket(a, b int) int {
+       if a < 0 {
+               a = 0
+       }
+       if a > b {
+               return -1
+       }
+       c := 0
+       for i := a; i <= b; i++ {
+               if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
+                       c++
+               }
+       }
+       if c == 0 {
+               return -1
+       }
+       rnd := randUint(uint32(c))
+       for i := a; i <= b; i++ {
+               if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
+                       if rnd == 0 {
+                               return i
+                       }
+                       rnd--
+               }
+       }
+       panic(nil) // should never happen
+}
+
+func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
+       var max float64
+       if a < 0 {
+               a = 0
+       }
+       if b >= len(r.buckets) {
+               b = len(r.buckets) - 1
+               if r.buckets[b].value > max {
+                       max = r.buckets[b].value
+               }
+       }
+       if b >= a {
+               for i := a; i <= b; i++ {
+                       if r.buckets[i].value > max {
+                               max = r.buckets[i].value
+                       }
+               }
+       }
+       return maxValue-max < minPeakSize
+}
+
+func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
+       maxBucket := 0
+       maxValue := float64(0)
+       now := Now()
+       v := float64(0)
+       for i := range r.buckets {
+               r.buckets[i].update(now)
+               v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
+               r.buckets[i].value = v
+               //fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
+       }
+       //fmt.Println()
+       slopeCross := -1
+       for i, b := range r.buckets {
+               v := b.value
+               if v < float64(i)*minSlope {
+                       slopeCross = i
+                       break
+               }
+               if v > maxValue {
+                       maxValue = v
+                       maxBucket = i + 1
+               }
+       }
+
+       minRadBucket := len(r.buckets)
+       sum := float64(0)
+       for minRadBucket > 0 && sum < minRightSum {
+               minRadBucket--
+               b := r.buckets[minRadBucket]
+               sum += b.weights[trInside] + b.weights[trOutside]
+       }
+       r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
+
+       lookupLeft := -1
+       if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
+               lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
+       }
+       lookupRight := -1
+       if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
+               for len(r.buckets) <= maxBucket+lookupWidth {
+                       r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]AbsTime)})
+               }
+               lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
+       }
+       if lookupLeft == -1 {
+               radiusLookup = lookupRight
+       } else {
+               if lookupRight == -1 {
+                       radiusLookup = lookupLeft
+               } else {
+                       if randUint(2) == 0 {
+                               radiusLookup = lookupLeft
+                       } else {
+                               radiusLookup = lookupRight
+                       }
+               }
+       }
+
+       //fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
+
+       if radiusLookup == -1 {
+               // no more radius lookups needed at the moment, return a radius
+               r.converged = true
+               rad := maxBucket
+               if minRadBucket < rad {
+                       rad = minRadBucket
+               }
+               radius = ^uint64(0)
+               if rad > 0 {
+                       radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
+               }
+               r.radius = radius
+       }
+
+       return
+}
+
+func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
+       if !forceRegular {
+               _, radiusLookup := r.recalcRadius()
+               if radiusLookup != -1 {
+                       target := r.targetForBucket(radiusLookup)
+                       r.buckets[radiusLookup].lookupSent[target] = Now()
+                       return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
+               }
+       }
+
+       radExt := r.radius / 2
+       if radExt > maxRadius-r.radius {
+               radExt = maxRadius - r.radius
+       }
+       rnd := randUint64n(r.radius) + randUint64n(2*radExt)
+       if rnd > radExt {
+               rnd -= radExt
+       } else {
+               rnd = radExt - rnd
+       }
+
+       prefix := r.topicHashPrefix ^ rnd
+       var target common.Hash
+       binary.BigEndian.PutUint64(target[0:8], prefix)
+       globalRandRead(target[8:])
+       return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
+}
+
+func (r *topicRadius) adjustWithTicket(now AbsTime, targetHash common.Hash, t ticketRef) {
+       wait := t.t.regTime[t.idx] - t.t.issueTime
+       inside := float64(wait)/float64(targetWaitTime) - 0.5
+       if inside > 1 {
+               inside = 1
+       }
+       if inside < 0 {
+               inside = 0
+       }
+       r.adjust(now, targetHash, t.t.node.sha, inside)
+}
+
+func (r *topicRadius) adjust(now AbsTime, targetHash, addrHash common.Hash, inside float64) {
+       bucket := r.getBucketIdx(addrHash)
+       //fmt.Println("adjust", bucket, len(r.buckets), inside)
+       if bucket >= len(r.buckets) {
+               return
+       }
+       r.buckets[bucket].adjust(now, inside)
+       delete(r.buckets[bucket].lookupSent, targetHash)
+}