OSDN Git Service

Merge pull request #41 from Bytom/dev
[bytom/vapor.git] / p2p / discover / topic.go
1 package discover
2
3 import (
4         "container/heap"
5         "fmt"
6         "math"
7         "math/rand"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11 )
12
13 const (
14         maxEntries         = 10000
15         maxEntriesPerTopic = 50
16
17         fallbackRegistrationExpiry = 1 * time.Hour
18 )
19
20 type Topic string
21
22 type AbsTime time.Duration // absolute monotonic time
23
24 func Now() AbsTime {
25         return AbsTime(uint64(time.Now().UnixNano()))
26 }
27
28 type topicEntry struct {
29         topic   Topic
30         fifoIdx uint64
31         node    *Node
32         expire  AbsTime
33 }
34
35 type topicInfo struct {
36         entries            map[uint64]*topicEntry
37         fifoHead, fifoTail uint64
38         rqItem             *topicRequestQueueItem
39         wcl                waitControlLoop
40 }
41
42 // removes tail element from the fifo
43 func (t *topicInfo) getFifoTail() *topicEntry {
44         for t.entries[t.fifoTail] == nil {
45                 t.fifoTail++
46         }
47         tail := t.entries[t.fifoTail]
48         t.fifoTail++
49         return tail
50 }
51
52 type nodeInfo struct {
53         entries                          map[Topic]*topicEntry
54         lastIssuedTicket, lastUsedTicket uint32
55         // you can't register a ticket newer than lastUsedTicket before noRegUntil (absolute time)
56         noRegUntil AbsTime
57 }
58
59 type topicTable struct {
60         db                    *nodeDB
61         self                  *Node
62         nodes                 map[*Node]*nodeInfo
63         topics                map[Topic]*topicInfo
64         globalEntries         uint64
65         requested             topicRequestQueue
66         requestCnt            uint64
67         lastGarbageCollection AbsTime
68 }
69
70 func newTopicTable(db *nodeDB, self *Node) *topicTable {
71         if printTestImgLogs {
72                 fmt.Printf("*N %016x\n", self.sha[:8])
73         }
74         return &topicTable{
75                 db:     db,
76                 nodes:  make(map[*Node]*nodeInfo),
77                 topics: make(map[Topic]*topicInfo),
78                 self:   self,
79         }
80 }
81
82 func (t *topicTable) getOrNewTopic(topic Topic) *topicInfo {
83         ti := t.topics[topic]
84         if ti == nil {
85                 rqItem := &topicRequestQueueItem{
86                         topic:    topic,
87                         priority: t.requestCnt,
88                 }
89                 ti = &topicInfo{
90                         entries: make(map[uint64]*topicEntry),
91                         rqItem:  rqItem,
92                 }
93                 t.topics[topic] = ti
94                 heap.Push(&t.requested, rqItem)
95         }
96         return ti
97 }
98
99 func (t *topicTable) checkDeleteTopic(topic Topic) {
100         ti := t.topics[topic]
101         if ti == nil {
102                 return
103         }
104         if len(ti.entries) == 0 && ti.wcl.hasMinimumWaitPeriod() {
105                 delete(t.topics, topic)
106                 heap.Remove(&t.requested, ti.rqItem.index)
107         }
108 }
109
110 func (t *topicTable) getOrNewNode(node *Node) *nodeInfo {
111         n := t.nodes[node]
112         if n == nil {
113                 //fmt.Printf("newNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
114                 var issued, used uint32
115                 if t.db != nil {
116                         issued, used = t.db.fetchTopicRegTickets(node.ID)
117                 }
118                 n = &nodeInfo{
119                         entries:          make(map[Topic]*topicEntry),
120                         lastIssuedTicket: issued,
121                         lastUsedTicket:   used,
122                 }
123                 t.nodes[node] = n
124         }
125         return n
126 }
127
128 func (t *topicTable) checkDeleteNode(node *Node) {
129         if n, ok := t.nodes[node]; ok && len(n.entries) == 0 && n.noRegUntil < Now() {
130                 //fmt.Printf("deleteNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
131                 delete(t.nodes, node)
132         }
133 }
134
135 func (t *topicTable) storeTicketCounters(node *Node) {
136         n := t.getOrNewNode(node)
137         if t.db != nil {
138                 t.db.updateTopicRegTickets(node.ID, n.lastIssuedTicket, n.lastUsedTicket)
139         }
140 }
141
142 func (t *topicTable) getEntries(topic Topic) []*Node {
143         t.collectGarbage()
144
145         te := t.topics[topic]
146         if te == nil {
147                 return nil
148         }
149         nodes := make([]*Node, len(te.entries))
150         i := 0
151         for _, e := range te.entries {
152                 nodes[i] = e.node
153                 i++
154         }
155         t.requestCnt++
156         t.requested.update(te.rqItem, t.requestCnt)
157         return nodes
158 }
159
160 func (t *topicTable) addEntry(node *Node, topic Topic) {
161         n := t.getOrNewNode(node)
162         // clear previous entries by the same node
163         for _, e := range n.entries {
164                 t.deleteEntry(e)
165         }
166         // ***
167         n = t.getOrNewNode(node)
168
169         tm := Now()
170         te := t.getOrNewTopic(topic)
171
172         if len(te.entries) == maxEntriesPerTopic {
173                 t.deleteEntry(te.getFifoTail())
174         }
175
176         if t.globalEntries == maxEntries {
177                 t.deleteEntry(t.leastRequested()) // not empty, no need to check for nil
178         }
179
180         fifoIdx := te.fifoHead
181         te.fifoHead++
182         entry := &topicEntry{
183                 topic:   topic,
184                 fifoIdx: fifoIdx,
185                 node:    node,
186                 expire:  tm + AbsTime(fallbackRegistrationExpiry),
187         }
188         if printTestImgLogs {
189                 fmt.Printf("*+ %d %v %016x %016x\n", tm/1000000, topic, t.self.sha[:8], node.sha[:8])
190         }
191         te.entries[fifoIdx] = entry
192         n.entries[topic] = entry
193         t.globalEntries++
194         te.wcl.registered(tm)
195 }
196
197 // removes least requested element from the fifo
198 func (t *topicTable) leastRequested() *topicEntry {
199         for t.requested.Len() > 0 && t.topics[t.requested[0].topic] == nil {
200                 heap.Pop(&t.requested)
201         }
202         if t.requested.Len() == 0 {
203                 return nil
204         }
205         return t.topics[t.requested[0].topic].getFifoTail()
206 }
207
208 // entry should exist
209 func (t *topicTable) deleteEntry(e *topicEntry) {
210         if printTestImgLogs {
211                 fmt.Printf("*- %d %v %016x %016x\n", Now()/1000000, e.topic, t.self.sha[:8], e.node.sha[:8])
212         }
213         ne := t.nodes[e.node].entries
214         delete(ne, e.topic)
215         if len(ne) == 0 {
216                 t.checkDeleteNode(e.node)
217         }
218         te := t.topics[e.topic]
219         delete(te.entries, e.fifoIdx)
220         if len(te.entries) == 0 {
221                 t.checkDeleteTopic(e.topic)
222         }
223         t.globalEntries--
224 }
225
226 // It is assumed that topics and waitPeriods have the same length.
227 func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
228         log.Debug("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods)
229         //fmt.Println("useTicket", serialNo, topics, waitPeriods)
230         t.collectGarbage()
231
232         n := t.getOrNewNode(node)
233         if serialNo < n.lastUsedTicket {
234                 return false
235         }
236
237         tm := Now()
238         if serialNo > n.lastUsedTicket && tm < n.noRegUntil {
239                 return false
240         }
241         if serialNo != n.lastUsedTicket {
242                 n.lastUsedTicket = serialNo
243                 n.noRegUntil = tm + AbsTime(noRegTimeout())
244                 t.storeTicketCounters(node)
245         }
246
247         currTime := uint64(tm / AbsTime(time.Second))
248         regTime := issueTime + uint64(waitPeriods[idx])
249         relTime := int64(currTime - regTime)
250         if relTime >= -1 && relTime <= regTimeWindow+1 { // give clients a little security margin on both ends
251                 if e := n.entries[topics[idx]]; e == nil {
252                         t.addEntry(node, topics[idx])
253                 } else {
254                         // if there is an active entry, don't move to the front of the FIFO but prolong expire time
255                         e.expire = tm + AbsTime(fallbackRegistrationExpiry)
256                 }
257                 return true
258         }
259
260         return false
261 }
262
263 func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket {
264         topictab.collectGarbage()
265
266         now := Now()
267         n := topictab.getOrNewNode(node)
268         n.lastIssuedTicket++
269         topictab.storeTicketCounters(node)
270
271         t := &ticket{
272                 issueTime: now,
273                 topics:    topics,
274                 serial:    n.lastIssuedTicket,
275                 regTime:   make([]AbsTime, len(topics)),
276         }
277         for i, topic := range topics {
278                 var waitPeriod time.Duration
279                 if topic := topictab.topics[topic]; topic != nil {
280                         waitPeriod = topic.wcl.waitPeriod
281                 } else {
282                         waitPeriod = minWaitPeriod
283                 }
284
285                 t.regTime[i] = now + AbsTime(waitPeriod)
286         }
287         return t
288 }
289
290 const gcInterval = time.Minute
291
292 func (t *topicTable) collectGarbage() {
293         tm := Now()
294         if time.Duration(tm-t.lastGarbageCollection) < gcInterval {
295                 return
296         }
297         t.lastGarbageCollection = tm
298
299         for node, n := range t.nodes {
300                 for _, e := range n.entries {
301                         if e.expire <= tm {
302                                 t.deleteEntry(e)
303                         }
304                 }
305
306                 t.checkDeleteNode(node)
307         }
308
309         for topic := range t.topics {
310                 t.checkDeleteTopic(topic)
311         }
312 }
313
314 const (
315         minWaitPeriod   = time.Minute
316         regTimeWindow   = 10 // seconds
317         avgnoRegTimeout = time.Minute * 10
318         // target average interval between two incoming ad requests
319         wcTargetRegInterval = time.Minute * 10 / maxEntriesPerTopic
320         //
321         wcTimeConst = time.Minute * 10
322 )
323
324 // initialization is not required, will set to minWaitPeriod at first registration
325 type waitControlLoop struct {
326         lastIncoming AbsTime
327         waitPeriod   time.Duration
328 }
329
330 func (w *waitControlLoop) registered(tm AbsTime) {
331         w.waitPeriod = w.nextWaitPeriod(tm)
332         w.lastIncoming = tm
333 }
334
335 func (w *waitControlLoop) nextWaitPeriod(tm AbsTime) time.Duration {
336         period := tm - w.lastIncoming
337         wp := time.Duration(float64(w.waitPeriod) * math.Exp((float64(wcTargetRegInterval)-float64(period))/float64(wcTimeConst)))
338         if wp < minWaitPeriod {
339                 wp = minWaitPeriod
340         }
341         return wp
342 }
343
344 func (w *waitControlLoop) hasMinimumWaitPeriod() bool {
345         return w.nextWaitPeriod(Now()) == minWaitPeriod
346 }
347
348 func noRegTimeout() time.Duration {
349         e := rand.ExpFloat64()
350         if e > 100 {
351                 e = 100
352         }
353         return time.Duration(float64(avgnoRegTimeout) * e)
354 }
355
356 type topicRequestQueueItem struct {
357         topic    Topic
358         priority uint64
359         index    int
360 }
361
362 // A topicRequestQueue implements heap.Interface and holds topicRequestQueueItems.
363 type topicRequestQueue []*topicRequestQueueItem
364
365 func (tq topicRequestQueue) Len() int { return len(tq) }
366
367 func (tq topicRequestQueue) Less(i, j int) bool {
368         return tq[i].priority < tq[j].priority
369 }
370
371 func (tq topicRequestQueue) Swap(i, j int) {
372         tq[i], tq[j] = tq[j], tq[i]
373         tq[i].index = i
374         tq[j].index = j
375 }
376
377 func (tq *topicRequestQueue) Push(x interface{}) {
378         n := len(*tq)
379         item := x.(*topicRequestQueueItem)
380         item.index = n
381         *tq = append(*tq, item)
382 }
383
384 func (tq *topicRequestQueue) Pop() interface{} {
385         old := *tq
386         n := len(old)
387         item := old[n-1]
388         item.index = -1
389         *tq = old[0 : n-1]
390         return item
391 }
392
393 func (tq *topicRequestQueue) update(item *topicRequestQueueItem, priority uint64) {
394         item.priority = priority
395         heap.Fix(tq, item.index)
396 }