OSDN Git Service

Merge pull request #41 from Bytom/dev
[bytom/vapor.git] / p2p / discover / table.go
1 // Package discv5 implements the RLPx v5 Topic Discovery Protocol.
2 //
3 // The Topic Discovery protocol provides a way to find RLPx nodes that
4 // can be connected to. It uses a Kademlia-like protocol to maintain a
5 // distributed database of the IDs and endpoints of all listening
6 // nodes.
7 package discover
8
9 import (
10         "crypto/rand"
11         "encoding/binary"
12         "fmt"
13         "net"
14         "sort"
15
16         "github.com/vapor/common"
17         "github.com/vapor/crypto"
18 )
19
20 const (
21         alpha      = 3  // Kademlia concurrency factor
22         bucketSize = 16 // Kademlia bucket size
23         hashBits   = len(common.Hash{}) * 8
24         nBuckets   = hashBits + 1 // Number of buckets
25
26         maxBondingPingPongs = 16
27         maxFindnodeFailures = 5
28 )
29
30 type Table struct {
31         count         int               // number of nodes
32         buckets       [nBuckets]*bucket // index of known nodes by distance
33         nodeAddedHook func(*Node)       // for testing
34         self          *Node             // metadata of the local node
35 }
36
37 // bucket contains nodes, ordered by their last activity. the entry
38 // that was most recently active is the first element in entries.
39 type bucket struct {
40         entries      []*Node
41         replacements []*Node
42 }
43
44 func newTable(ourID NodeID, ourAddr *net.UDPAddr) *Table {
45         self := NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port))
46         tab := &Table{self: self}
47         for i := range tab.buckets {
48                 tab.buckets[i] = new(bucket)
49         }
50         return tab
51 }
52
53 const printTable = false
54
55 // chooseBucketRefreshTarget selects random refresh targets to keep all Kademlia
56 // buckets filled with live connections and keep the network topology healthy.
57 // This requires selecting addresses closer to our own with a higher probability
58 // in order to refresh closer buckets too.
59 //
60 // This algorithm approximates the distance distribution of existing nodes in the
61 // table by selecting a random node from the table and selecting a target address
62 // with a distance less than twice of that of the selected node.
63 // This algorithm will be improved later to specifically target the least recently
64 // used buckets.
65 func (tab *Table) chooseBucketRefreshTarget() common.Hash {
66         entries := 0
67         if printTable {
68                 fmt.Println()
69                 fmt.Println("self ", "id:", tab.self.ID, " hex:", crypto.Sha256Hash(tab.self.ID[:]).Hex())
70         }
71         for i, b := range &tab.buckets {
72                 entries += len(b.entries)
73                 if printTable {
74                         for _, e := range b.entries {
75                                 fmt.Println(i, e.state, e.addr().String(), e.ID.String(), e.sha.Hex())
76                         }
77                 }
78         }
79
80         prefix := binary.BigEndian.Uint64(tab.self.sha[0:8])
81         dist := ^uint64(0)
82         entry := int(randUint(uint32(entries + 1)))
83         for _, b := range &tab.buckets {
84                 if entry < len(b.entries) {
85                         n := b.entries[entry]
86                         dist = binary.BigEndian.Uint64(n.sha[0:8]) ^ prefix
87                         break
88                 }
89                 entry -= len(b.entries)
90         }
91
92         ddist := ^uint64(0)
93         if dist+dist > dist {
94                 ddist = dist
95         }
96         targetPrefix := prefix ^ randUint64n(ddist)
97
98         var target common.Hash
99         binary.BigEndian.PutUint64(target[0:8], targetPrefix)
100         rand.Read(target[8:])
101         return target
102 }
103
104 // readRandomNodes fills the given slice with random nodes from the
105 // table. It will not write the same node more than once. The nodes in
106 // the slice are copies and can be modified by the caller.
107 func (tab *Table) readRandomNodes(buf []*Node) (n int) {
108         // TODO: tree-based buckets would help here
109         // Find all non-empty buckets and get a fresh slice of their entries.
110         var buckets [][]*Node
111         for _, b := range &tab.buckets {
112                 if len(b.entries) > 0 {
113                         buckets = append(buckets, b.entries[:])
114                 }
115         }
116         if len(buckets) == 0 {
117                 return 0
118         }
119         // Shuffle the buckets.
120         for i := uint32(len(buckets)) - 1; i > 0; i-- {
121                 j := randUint(i)
122                 buckets[i], buckets[j] = buckets[j], buckets[i]
123         }
124         // Move head of each bucket into buf, removing buckets that become empty.
125         var i, j int
126         for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
127                 b := buckets[j]
128                 buf[i] = &(*b[0])
129                 buckets[j] = b[1:]
130                 if len(b) == 1 {
131                         buckets = append(buckets[:j], buckets[j+1:]...)
132                 }
133                 if len(buckets) == 0 {
134                         i++
135                         break
136                 }
137         }
138         return i
139 }
140
141 func randUint(max uint32) uint32 {
142         if max < 2 {
143                 return 0
144         }
145         var b [4]byte
146         rand.Read(b[:])
147         return binary.BigEndian.Uint32(b[:]) % max
148 }
149
150 func randUint64n(max uint64) uint64 {
151         if max < 2 {
152                 return 0
153         }
154         var b [8]byte
155         rand.Read(b[:])
156         return binary.BigEndian.Uint64(b[:]) % max
157 }
158
159 // closest returns the n nodes in the table that are closest to the
160 // given id. The caller must hold tab.mutex.
161 func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
162         // This is a very wasteful way to find the closest nodes but
163         // obviously correct. I believe that tree-based buckets would make
164         // this easier to implement efficiently.
165         closest := &nodesByDistance{target: target}
166         for _, b := range &tab.buckets {
167                 for _, n := range b.entries {
168                         closest.push(n, nresults)
169                 }
170         }
171         return closest
172 }
173
174 // add attempts to add the given node its corresponding bucket. If the
175 // bucket has space available, adding the node succeeds immediately.
176 // Otherwise, the node is added to the replacement cache for the bucket.
177 func (tab *Table) add(n *Node) (contested *Node) {
178         //fmt.Println("add", n.addr().String(), n.ID.String(), n.sha.Hex())
179         if n.ID == tab.self.ID {
180                 return
181         }
182         b := tab.buckets[logdist(tab.self.sha, n.sha)]
183         switch {
184         case b.bump(n):
185                 // n exists in b.
186                 return nil
187         case len(b.entries) < bucketSize:
188                 // b has space available.
189                 b.addFront(n)
190                 tab.count++
191                 if tab.nodeAddedHook != nil {
192                         tab.nodeAddedHook(n)
193                 }
194                 return nil
195         default:
196                 // b has no space left, add to replacement cache
197                 // and revalidate the last entry.
198                 tab.deleteFromReplacement(b, n)
199                 b.replacements = append(b.replacements, n)
200                 if len(b.replacements) > bucketSize {
201                         copy(b.replacements, b.replacements[1:])
202                         b.replacements = b.replacements[:len(b.replacements)-1]
203                 }
204                 return b.entries[len(b.entries)-1]
205         }
206 }
207
208 // stuff adds nodes the table to the end of their corresponding bucket
209 // if the bucket is not full.
210 func (tab *Table) stuff(nodes []*Node) {
211 outer:
212         for _, n := range nodes {
213                 if n.ID == tab.self.ID {
214                         continue // don't add self
215                 }
216                 bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
217                 for i := range bucket.entries {
218                         if bucket.entries[i].ID == n.ID {
219                                 continue outer // already in bucket
220                         }
221                 }
222                 if len(bucket.entries) < bucketSize {
223                         bucket.entries = append(bucket.entries, n)
224                         tab.count++
225                         if tab.nodeAddedHook != nil {
226                                 tab.nodeAddedHook(n)
227                         }
228                 }
229         }
230 }
231
232 func (tab *Table) deleteFromReplacement(bucket *bucket, node *Node) {
233         for i := 0; i < len(bucket.replacements); {
234                 if bucket.replacements[i].ID == node.ID {
235                         bucket.replacements = append(bucket.replacements[:i], bucket.replacements[i+1:]...)
236                 } else {
237                         i++
238                 }
239         }
240 }
241
242 // delete removes an entry from the node table (used to evacuate
243 // failed/non-bonded discovery peers).
244 func (tab *Table) delete(node *Node) {
245         //fmt.Println("delete", node.addr().String(), node.ID.String(), node.sha.Hex())
246         bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
247         for i := range bucket.entries {
248                 if bucket.entries[i].ID == node.ID {
249                         bucket.entries = append(bucket.entries[:i], bucket.entries[i+1:]...)
250                         tab.count--
251                         return
252                 }
253         }
254
255         tab.deleteFromReplacement(bucket, node)
256 }
257
258 func (tab *Table) deleteReplace(node *Node) {
259         b := tab.buckets[logdist(tab.self.sha, node.sha)]
260         i := 0
261         for i < len(b.entries) {
262                 if b.entries[i].ID == node.ID {
263                         b.entries = append(b.entries[:i], b.entries[i+1:]...)
264                         tab.count--
265                 } else {
266                         i++
267                 }
268         }
269
270         tab.deleteFromReplacement(b, node)
271         // refill from replacement cache
272         // TODO: maybe use random index
273         if len(b.entries) < bucketSize && len(b.replacements) > 0 {
274                 ri := len(b.replacements) - 1
275                 b.addFront(b.replacements[ri])
276                 tab.count++
277                 b.replacements[ri] = nil
278                 b.replacements = b.replacements[:ri]
279         }
280 }
281
282 func (b *bucket) addFront(n *Node) {
283         b.entries = append(b.entries, nil)
284         copy(b.entries[1:], b.entries)
285         b.entries[0] = n
286 }
287
288 func (b *bucket) bump(n *Node) bool {
289         for i := range b.entries {
290                 if b.entries[i].ID == n.ID {
291                         // move it to the front
292                         copy(b.entries[1:], b.entries[:i])
293                         b.entries[0] = n
294                         return true
295                 }
296         }
297         return false
298 }
299
300 // nodesByDistance is a list of nodes, ordered by
301 // distance to target.
302 type nodesByDistance struct {
303         entries []*Node
304         target  common.Hash
305 }
306
307 // push adds the given node to the list, keeping the total size below maxElems.
308 func (h *nodesByDistance) push(n *Node, maxElems int) {
309         ix := sort.Search(len(h.entries), func(i int) bool {
310                 return distcmp(h.target, h.entries[i].sha, n.sha) > 0
311         })
312         if len(h.entries) < maxElems {
313                 h.entries = append(h.entries, n)
314         }
315         if ix == len(h.entries) {
316                 // farther away than all nodes we already have.
317                 // if there was room for it, the node is now the last element.
318         } else {
319                 // slide existing entries down to make room
320                 // this will overwrite the entry we just appended.
321                 copy(h.entries[ix+1:], h.entries[ix:])
322                 h.entries[ix] = n
323         }
324 }