OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / p2p / discover / topic.go
diff --git a/p2p/discover/topic.go b/p2p/discover/topic.go
deleted file mode 100644 (file)
index 3a083c3..0000000
+++ /dev/null
@@ -1,396 +0,0 @@
-package discover
-
-import (
-       "container/heap"
-       "fmt"
-       "math"
-       "math/rand"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-)
-
-const (
-       maxEntries         = 10000
-       maxEntriesPerTopic = 50
-
-       fallbackRegistrationExpiry = 1 * time.Hour
-)
-
-type Topic string
-
-type AbsTime time.Duration // absolute monotonic time
-
-func Now() AbsTime {
-       return AbsTime(uint64(time.Now().UnixNano()))
-}
-
-type topicEntry struct {
-       topic   Topic
-       fifoIdx uint64
-       node    *Node
-       expire  AbsTime
-}
-
-type topicInfo struct {
-       entries            map[uint64]*topicEntry
-       fifoHead, fifoTail uint64
-       rqItem             *topicRequestQueueItem
-       wcl                waitControlLoop
-}
-
-// removes tail element from the fifo
-func (t *topicInfo) getFifoTail() *topicEntry {
-       for t.entries[t.fifoTail] == nil {
-               t.fifoTail++
-       }
-       tail := t.entries[t.fifoTail]
-       t.fifoTail++
-       return tail
-}
-
-type nodeInfo struct {
-       entries                          map[Topic]*topicEntry
-       lastIssuedTicket, lastUsedTicket uint32
-       // you can't register a ticket newer than lastUsedTicket before noRegUntil (absolute time)
-       noRegUntil AbsTime
-}
-
-type topicTable struct {
-       db                    *nodeDB
-       self                  *Node
-       nodes                 map[*Node]*nodeInfo
-       topics                map[Topic]*topicInfo
-       globalEntries         uint64
-       requested             topicRequestQueue
-       requestCnt            uint64
-       lastGarbageCollection AbsTime
-}
-
-func newTopicTable(db *nodeDB, self *Node) *topicTable {
-       if printTestImgLogs {
-               fmt.Printf("*N %016x\n", self.sha[:8])
-       }
-       return &topicTable{
-               db:     db,
-               nodes:  make(map[*Node]*nodeInfo),
-               topics: make(map[Topic]*topicInfo),
-               self:   self,
-       }
-}
-
-func (t *topicTable) getOrNewTopic(topic Topic) *topicInfo {
-       ti := t.topics[topic]
-       if ti == nil {
-               rqItem := &topicRequestQueueItem{
-                       topic:    topic,
-                       priority: t.requestCnt,
-               }
-               ti = &topicInfo{
-                       entries: make(map[uint64]*topicEntry),
-                       rqItem:  rqItem,
-               }
-               t.topics[topic] = ti
-               heap.Push(&t.requested, rqItem)
-       }
-       return ti
-}
-
-func (t *topicTable) checkDeleteTopic(topic Topic) {
-       ti := t.topics[topic]
-       if ti == nil {
-               return
-       }
-       if len(ti.entries) == 0 && ti.wcl.hasMinimumWaitPeriod() {
-               delete(t.topics, topic)
-               heap.Remove(&t.requested, ti.rqItem.index)
-       }
-}
-
-func (t *topicTable) getOrNewNode(node *Node) *nodeInfo {
-       n := t.nodes[node]
-       if n == nil {
-               //fmt.Printf("newNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
-               var issued, used uint32
-               if t.db != nil {
-                       issued, used = t.db.fetchTopicRegTickets(node.ID)
-               }
-               n = &nodeInfo{
-                       entries:          make(map[Topic]*topicEntry),
-                       lastIssuedTicket: issued,
-                       lastUsedTicket:   used,
-               }
-               t.nodes[node] = n
-       }
-       return n
-}
-
-func (t *topicTable) checkDeleteNode(node *Node) {
-       if n, ok := t.nodes[node]; ok && len(n.entries) == 0 && n.noRegUntil < Now() {
-               //fmt.Printf("deleteNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
-               delete(t.nodes, node)
-       }
-}
-
-func (t *topicTable) storeTicketCounters(node *Node) {
-       n := t.getOrNewNode(node)
-       if t.db != nil {
-               t.db.updateTopicRegTickets(node.ID, n.lastIssuedTicket, n.lastUsedTicket)
-       }
-}
-
-func (t *topicTable) getEntries(topic Topic) []*Node {
-       t.collectGarbage()
-
-       te := t.topics[topic]
-       if te == nil {
-               return nil
-       }
-       nodes := make([]*Node, len(te.entries))
-       i := 0
-       for _, e := range te.entries {
-               nodes[i] = e.node
-               i++
-       }
-       t.requestCnt++
-       t.requested.update(te.rqItem, t.requestCnt)
-       return nodes
-}
-
-func (t *topicTable) addEntry(node *Node, topic Topic) {
-       n := t.getOrNewNode(node)
-       // clear previous entries by the same node
-       for _, e := range n.entries {
-               t.deleteEntry(e)
-       }
-       // ***
-       n = t.getOrNewNode(node)
-
-       tm := Now()
-       te := t.getOrNewTopic(topic)
-
-       if len(te.entries) == maxEntriesPerTopic {
-               t.deleteEntry(te.getFifoTail())
-       }
-
-       if t.globalEntries == maxEntries {
-               t.deleteEntry(t.leastRequested()) // not empty, no need to check for nil
-       }
-
-       fifoIdx := te.fifoHead
-       te.fifoHead++
-       entry := &topicEntry{
-               topic:   topic,
-               fifoIdx: fifoIdx,
-               node:    node,
-               expire:  tm + AbsTime(fallbackRegistrationExpiry),
-       }
-       if printTestImgLogs {
-               fmt.Printf("*+ %d %v %016x %016x\n", tm/1000000, topic, t.self.sha[:8], node.sha[:8])
-       }
-       te.entries[fifoIdx] = entry
-       n.entries[topic] = entry
-       t.globalEntries++
-       te.wcl.registered(tm)
-}
-
-// removes least requested element from the fifo
-func (t *topicTable) leastRequested() *topicEntry {
-       for t.requested.Len() > 0 && t.topics[t.requested[0].topic] == nil {
-               heap.Pop(&t.requested)
-       }
-       if t.requested.Len() == 0 {
-               return nil
-       }
-       return t.topics[t.requested[0].topic].getFifoTail()
-}
-
-// entry should exist
-func (t *topicTable) deleteEntry(e *topicEntry) {
-       if printTestImgLogs {
-               fmt.Printf("*- %d %v %016x %016x\n", Now()/1000000, e.topic, t.self.sha[:8], e.node.sha[:8])
-       }
-       ne := t.nodes[e.node].entries
-       delete(ne, e.topic)
-       if len(ne) == 0 {
-               t.checkDeleteNode(e.node)
-       }
-       te := t.topics[e.topic]
-       delete(te.entries, e.fifoIdx)
-       if len(te.entries) == 0 {
-               t.checkDeleteTopic(e.topic)
-       }
-       t.globalEntries--
-}
-
-// It is assumed that topics and waitPeriods have the same length.
-func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
-       log.Debug("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods)
-       //fmt.Println("useTicket", serialNo, topics, waitPeriods)
-       t.collectGarbage()
-
-       n := t.getOrNewNode(node)
-       if serialNo < n.lastUsedTicket {
-               return false
-       }
-
-       tm := Now()
-       if serialNo > n.lastUsedTicket && tm < n.noRegUntil {
-               return false
-       }
-       if serialNo != n.lastUsedTicket {
-               n.lastUsedTicket = serialNo
-               n.noRegUntil = tm + AbsTime(noRegTimeout())
-               t.storeTicketCounters(node)
-       }
-
-       currTime := uint64(tm / AbsTime(time.Second))
-       regTime := issueTime + uint64(waitPeriods[idx])
-       relTime := int64(currTime - regTime)
-       if relTime >= -1 && relTime <= regTimeWindow+1 { // give clients a little security margin on both ends
-               if e := n.entries[topics[idx]]; e == nil {
-                       t.addEntry(node, topics[idx])
-               } else {
-                       // if there is an active entry, don't move to the front of the FIFO but prolong expire time
-                       e.expire = tm + AbsTime(fallbackRegistrationExpiry)
-               }
-               return true
-       }
-
-       return false
-}
-
-func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket {
-       topictab.collectGarbage()
-
-       now := Now()
-       n := topictab.getOrNewNode(node)
-       n.lastIssuedTicket++
-       topictab.storeTicketCounters(node)
-
-       t := &ticket{
-               issueTime: now,
-               topics:    topics,
-               serial:    n.lastIssuedTicket,
-               regTime:   make([]AbsTime, len(topics)),
-       }
-       for i, topic := range topics {
-               var waitPeriod time.Duration
-               if topic := topictab.topics[topic]; topic != nil {
-                       waitPeriod = topic.wcl.waitPeriod
-               } else {
-                       waitPeriod = minWaitPeriod
-               }
-
-               t.regTime[i] = now + AbsTime(waitPeriod)
-       }
-       return t
-}
-
-const gcInterval = time.Minute
-
-func (t *topicTable) collectGarbage() {
-       tm := Now()
-       if time.Duration(tm-t.lastGarbageCollection) < gcInterval {
-               return
-       }
-       t.lastGarbageCollection = tm
-
-       for node, n := range t.nodes {
-               for _, e := range n.entries {
-                       if e.expire <= tm {
-                               t.deleteEntry(e)
-                       }
-               }
-
-               t.checkDeleteNode(node)
-       }
-
-       for topic := range t.topics {
-               t.checkDeleteTopic(topic)
-       }
-}
-
-const (
-       minWaitPeriod   = time.Minute
-       regTimeWindow   = 10 // seconds
-       avgnoRegTimeout = time.Minute * 10
-       // target average interval between two incoming ad requests
-       wcTargetRegInterval = time.Minute * 10 / maxEntriesPerTopic
-       //
-       wcTimeConst = time.Minute * 10
-)
-
-// initialization is not required, will set to minWaitPeriod at first registration
-type waitControlLoop struct {
-       lastIncoming AbsTime
-       waitPeriod   time.Duration
-}
-
-func (w *waitControlLoop) registered(tm AbsTime) {
-       w.waitPeriod = w.nextWaitPeriod(tm)
-       w.lastIncoming = tm
-}
-
-func (w *waitControlLoop) nextWaitPeriod(tm AbsTime) time.Duration {
-       period := tm - w.lastIncoming
-       wp := time.Duration(float64(w.waitPeriod) * math.Exp((float64(wcTargetRegInterval)-float64(period))/float64(wcTimeConst)))
-       if wp < minWaitPeriod {
-               wp = minWaitPeriod
-       }
-       return wp
-}
-
-func (w *waitControlLoop) hasMinimumWaitPeriod() bool {
-       return w.nextWaitPeriod(Now()) == minWaitPeriod
-}
-
-func noRegTimeout() time.Duration {
-       e := rand.ExpFloat64()
-       if e > 100 {
-               e = 100
-       }
-       return time.Duration(float64(avgnoRegTimeout) * e)
-}
-
-type topicRequestQueueItem struct {
-       topic    Topic
-       priority uint64
-       index    int
-}
-
-// A topicRequestQueue implements heap.Interface and holds topicRequestQueueItems.
-type topicRequestQueue []*topicRequestQueueItem
-
-func (tq topicRequestQueue) Len() int { return len(tq) }
-
-func (tq topicRequestQueue) Less(i, j int) bool {
-       return tq[i].priority < tq[j].priority
-}
-
-func (tq topicRequestQueue) Swap(i, j int) {
-       tq[i], tq[j] = tq[j], tq[i]
-       tq[i].index = i
-       tq[j].index = j
-}
-
-func (tq *topicRequestQueue) Push(x interface{}) {
-       n := len(*tq)
-       item := x.(*topicRequestQueueItem)
-       item.index = n
-       *tq = append(*tq, item)
-}
-
-func (tq *topicRequestQueue) Pop() interface{} {
-       old := *tq
-       n := len(old)
-       item := old[n-1]
-       item.index = -1
-       *tq = old[0 : n-1]
-       return item
-}
-
-func (tq *topicRequestQueue) update(item *topicRequestQueueItem, priority uint64) {
-       item.priority = priority
-       heap.Fix(tq, item.index)
-}