10 log "github.com/sirupsen/logrus"
15 maxEntriesPerTopic = 50
17 fallbackRegistrationExpiry = 1 * time.Hour
22 type AbsTime time.Duration // absolute time in nanoseconds; the visible constructor builds it from time.Now().UnixNano(), i.e. wall-clock time since the Unix epoch rather than a monotonic clock reading
25 return AbsTime(uint64(time.Now().UnixNano()))
28 type topicEntry struct {
35 type topicInfo struct {
36 entries map[uint64]*topicEntry
37 fifoHead, fifoTail uint64
38 rqItem *topicRequestQueueItem
42 // removes tail element from the fifo
43 func (t *topicInfo) getFifoTail() *topicEntry {
44 for t.entries[t.fifoTail] == nil {
47 tail := t.entries[t.fifoTail]
52 type nodeInfo struct {
53 entries map[Topic]*topicEntry
54 lastIssuedTicket, lastUsedTicket uint32
55 // you can't register a ticket newer than lastUsedTicket before noRegUntil (absolute time)
59 type topicTable struct {
62 nodes map[*Node]*nodeInfo
63 topics map[Topic]*topicInfo
65 requested topicRequestQueue
67 lastGarbageCollection AbsTime
70 func newTopicTable(db *nodeDB, self *Node) *topicTable {
72 fmt.Printf("*N %016x\n", self.sha[:8])
76 nodes: make(map[*Node]*nodeInfo),
77 topics: make(map[Topic]*topicInfo),
82 func (t *topicTable) getOrNewTopic(topic Topic) *topicInfo {
85 rqItem := &topicRequestQueueItem{
87 priority: t.requestCnt,
90 entries: make(map[uint64]*topicEntry),
94 heap.Push(&t.requested, rqItem)
99 func (t *topicTable) checkDeleteTopic(topic Topic) {
100 ti := t.topics[topic]
104 if len(ti.entries) == 0 && ti.wcl.hasMinimumWaitPeriod() {
105 delete(t.topics, topic)
106 heap.Remove(&t.requested, ti.rqItem.index)
110 func (t *topicTable) getOrNewNode(node *Node) *nodeInfo {
113 //fmt.Printf("newNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
114 var issued, used uint32
116 issued, used = t.db.fetchTopicRegTickets(node.ID)
119 entries: make(map[Topic]*topicEntry),
120 lastIssuedTicket: issued,
121 lastUsedTicket: used,
128 func (t *topicTable) checkDeleteNode(node *Node) {
129 if n, ok := t.nodes[node]; ok && len(n.entries) == 0 && n.noRegUntil < Now() {
130 //fmt.Printf("deleteNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
131 delete(t.nodes, node)
135 func (t *topicTable) storeTicketCounters(node *Node) {
136 n := t.getOrNewNode(node)
138 t.db.updateTopicRegTickets(node.ID, n.lastIssuedTicket, n.lastUsedTicket)
142 func (t *topicTable) getEntries(topic Topic) []*Node {
145 te := t.topics[topic]
149 nodes := make([]*Node, len(te.entries))
151 for _, e := range te.entries {
156 t.requested.update(te.rqItem, t.requestCnt)
160 func (t *topicTable) addEntry(node *Node, topic Topic) {
161 n := t.getOrNewNode(node)
162 // clear previous entries by the same node
163 for _, e := range n.entries {
167 n = t.getOrNewNode(node)
170 te := t.getOrNewTopic(topic)
172 if len(te.entries) == maxEntriesPerTopic {
173 t.deleteEntry(te.getFifoTail())
176 if t.globalEntries == maxEntries {
177 t.deleteEntry(t.leastRequested()) // not empty, no need to check for nil
180 fifoIdx := te.fifoHead
182 entry := &topicEntry{
186 expire: tm + AbsTime(fallbackRegistrationExpiry),
188 if printTestImgLogs {
189 fmt.Printf("*+ %d %v %016x %016x\n", tm/1000000, topic, t.self.sha[:8], node.sha[:8])
191 te.entries[fifoIdx] = entry
192 n.entries[topic] = entry
194 te.wcl.registered(tm)
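197 // leastRequested returns the fifo tail entry of the topic at the head of the request queue (lowest priority), after popping queue items whose topic is no longer in t.topics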
198 func (t *topicTable) leastRequested() *topicEntry {
199 for t.requested.Len() > 0 && t.topics[t.requested[0].topic] == nil {
200 heap.Pop(&t.requested)
202 if t.requested.Len() == 0 {
205 return t.topics[t.requested[0].topic].getFifoTail()
208 // e must be a currently registered entry: t.nodes[e.node] and t.topics[e.topic] are dereferenced without nil checks
209 func (t *topicTable) deleteEntry(e *topicEntry) {
210 if printTestImgLogs {
211 fmt.Printf("*- %d %v %016x %016x\n", Now()/1000000, e.topic, t.self.sha[:8], e.node.sha[:8])
213 ne := t.nodes[e.node].entries
216 t.checkDeleteNode(e.node)
218 te := t.topics[e.topic]
219 delete(te.entries, e.fifoIdx)
220 if len(te.entries) == 0 {
221 t.checkDeleteTopic(e.topic)
226 // It is assumed that topics and waitPeriods have the same length and that idx is a valid index into both.
227 func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
228 log.Debug("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods)
229 //fmt.Println("useTicket", serialNo, topics, waitPeriods)
232 n := t.getOrNewNode(node)
233 if serialNo < n.lastUsedTicket {
238 if serialNo > n.lastUsedTicket && tm < n.noRegUntil {
241 if serialNo != n.lastUsedTicket {
242 n.lastUsedTicket = serialNo
243 n.noRegUntil = tm + AbsTime(noRegTimeout())
244 t.storeTicketCounters(node)
247 currTime := uint64(tm / AbsTime(time.Second))
248 regTime := issueTime + uint64(waitPeriods[idx])
249 relTime := int64(currTime - regTime)
250 if relTime >= -1 && relTime <= regTimeWindow+1 { // give clients a little security margin on both ends
251 if e := n.entries[topics[idx]]; e == nil {
252 t.addEntry(node, topics[idx])
254 // if there is an active entry, don't move to the front of the FIFO but prolong expire time
255 e.expire = tm + AbsTime(fallbackRegistrationExpiry)
263 func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket {
264 topictab.collectGarbage()
267 n := topictab.getOrNewNode(node)
269 topictab.storeTicketCounters(node)
274 serial: n.lastIssuedTicket,
275 regTime: make([]AbsTime, len(topics)),
277 for i, topic := range topics {
278 var waitPeriod time.Duration
279 if topic := topictab.topics[topic]; topic != nil {
280 waitPeriod = topic.wcl.waitPeriod
282 waitPeriod = minWaitPeriod
285 t.regTime[i] = now + AbsTime(waitPeriod)
290 const gcInterval = time.Minute // collectGarbage compares the time elapsed since lastGarbageCollection against this interval
292 func (t *topicTable) collectGarbage() {
294 if time.Duration(tm-t.lastGarbageCollection) < gcInterval {
297 t.lastGarbageCollection = tm
299 for node, n := range t.nodes {
300 for _, e := range n.entries {
306 t.checkDeleteNode(node)
309 for topic := range t.topics {
310 t.checkDeleteTopic(topic)
315 minWaitPeriod = time.Minute
316 regTimeWindow = 10 // seconds
317 avgnoRegTimeout = time.Minute * 10
318 // target average interval between two incoming ad requests
319 wcTargetRegInterval = time.Minute * 10 / maxEntriesPerTopic
321 wcTimeConst = time.Minute * 10
324 // initialization is not required, will set to minWaitPeriod at first registration
325 type waitControlLoop struct {
327 waitPeriod time.Duration
330 func (w *waitControlLoop) registered(tm AbsTime) {
331 w.waitPeriod = w.nextWaitPeriod(tm)
335 func (w *waitControlLoop) nextWaitPeriod(tm AbsTime) time.Duration {
336 period := tm - w.lastIncoming
337 wp := time.Duration(float64(w.waitPeriod) * math.Exp((float64(wcTargetRegInterval)-float64(period))/float64(wcTimeConst)))
338 if wp < minWaitPeriod {
344 func (w *waitControlLoop) hasMinimumWaitPeriod() bool {
345 return w.nextWaitPeriod(Now()) == minWaitPeriod
348 func noRegTimeout() time.Duration {
349 e := rand.ExpFloat64()
353 return time.Duration(float64(avgnoRegTimeout) * e)
356 type topicRequestQueueItem struct {
362 // A topicRequestQueue implements heap.Interface and holds topicRequestQueueItems; Less orders them by ascending priority, so index 0 holds the item with the smallest priority.
363 type topicRequestQueue []*topicRequestQueueItem
365 func (tq topicRequestQueue) Len() int { return len(tq) } // number of queued items; one of the heap.Interface methods
367 func (tq topicRequestQueue) Less(i, j int) bool {
368 return tq[i].priority < tq[j].priority
371 func (tq topicRequestQueue) Swap(i, j int) {
372 tq[i], tq[j] = tq[j], tq[i]
377 func (tq *topicRequestQueue) Push(x interface{}) {
379 item := x.(*topicRequestQueueItem)
381 *tq = append(*tq, item)
384 func (tq *topicRequestQueue) Pop() interface{} {
393 func (tq *topicRequestQueue) update(item *topicRequestQueueItem, priority uint64) {
394 item.priority = priority
395 heap.Fix(tq, item.index)