OSDN Git Service

rename (#465)
[bytom/vapor.git] / p2p / discover / dht / table.go
1 // Package discv5 implements the RLPx v5 Topic Discovery Protocol.
2 //
3 // The Topic Discovery protocol provides a way to find RLPx nodes that
4 // can be connected to. It uses a Kademlia-like protocol to maintain a
5 // distributed database of the IDs and endpoints of all listening
6 // nodes.
7 package dht
8
9 import (
10         "crypto/rand"
11         "encoding/binary"
12         "net"
13         "sort"
14
15         log "github.com/sirupsen/logrus"
16
17         "github.com/bytom/vapor/common"
18         "github.com/bytom/vapor/crypto"
19 )
20
21 const (
22         alpha      = 3  // Kademlia concurrency factor
23         bucketSize = 16 // Kademlia bucket size
24         hashBits   = len(common.Hash{}) * 8
25         nBuckets   = hashBits + 1 // Number of buckets
26
27         maxBondingPingPongs = 16
28         maxFindnodeFailures = 5
29 )
30
31 type Table struct {
32         count         int               // number of nodes
33         buckets       [nBuckets]*bucket // index of known nodes by distance
34         nodeAddedHook func(*Node)       // for testing
35         self          *Node             // metadata of the local node
36 }
37
38 // bucket contains nodes, ordered by their last activity. the entry
39 // that was most recently active is the first element in entries.
40 type bucket struct {
41         entries      []*Node
42         replacements []*Node
43 }
44
45 func newTable(ourID NodeID, ourAddr *net.UDPAddr) *Table {
46         self := NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port))
47         tab := &Table{self: self}
48         for i := range tab.buckets {
49                 tab.buckets[i] = new(bucket)
50         }
51         return tab
52 }
53
54 // chooseBucketRefreshTarget selects random refresh targets to keep all Kademlia
55 // buckets filled with live connections and keep the network topology healthy.
56 // This requires selecting addresses closer to our own with a higher probability
57 // in order to refresh closer buckets too.
58 //
59 // This algorithm approximates the distance distribution of existing nodes in the
60 // table by selecting a random node from the table and selecting a target address
61 // with a distance less than twice of that of the selected node.
62 // This algorithm will be improved later to specifically target the least recently
63 // used buckets.
64 func (tab *Table) chooseBucketRefreshTarget() common.Hash {
65         entries := 0
66         log.WithFields(log.Fields{"module": logModule, "self id:": tab.self.ID, "hex": crypto.Sha256Hash(tab.self.ID[:]).Hex()}).Debug()
67         for i, b := range &tab.buckets {
68                 entries += len(b.entries)
69                 for _, e := range b.entries {
70                         log.WithFields(log.Fields{"module": logModule, "bucket": i, "status": e.state, "addr": e.addr().String(), "id": e.ID.String(), "hex": e.sha.Hex()}).Debug()
71                 }
72         }
73
74         prefix := binary.BigEndian.Uint64(tab.self.sha[0:8])
75         dist := ^uint64(0)
76         entry := int(randUint(uint32(entries + 1)))
77         for _, b := range &tab.buckets {
78                 if entry < len(b.entries) {
79                         n := b.entries[entry]
80                         dist = binary.BigEndian.Uint64(n.sha[0:8]) ^ prefix
81                         break
82                 }
83                 entry -= len(b.entries)
84         }
85
86         ddist := ^uint64(0)
87         if dist+dist > dist {
88                 ddist = dist
89         }
90         targetPrefix := prefix ^ randUint64n(ddist)
91
92         var target common.Hash
93         binary.BigEndian.PutUint64(target[0:8], targetPrefix)
94         rand.Read(target[8:])
95         return target
96 }
97
98 // readRandomNodes fills the given slice with random nodes from the
99 // table. It will not write the same node more than once. The nodes in
100 // the slice are copies and can be modified by the caller.
101 func (tab *Table) readRandomNodes(buf []*Node) (n int) {
102         // TODO: tree-based buckets would help here
103         // Find all non-empty buckets and get a fresh slice of their entries.
104         var buckets [][]*Node
105         for _, b := range &tab.buckets {
106                 if len(b.entries) > 0 {
107                         buckets = append(buckets, b.entries[:])
108                 }
109         }
110         if len(buckets) == 0 {
111                 return 0
112         }
113         // Shuffle the buckets.
114         for i := uint32(len(buckets)) - 1; i > 0; i-- {
115                 j := randUint(i)
116                 buckets[i], buckets[j] = buckets[j], buckets[i]
117         }
118         // Move head of each bucket into buf, removing buckets that become empty.
119         var i, j int
120         for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
121                 b := buckets[j]
122                 buf[i] = &(*b[0])
123                 buckets[j] = b[1:]
124                 if len(b) == 1 {
125                         buckets = append(buckets[:j], buckets[j+1:]...)
126                 }
127                 if len(buckets) == 0 {
128                         i++
129                         break
130                 }
131         }
132         return i
133 }
134
135 func randUint(max uint32) uint32 {
136         if max < 2 {
137                 return 0
138         }
139         var b [4]byte
140         rand.Read(b[:])
141         return binary.BigEndian.Uint32(b[:]) % max
142 }
143
144 func randUint64n(max uint64) uint64 {
145         if max < 2 {
146                 return 0
147         }
148         var b [8]byte
149         rand.Read(b[:])
150         return binary.BigEndian.Uint64(b[:]) % max
151 }
152
153 // closest returns the n nodes in the table that are closest to the
154 // given id. The caller must hold tab.mutex.
155 func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
156         // This is a very wasteful way to find the closest nodes but
157         // obviously correct. I believe that tree-based buckets would make
158         // this easier to implement efficiently.
159         closest := &nodesByDistance{target: target}
160         for _, b := range &tab.buckets {
161                 for _, n := range b.entries {
162                         closest.push(n, nresults)
163                 }
164         }
165         return closest
166 }
167
168 // add attempts to add the given node its corresponding bucket. If the
169 // bucket has space available, adding the node succeeds immediately.
170 // Otherwise, the node is added to the replacement cache for the bucket.
171 func (tab *Table) add(n *Node) (contested *Node) {
172         //fmt.Println("add", n.addr().String(), n.ID.String(), n.sha.Hex())
173         if n.ID == tab.self.ID {
174                 return
175         }
176         b := tab.buckets[logdist(tab.self.sha, n.sha)]
177         switch {
178         case b.bump(n):
179                 // n exists in b.
180                 return nil
181         case len(b.entries) < bucketSize:
182                 // b has space available.
183                 b.addFront(n)
184                 tab.count++
185                 if tab.nodeAddedHook != nil {
186                         tab.nodeAddedHook(n)
187                 }
188                 return nil
189         default:
190                 // b has no space left, add to replacement cache
191                 // and revalidate the last entry.
192                 tab.deleteFromReplacement(b, n)
193                 b.replacements = append(b.replacements, n)
194                 if len(b.replacements) > bucketSize {
195                         copy(b.replacements, b.replacements[1:])
196                         b.replacements = b.replacements[:len(b.replacements)-1]
197                 }
198                 return b.entries[len(b.entries)-1]
199         }
200 }
201
202 // stuff adds nodes the table to the end of their corresponding bucket
203 // if the bucket is not full.
204 func (tab *Table) stuff(nodes []*Node) {
205 outer:
206         for _, n := range nodes {
207                 if n.ID == tab.self.ID {
208                         continue // don't add self
209                 }
210                 bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
211                 for i := range bucket.entries {
212                         if bucket.entries[i].ID == n.ID {
213                                 continue outer // already in bucket
214                         }
215                 }
216                 if len(bucket.entries) < bucketSize {
217                         bucket.entries = append(bucket.entries, n)
218                         tab.count++
219                         if tab.nodeAddedHook != nil {
220                                 tab.nodeAddedHook(n)
221                         }
222                 }
223         }
224 }
225
226 func (tab *Table) deleteFromReplacement(bucket *bucket, node *Node) {
227         for i := 0; i < len(bucket.replacements); {
228                 if bucket.replacements[i].ID == node.ID {
229                         bucket.replacements = append(bucket.replacements[:i], bucket.replacements[i+1:]...)
230                 } else {
231                         i++
232                 }
233         }
234 }
235
236 // delete removes an entry from the node table (used to evacuate
237 // failed/non-bonded discovery peers).
238 func (tab *Table) delete(node *Node) {
239         //fmt.Println("delete", node.addr().String(), node.ID.String(), node.sha.Hex())
240         bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
241         for i := range bucket.entries {
242                 if bucket.entries[i].ID == node.ID {
243                         bucket.entries = append(bucket.entries[:i], bucket.entries[i+1:]...)
244                         tab.count--
245                         return
246                 }
247         }
248
249         tab.deleteFromReplacement(bucket, node)
250 }
251
252 func (tab *Table) deleteReplace(node *Node) {
253         b := tab.buckets[logdist(tab.self.sha, node.sha)]
254         i := 0
255         for i < len(b.entries) {
256                 if b.entries[i].ID == node.ID {
257                         b.entries = append(b.entries[:i], b.entries[i+1:]...)
258                         tab.count--
259                 } else {
260                         i++
261                 }
262         }
263
264         tab.deleteFromReplacement(b, node)
265         // refill from replacement cache
266         // TODO: maybe use random index
267         if len(b.entries) < bucketSize && len(b.replacements) > 0 {
268                 ri := len(b.replacements) - 1
269                 b.addFront(b.replacements[ri])
270                 tab.count++
271                 b.replacements[ri] = nil
272                 b.replacements = b.replacements[:ri]
273         }
274 }
275
276 func (b *bucket) addFront(n *Node) {
277         b.entries = append(b.entries, nil)
278         copy(b.entries[1:], b.entries)
279         b.entries[0] = n
280 }
281
282 func (b *bucket) bump(n *Node) bool {
283         for i := range b.entries {
284                 if b.entries[i].ID == n.ID {
285                         // move it to the front
286                         copy(b.entries[1:], b.entries[:i])
287                         b.entries[0] = n
288                         return true
289                 }
290         }
291         return false
292 }
293
294 // nodesByDistance is a list of nodes, ordered by
295 // distance to target.
296 type nodesByDistance struct {
297         entries []*Node
298         target  common.Hash
299 }
300
301 // push adds the given node to the list, keeping the total size below maxElems.
302 func (h *nodesByDistance) push(n *Node, maxElems int) {
303         ix := sort.Search(len(h.entries), func(i int) bool {
304                 return distcmp(h.target, h.entries[i].sha, n.sha) > 0
305         })
306         if len(h.entries) < maxElems {
307                 h.entries = append(h.entries, n)
308         }
309         if ix == len(h.entries) {
310                 // farther away than all nodes we already have.
311                 // if there was room for it, the node is now the last element.
312         } else {
313                 // slide existing entries down to make room
314                 // this will overwrite the entry we just appended.
315                 copy(h.entries[ix+1:], h.entries[ix:])
316                 h.entries[ix] = n
317         }
318 }