1 // Copyright 2016 The go-ethereum Authors
2 // This file is part of the go-ethereum library.
4 // The go-ethereum library is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
9 // The go-ethereum library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU Lesser General Public License for more details.
14 // You should have received a copy of the GNU Lesser General Public License
15 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
27 log "github.com/sirupsen/logrus"
28 "github.com/tendermint/go-crypto"
29 "github.com/tendermint/go-wire"
31 "github.com/bytom/common"
32 "github.com/bytom/p2p/netutil"
39 errPacketTooSmall = errors.New("too small")
40 errBadPrefix = errors.New("bad prefix")
41 errExpired = errors.New("expired")
42 errUnsolicitedReply = errors.New("unsolicited reply")
43 errUnknownNode = errors.New("unknown node")
44 errTimeout = errors.New("RPC timeout")
45 errClockWarp = errors.New("reply deadline too far in the future")
46 errClosed = errors.New("socket closed")
51 respTimeout = 500 * time.Millisecond
52 queryDelay = 1000 * time.Millisecond
53 expiration = 20 * time.Second
55 ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP
56 ntpWarningCooldown = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning
57 driftThreshold = 10 * time.Second // Allowed clock drift before warning user
60 // ReadPacket is sent to the unhandled channel when it could not be processed
61 type ReadPacket struct {
66 // Config holds Table-related settings.
68 // These settings are required and configure the UDP listener:
69 PrivateKey *ecdsa.PrivateKey
71 // These settings are optional:
72 AnnounceAddr *net.UDPAddr // local address announced in the DHT
73 NodeDBPath string // if set, the node database is stored at this filesystem location
74 //NetRestrict *netutil.Netlist // network whitelist
75 Bootnodes []*Node // list of bootstrap nodes
76 Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
79 // RPC request structures
89 // Ignore additional fields (for forward compatibility).
93 // pong is the reply to ping.
95 // This field should mirror the UDP envelope address
96 // of the ping packet, which provides a way to discover the
97 // external address (after NAT).
100 ReplyTok []byte // This contains the hash of the ping packet.
101 Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
104 TopicHash common.Hash
108 // Ignore additional fields (for forward compatibility).
112 // findnode is a query for nodes close to the given target.
114 Target NodeID // doesn't need to be an actual public key
116 // Ignore additional fields (for forward compatibility).
120 // findnodeHash is a query for nodes close to the given target hash.
121 findnodeHash struct {
124 // Ignore additional fields (for forward compatibility).
132 // Ignore additional fields (for forward compatibility).
136 topicRegister struct {
147 // reply to topicQuery
154 IP net.IP // len 4 for IPv4 or 16 for IPv6
155 UDP uint16 // for discovery protocol
156 TCP uint16 // for RLPx protocol
161 IP net.IP // len 4 for IPv4 or 16 for IPv6
162 UDP uint16 // for discovery protocol
163 TCP uint16 // for RLPx protocol
168 versionPrefix = []byte("bytom discovery")
169 versionPrefixSize = len(versionPrefix)
172 headSize = versionPrefixSize + nodeIDSize + sigSize // space of packet frame data
175 // Neighbors replies are sent across multiple packets to
176 // stay below the 1280 byte limit. We compute the maximum number
177 // of entries by stuffing a packet until it grows too large.
178 var maxNeighbors = func() int {
179 p := neighbors{Expiration: ^uint64(0)}
180 maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
182 p.Nodes = append(p.Nodes, maxSizeNode)
185 b := new(bytes.Buffer)
186 wire.WriteJSON(p, b, &size, &err)
188 // If this ever happens, it will be caught by the unit tests.
189 panic("cannot encode: " + err.Error())
191 if headSize+size+1 >= 1280 {
197 var maxTopicNodes = func() int {
199 maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
201 p.Nodes = append(p.Nodes, maxSizeNode)
204 b := new(bytes.Buffer)
205 wire.WriteJSON(p, b, &size, &err)
207 // If this ever happens, it will be caught by the unit tests.
208 panic("cannot encode: " + err.Error())
210 if headSize+size+1 >= 1280 {
216 func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
221 return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
224 func (e1 rpcEndpoint) equal(e2 rpcEndpoint) bool {
225 return e1.UDP == e2.UDP && e1.TCP == e2.TCP && e1.IP.Equal(e2.IP)
228 func nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) {
229 if err := netutil.CheckRelayIP(sender.IP, rn.IP); err != nil {
232 n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP)
233 err := n.validateComplete()
237 func nodeToRPC(n *Node) rpcNode {
238 return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
241 type ingressPacket struct {
243 remoteAddr *net.UDPAddr
246 data interface{} // one of the RPC structs
250 type conn interface {
251 ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
252 WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
257 // udp implements the RPC protocol.
260 priv *crypto.PrivKeyEd25519
261 ourEndpoint rpcEndpoint
266 // ListenUDP returns a new Network that reads UDP packets from conn.
267 func ListenUDP(priv *crypto.PrivKeyEd25519, conn conn, realaddr *net.UDPAddr, nodeDBPath string, netrestrict *netutil.Netlist) (*Network, error) {
268 transport, err := listenUDP(priv, conn, realaddr)
272 net, err := newNetwork(transport, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), nodeDBPath, netrestrict)
276 log.Info("UDP listener up v5", "net", net.tab.self)
278 go transport.readLoop()
282 func listenUDP(priv *crypto.PrivKeyEd25519, conn conn, realaddr *net.UDPAddr) (*udp, error) {
283 return &udp{conn: conn, priv: priv, ourEndpoint: makeEndpoint(realaddr, uint16(realaddr.Port))}, nil
286 func (t *udp) localAddr() *net.UDPAddr {
287 return t.conn.LocalAddr().(*net.UDPAddr)
290 func (t *udp) Close() {
294 func (t *udp) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) {
295 hash, _ = t.sendPacket(remote.ID, remote.addr(), byte(ptype), data)
299 func (t *udp) sendPing(remote *Node, toaddr *net.UDPAddr, topics []Topic) (hash []byte) {
300 hash, _ = t.sendPacket(remote.ID, toaddr, byte(pingPacket), ping{
303 To: makeEndpoint(toaddr, uint16(toaddr.Port)), // TODO: maybe use known TCP port from DB
304 Expiration: uint64(time.Now().Add(expiration).Unix()),
310 func (t *udp) sendFindnode(remote *Node, target NodeID) {
311 t.sendPacket(remote.ID, remote.addr(), byte(findnodePacket), findnode{
313 Expiration: uint64(time.Now().Add(expiration).Unix()),
317 func (t *udp) sendNeighbours(remote *Node, results []*Node) {
318 // Send neighbors in chunks with at most maxNeighbors per packet
319 // to stay below the 1280 byte limit.
320 p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
321 for i, result := range results {
322 p.Nodes = append(p.Nodes, nodeToRPC(result))
323 if len(p.Nodes) == maxNeighbors || i == len(results)-1 {
324 t.sendPacket(remote.ID, remote.addr(), byte(neighborsPacket), p)
325 p.Nodes = p.Nodes[:0]
330 func (t *udp) sendFindnodeHash(remote *Node, target common.Hash) {
331 t.sendPacket(remote.ID, remote.addr(), byte(findnodeHashPacket), findnodeHash{
332 Target: common.Hash(target),
333 Expiration: uint64(time.Now().Add(expiration).Unix()),
337 func (t *udp) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) {
338 t.sendPacket(remote.ID, remote.addr(), byte(topicRegisterPacket), topicRegister{
345 func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
346 p := topicNodes{Echo: queryHash}
348 for _, result := range nodes {
349 if result.IP.Equal(t.net.tab.self.IP) || netutil.CheckRelayIP(remote.IP, result.IP) == nil {
350 p.Nodes = append(p.Nodes, nodeToRPC(result))
352 if len(p.Nodes) == maxTopicNodes {
353 t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
354 p.Nodes = p.Nodes[:0]
358 if !sent || len(p.Nodes) > 0 {
359 t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
363 func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) {
364 //fmt.Println("sendPacket", nodeEvent(ptype), toaddr.String(), toid.String())
365 packet, hash, err := encodePacket(t.priv, ptype, req)
370 log.Debug(fmt.Sprintf(">>> %v to %x@%v", nodeEvent(ptype), toid[:8], toaddr))
371 if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
372 log.Info(fmt.Sprint("UDP send failed:", err))
377 // zeroed padding space for encodePacket.
378 var headSpace = make([]byte, headSize)
380 func encodePacket(priv *crypto.PrivKeyEd25519, ptype byte, req interface{}) (p, hash []byte, err error) {
381 b := new(bytes.Buffer)
385 wire.WriteJSON(req, b, &size, &err)
387 log.Error(fmt.Sprint("error encoding packet:", err))
391 nodeID := priv.PubKey().Unwrap().(crypto.PubKeyEd25519)
392 sig := priv.Sign(common.BytesToHash(packet[headSize:]).Bytes())
393 copy(packet, versionPrefix)
394 copy(packet[versionPrefixSize:], nodeID[:])
395 copy(packet[versionPrefixSize+nodeIDSize:], sig.Bytes())
397 hash = common.BytesToHash(packet[versionPrefixSize:]).Bytes()
398 return packet, hash, nil
401 // readLoop runs in its own goroutine. It injects ingress UDP packets
402 // into the network loop.
403 func (t *udp) readLoop() {
405 // Discovery packets are defined to be no larger than 1280 bytes.
406 // Packets larger than this size will be cut at the end and treated
407 // as invalid because their hash won't match.
408 buf := make([]byte, 1280)
410 nbytes, from, err := t.conn.ReadFromUDP(buf)
411 if netutil.IsTemporaryError(err) {
412 // Ignore temporary read errors.
413 log.Debug(fmt.Sprintf("Temporary read error: %v", err))
415 } else if err != nil {
416 // Shut down the loop for permanent errors.
417 log.Debug(fmt.Sprintf("Read error: %v", err))
420 t.handlePacket(from, buf[:nbytes])
424 func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
425 pkt := ingressPacket{remoteAddr: from}
426 if err := decodePacket(buf, &pkt); err != nil {
427 log.Debug(fmt.Sprintf("Bad packet from %v: %v", from, err))
428 //fmt.Println("bad packet", err)
431 t.net.reqReadPacket(pkt)
435 func decodePacket(buffer []byte, pkt *ingressPacket) error {
436 if len(buffer) < headSize+1 {
437 return errPacketTooSmall
439 buf := make([]byte, len(buffer))
441 prefix, fromID, sigdata := buf[:versionPrefixSize], buf[versionPrefixSize:versionPrefixSize+nodeIDSize], buf[headSize:]
442 if !bytes.Equal(prefix, versionPrefix) {
446 pkt.hash = common.BytesToHash(buf[versionPrefixSize:]).Bytes()
447 pkt.remoteID = ByteID(fromID)
448 switch pkt.ev = nodeEvent(sigdata[0]); pkt.ev {
454 pkt.data = new(findnode)
455 case neighborsPacket:
456 pkt.data = new(neighbors)
457 case findnodeHashPacket:
458 pkt.data = new(findnodeHash)
459 case topicRegisterPacket:
460 pkt.data = new(topicRegister)
461 case topicQueryPacket:
462 pkt.data = new(topicQuery)
463 case topicNodesPacket:
464 pkt.data = new(topicNodes)
466 return fmt.Errorf("unknown packet type: %d", sigdata[0])
469 wire.ReadJSON(pkt.data, sigdata[1:], &err)
471 log.Error("wire readjson err:", err)