1 // Package discv5 implements the RLPx v5 Topic Discovery Protocol.
3 // The Topic Discovery protocol provides a way to find RLPx nodes that
4 // can be connected to. It uses a Kademlia-like protocol to maintain a
5 // distributed database of the IDs and endpoints of all listening
15 log "github.com/sirupsen/logrus"
17 "github.com/vapor/common"
18 "github.com/vapor/crypto"
22 alpha = 3 // Kademlia concurrency factor
23 bucketSize = 16 // Kademlia bucket size
24 hashBits = len(common.Hash{}) * 8
25 nBuckets = hashBits + 1 // Number of buckets
27 maxBondingPingPongs = 16
28 maxFindnodeFailures = 5
32 count int // number of nodes
33 buckets [nBuckets]*bucket // index of known nodes by distance
34 nodeAddedHook func(*Node) // for testing
35 self *Node // metadata of the local node
38 // bucket contains nodes, ordered by their last activity. the entry
39 // that was most recently active is the first element in entries.
45 func newTable(ourID NodeID, ourAddr *net.UDPAddr) *Table {
46 self := NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port))
47 tab := &Table{self: self}
48 for i := range tab.buckets {
49 tab.buckets[i] = new(bucket)
54 // chooseBucketRefreshTarget selects random refresh targets to keep all Kademlia
55 // buckets filled with live connections and keep the network topology healthy.
56 // This requires selecting addresses closer to our own with a higher probability
57 // in order to refresh closer buckets too.
59 // This algorithm approximates the distance distribution of existing nodes in the
60 // table by selecting a random node from the table and selecting a target address
61 // with a distance less than twice of that of the selected node.
62 // This algorithm will be improved later to specifically target the least recently
64 func (tab *Table) chooseBucketRefreshTarget() common.Hash {
66 log.WithFields(log.Fields{"module": logModule, "self id:": tab.self.ID, "hex": crypto.Sha256Hash(tab.self.ID[:]).Hex()}).Debug()
67 for i, b := range &tab.buckets {
68 entries += len(b.entries)
69 for _, e := range b.entries {
70 log.WithFields(log.Fields{"module": logModule, "bucket": i, "status": e.state, "addr": e.addr().String(), "id": e.ID.String(), "hex": e.sha.Hex()}).Debug()
74 prefix := binary.BigEndian.Uint64(tab.self.sha[0:8])
76 entry := int(randUint(uint32(entries + 1)))
77 for _, b := range &tab.buckets {
78 if entry < len(b.entries) {
80 dist = binary.BigEndian.Uint64(n.sha[0:8]) ^ prefix
83 entry -= len(b.entries)
90 targetPrefix := prefix ^ randUint64n(ddist)
92 var target common.Hash
93 binary.BigEndian.PutUint64(target[0:8], targetPrefix)
98 // readRandomNodes fills the given slice with random nodes from the
99 // table. It will not write the same node more than once. The nodes in
100 // the slice are copies and can be modified by the caller.
101 func (tab *Table) readRandomNodes(buf []*Node) (n int) {
102 // TODO: tree-based buckets would help here
103 // Find all non-empty buckets and get a fresh slice of their entries.
104 var buckets [][]*Node
105 for _, b := range &tab.buckets {
106 if len(b.entries) > 0 {
107 buckets = append(buckets, b.entries[:])
110 if len(buckets) == 0 {
113 // Shuffle the buckets.
114 for i := uint32(len(buckets)) - 1; i > 0; i-- {
116 buckets[i], buckets[j] = buckets[j], buckets[i]
118 // Move head of each bucket into buf, removing buckets that become empty.
120 for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
125 buckets = append(buckets[:j], buckets[j+1:]...)
127 if len(buckets) == 0 {
135 func randUint(max uint32) uint32 {
141 return binary.BigEndian.Uint32(b[:]) % max
144 func randUint64n(max uint64) uint64 {
150 return binary.BigEndian.Uint64(b[:]) % max
153 // closest returns the n nodes in the table that are closest to the
154 // given id. The caller must hold tab.mutex.
155 func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
156 // This is a very wasteful way to find the closest nodes but
157 // obviously correct. I believe that tree-based buckets would make
158 // this easier to implement efficiently.
159 closest := &nodesByDistance{target: target}
160 for _, b := range &tab.buckets {
161 for _, n := range b.entries {
162 closest.push(n, nresults)
168 // add attempts to add the given node its corresponding bucket. If the
169 // bucket has space available, adding the node succeeds immediately.
170 // Otherwise, the node is added to the replacement cache for the bucket.
171 func (tab *Table) add(n *Node) (contested *Node) {
172 //fmt.Println("add", n.addr().String(), n.ID.String(), n.sha.Hex())
173 if n.ID == tab.self.ID {
176 b := tab.buckets[logdist(tab.self.sha, n.sha)]
181 case len(b.entries) < bucketSize:
182 // b has space available.
185 if tab.nodeAddedHook != nil {
190 // b has no space left, add to replacement cache
191 // and revalidate the last entry.
192 tab.deleteFromReplacement(b, n)
193 b.replacements = append(b.replacements, n)
194 if len(b.replacements) > bucketSize {
195 copy(b.replacements, b.replacements[1:])
196 b.replacements = b.replacements[:len(b.replacements)-1]
198 return b.entries[len(b.entries)-1]
202 // stuff adds nodes the table to the end of their corresponding bucket
203 // if the bucket is not full.
204 func (tab *Table) stuff(nodes []*Node) {
206 for _, n := range nodes {
207 if n.ID == tab.self.ID {
208 continue // don't add self
210 bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
211 for i := range bucket.entries {
212 if bucket.entries[i].ID == n.ID {
213 continue outer // already in bucket
216 if len(bucket.entries) < bucketSize {
217 bucket.entries = append(bucket.entries, n)
219 if tab.nodeAddedHook != nil {
226 func (tab *Table) deleteFromReplacement(bucket *bucket, node *Node) {
227 for i := 0; i < len(bucket.replacements); {
228 if bucket.replacements[i].ID == node.ID {
229 bucket.replacements = append(bucket.replacements[:i], bucket.replacements[i+1:]...)
236 // delete removes an entry from the node table (used to evacuate
237 // failed/non-bonded discovery peers).
238 func (tab *Table) delete(node *Node) {
239 //fmt.Println("delete", node.addr().String(), node.ID.String(), node.sha.Hex())
240 bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
241 for i := range bucket.entries {
242 if bucket.entries[i].ID == node.ID {
243 bucket.entries = append(bucket.entries[:i], bucket.entries[i+1:]...)
249 tab.deleteFromReplacement(bucket, node)
252 func (tab *Table) deleteReplace(node *Node) {
253 b := tab.buckets[logdist(tab.self.sha, node.sha)]
255 for i < len(b.entries) {
256 if b.entries[i].ID == node.ID {
257 b.entries = append(b.entries[:i], b.entries[i+1:]...)
264 tab.deleteFromReplacement(b, node)
265 // refill from replacement cache
266 // TODO: maybe use random index
267 if len(b.entries) < bucketSize && len(b.replacements) > 0 {
268 ri := len(b.replacements) - 1
269 b.addFront(b.replacements[ri])
271 b.replacements[ri] = nil
272 b.replacements = b.replacements[:ri]
276 func (b *bucket) addFront(n *Node) {
277 b.entries = append(b.entries, nil)
278 copy(b.entries[1:], b.entries)
282 func (b *bucket) bump(n *Node) bool {
283 for i := range b.entries {
284 if b.entries[i].ID == n.ID {
285 // move it to the front
286 copy(b.entries[1:], b.entries[:i])
294 // nodesByDistance is a list of nodes, ordered by
295 // distance to target.
296 type nodesByDistance struct {
301 // push adds the given node to the list, keeping the total size below maxElems.
302 func (h *nodesByDistance) push(n *Node, maxElems int) {
303 ix := sort.Search(len(h.entries), func(i int) bool {
304 return distcmp(h.target, h.entries[i].sha, n.sha) > 0
306 if len(h.entries) < maxElems {
307 h.entries = append(h.entries, n)
309 if ix == len(h.entries) {
310 // farther away than all nodes we already have.
311 // if there was room for it, the node is now the last element.
313 // slide existing entries down to make room
314 // this will overwrite the entry we just appended.
315 copy(h.entries[ix+1:], h.entries[ix:])