14 log "github.com/sirupsen/logrus"
15 "github.com/tendermint/go-wire"
17 "github.com/vapor/common"
18 cfg "github.com/vapor/config"
19 "github.com/vapor/crypto"
20 "github.com/vapor/crypto/ed25519"
21 "github.com/vapor/p2p/netutil"
22 "github.com/vapor/version"
26 // Version is the DHT discover protocol version.
28 logModule = "discover"
33 errPacketTooSmall = errors.New("too small")
34 errPrefixMismatch = errors.New("prefix mismatch")
35 errNetIDMismatch = errors.New("network id mismatch")
36 errPacketType = errors.New("unknown packet type")
41 respTimeout = 1 * time.Second
42 expiration = 20 * time.Second
45 // ReadPacket is sent to the unhandled channel when it could not be processed
46 type ReadPacket struct {
51 // Config holds Table-related settings.
53 // These settings are required and configure the UDP listener:
54 PrivateKey *ecdsa.PrivateKey
56 // These settings are optional:
57 AnnounceAddr *net.UDPAddr // local address announced in the DHT
58 NodeDBPath string // if set, the node database is stored at this filesystem location
59 //NetRestrict *netutil.Netlist // network whitelist
60 Bootnodes []*Node // list of bootstrap nodes
61 Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
64 // RPC request structures
74 // Ignore additional fields (for forward compatibility).
78 // pong is the reply to ping.
80 // This field should mirror the UDP envelope address
81 // of the ping packet, which provides a way to discover the
82 // external address (after NAT).
85 ReplyTok []byte // This contains the hash of the ping packet.
86 Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
93 // Ignore additional fields (for forward compatibility).
97 // findnode is a query for nodes close to the given target.
99 Target NodeID // doesn't need to be an actual public key
101 // Ignore additional fields (for forward compatibility).
105 // findnodeHash is a query for nodes close to the given target.
106 findnodeHash struct {
109 // Ignore additional fields (for forward compatibility).
117 // Ignore additional fields (for forward compatibility).
121 topicRegister struct {
132 // reply to topicQuery
139 IP net.IP // len 4 for IPv4 or 16 for IPv6
140 UDP uint16 // for discovery protocol
141 TCP uint16 // for RLPx protocol
146 IP net.IP // len 4 for IPv4 or 16 for IPv6
147 UDP uint16 // for discovery protocol
148 TCP uint16 // for RLPx protocol
156 headSize = netIDSize + nodeIDSize + sigSize // space of packet frame data
159 // Neighbors replies are sent across multiple packets to
160 // stay below the 1280 byte limit. We compute the maximum number
161 // of entries by stuffing a packet until it grows too large.
162 var maxNeighbors = func() int {
163 p := neighbors{Expiration: ^uint64(0)}
164 maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
166 p.Nodes = append(p.Nodes, maxSizeNode)
169 b := new(bytes.Buffer)
170 wire.WriteJSON(p, b, &size, &err)
172 // If this ever happens, it will be caught by the unit tests.
173 panic("cannot encode: " + err.Error())
175 if headSize+size+1 >= 1280 {
181 var maxTopicNodes = func() int {
183 maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
185 p.Nodes = append(p.Nodes, maxSizeNode)
188 b := new(bytes.Buffer)
189 wire.WriteJSON(p, b, &size, &err)
191 // If this ever happens, it will be caught by the unit tests.
192 panic("cannot encode: " + err.Error())
194 if headSize+size+1 >= 1280 {
200 func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
205 return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
208 func (e1 rpcEndpoint) equal(e2 rpcEndpoint) bool {
209 return e1.UDP == e2.UDP && e1.TCP == e2.TCP && e1.IP.Equal(e2.IP)
212 func nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) {
213 if err := netutil.CheckRelayIP(sender.IP, rn.IP); err != nil {
216 n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP)
217 err := n.validateComplete()
221 func nodeToRPC(n *Node) rpcNode {
222 return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
225 type ingressPacket struct {
227 remoteAddr *net.UDPAddr
230 data interface{} // one of the RPC structs
234 type conn interface {
235 ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
236 WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
241 type netWork interface {
242 reqReadPacket(pkt ingressPacket)
246 // udp implements the RPC protocol.
249 priv ed25519.PrivateKey
250 // netID is used to isolate subnets
252 ourEndpoint rpcEndpoint
256 // NewDiscover creates a new DHT discover network.
257 func NewDiscover(config *cfg.Config, priv ed25519.PrivateKey, port uint16, netID uint64) (*Network, error) {
258 addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
263 conn, err := net.ListenUDP("udp", addr)
268 realaddr := conn.LocalAddr().(*net.UDPAddr)
269 ntab, err := ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover"), nil, netID)
273 seeds, err := QueryDNSSeeds(net.LookupHost)
275 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on query dns seeds")
278 codedSeeds := netutil.CheckAndSplitAddresses(config.P2P.Seeds)
279 seeds = append(seeds, codedSeeds...)
285 for _, seed := range seeds {
286 version.Status.AddSeed(seed)
287 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
288 nodes = append(nodes, MustParseNode(url))
291 if err = ntab.SetFallbackNodes(nodes); err != nil {
297 // ListenUDP returns a new Network that listens for UDP packets on conn.
298 func ListenUDP(priv ed25519.PrivateKey, conn conn, realaddr *net.UDPAddr, nodeDBPath string, netrestrict *netutil.Netlist, netID uint64) (*Network, error) {
299 transport, err := listenUDP(priv, conn, realaddr, netID)
304 net, err := newNetwork(transport, priv.Public(), nodeDBPath, netrestrict)
308 log.WithFields(log.Fields{"module": logModule, "net": net.tab.self}).Info("UDP listener up v5")
310 go transport.readLoop()
314 func listenUDP(priv ed25519.PrivateKey, conn conn, realaddr *net.UDPAddr, netID uint64) (*udp, error) {
315 return &udp{conn: conn, priv: priv, netID: netID, ourEndpoint: makeEndpoint(realaddr, uint16(realaddr.Port))}, nil
318 func (t *udp) localAddr() *net.UDPAddr {
319 return t.conn.LocalAddr().(*net.UDPAddr)
322 func (t *udp) Close() {
326 func (t *udp) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) {
327 hash, _ = t.sendPacket(remote.ID, remote.addr(), byte(ptype), data)
331 func (t *udp) sendPing(remote *Node, toaddr *net.UDPAddr, topics []Topic) (hash []byte) {
332 hash, _ = t.sendPacket(remote.ID, toaddr, byte(pingPacket), ping{
335 To: makeEndpoint(toaddr, uint16(toaddr.Port)), // TODO: maybe use known TCP port from DB
336 Expiration: uint64(time.Now().Add(expiration).Unix()),
342 func (t *udp) sendFindnode(remote *Node, target NodeID) {
343 t.sendPacket(remote.ID, remote.addr(), byte(findnodePacket), findnode{
345 Expiration: uint64(time.Now().Add(expiration).Unix()),
349 func (t *udp) sendNeighbours(remote *Node, results []*Node) {
350 // Send neighbors in chunks with at most maxNeighbors per packet
351 // to stay below the 1280 byte limit.
352 p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
353 for i, result := range results {
354 p.Nodes = append(p.Nodes, nodeToRPC(result))
355 if len(p.Nodes) == maxNeighbors || i == len(results)-1 {
356 t.sendPacket(remote.ID, remote.addr(), byte(neighborsPacket), p)
357 p.Nodes = p.Nodes[:0]
362 func (t *udp) sendFindnodeHash(remote *Node, target common.Hash) {
363 t.sendPacket(remote.ID, remote.addr(), byte(findnodeHashPacket), findnodeHash{
365 Expiration: uint64(time.Now().Add(expiration).Unix()),
369 func (t *udp) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) {
370 t.sendPacket(remote.ID, remote.addr(), byte(topicRegisterPacket), topicRegister{
377 func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
378 p := topicNodes{Echo: queryHash}
380 for _, result := range nodes {
381 if result.IP.Equal(t.net.selfIP()) || netutil.CheckRelayIP(remote.IP, result.IP) == nil {
382 p.Nodes = append(p.Nodes, nodeToRPC(result))
384 if len(p.Nodes) == maxTopicNodes {
385 t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
386 p.Nodes = p.Nodes[:0]
390 if !sent || len(p.Nodes) > 0 {
391 t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
395 func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) {
396 packet, hash, err := encodePacket(t.priv, ptype, req, t.netID)
400 log.WithFields(log.Fields{"module": logModule, "event": nodeEvent(ptype), "to id": hex.EncodeToString(toid[:8]), "to addr": toaddr}).Debug("send packet")
401 if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
402 log.WithFields(log.Fields{"module": logModule, "error": err}).Info(fmt.Sprint("UDP send failed"))
407 // zeroed padding space for encodePacket.
408 var headSpace = make([]byte, headSize)
410 func encodePacket(priv ed25519.PrivateKey, ptype byte, req interface{}, netID uint64) (p, hash []byte, err error) {
411 b := new(bytes.Buffer)
415 wire.WriteJSON(req, b, &size, &err)
417 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("error encoding packet")
421 nodeID := priv.Public()
422 sig := ed25519.Sign(priv, common.BytesToHash(packet[headSize:]).Bytes())
423 id := []byte(strconv.FormatUint(netID, 16))
424 copy(packet[:], id[:])
425 copy(packet[netIDSize:], nodeID[:])
426 copy(packet[netIDSize+nodeIDSize:], sig)
428 hash = common.BytesToHash(packet[:]).Bytes()
429 return packet, hash, nil
432 // readLoop runs in its own goroutine. It injects ingress UDP packets
433 // into the network loop.
434 func (t *udp) readLoop() {
436 // Discovery packets are defined to be no larger than 1280 bytes.
437 // Packets larger than this size will be cut at the end and treated
438 // as invalid because their hash won't match.
439 buf := make([]byte, 1280)
441 nbytes, from, err := t.conn.ReadFromUDP(buf)
442 if netutil.IsTemporaryError(err) {
443 // Ignore temporary read errors.
444 log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Temporary read error")
446 } else if err != nil {
447 // Shut down the loop for permanent errors.
448 log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Read error")
451 if err := t.handlePacket(from, buf[:nbytes]); err != nil {
452 log.WithFields(log.Fields{"module": logModule, "from": from, "error": err}).Error("handle packet err")
457 func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
458 pkt := ingressPacket{remoteAddr: from}
459 if err := decodePacket(buf, &pkt, t.netID); err != nil {
462 t.net.reqReadPacket(pkt)
466 func (t *udp) getNetID() uint64 {
470 func decodePacket(buffer []byte, pkt *ingressPacket, netID uint64) error {
471 if len(buffer) < headSize+1 {
472 return errPacketTooSmall
474 buf := make([]byte, len(buffer))
476 fromID, sigdata := buf[netIDSize:netIDSize+nodeIDSize], buf[headSize:]
478 if !bytes.Equal(buf[:netIDSize], []byte(strconv.FormatUint(netID, 16))[:netIDSize]) {
479 return errNetIDMismatch
483 pkt.hash = common.BytesToHash(buf[:]).Bytes()
484 pkt.remoteID = ByteID(fromID)
485 switch pkt.ev = nodeEvent(sigdata[0]); pkt.ev {
491 pkt.data = new(findnode)
492 case neighborsPacket:
493 pkt.data = new(neighbors)
494 case findnodeHashPacket:
495 pkt.data = new(findnodeHash)
496 case topicRegisterPacket:
497 pkt.data = new(topicRegister)
498 case topicQueryPacket:
499 pkt.data = new(topicQuery)
500 case topicNodesPacket:
501 pkt.data = new(topicNodes)
506 wire.ReadJSON(pkt.data, sigdata[1:], &err)
508 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("wire readjson err")