11 log "github.com/sirupsen/logrus"
12 "github.com/tendermint/go-crypto"
13 "github.com/tendermint/go-wire"
15 "github.com/vapor/common"
16 "github.com/vapor/p2p/netutil"
23 errPacketTooSmall = errors.New("too small")
24 errBadPrefix = errors.New("bad prefix")
25 errExpired = errors.New("expired")
26 errUnsolicitedReply = errors.New("unsolicited reply")
27 errUnknownNode = errors.New("unknown node")
28 errTimeout = errors.New("RPC timeout")
29 errClockWarp = errors.New("reply deadline too far in the future")
30 errClosed = errors.New("socket closed")
35 respTimeout = 1 * time.Second
36 queryDelay = 1000 * time.Millisecond
37 expiration = 20 * time.Second
39 ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP
40 ntpWarningCooldown = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning
41 driftThreshold = 10 * time.Second // Allowed clock drift before warning user
44 // ReadPacket is sent to the unhandled channel when it could not be processed
45 type ReadPacket struct {
50 // Config holds Table-related settings.
52 // These settings are required and configure the UDP listener:
53 PrivateKey *ecdsa.PrivateKey
55 // These settings are optional:
56 AnnounceAddr *net.UDPAddr // local address announced in the DHT
57 NodeDBPath string // if set, the node database is stored at this filesystem location
58 //NetRestrict *netutil.Netlist // network whitelist
59 Bootnodes []*Node // list of bootstrap nodes
60 Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
63 // RPC request structures
73 // Ignore additional fields (for forward compatibility).
77 // pong is the reply to ping.
79 // This field should mirror the UDP envelope address
80 // of the ping packet, which provides a way to discover the
81 // external address (after NAT).
84 ReplyTok []byte // This contains the hash of the ping packet.
85 Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
92 // Ignore additional fields (for forward compatibility).
96 // findnode is a query for nodes close to the given target.
98 Target NodeID // doesn't need to be an actual public key
100 // Ignore additional fields (for forward compatibility).
104 // findnodeHash is a query for nodes close to the given target hash.
105 findnodeHash struct {
108 // Ignore additional fields (for forward compatibility).
116 // Ignore additional fields (for forward compatibility).
120 topicRegister struct {
131 // reply to topicQuery
138 IP net.IP // len 4 for IPv4 or 16 for IPv6
139 UDP uint16 // for discovery protocol
140 TCP uint16 // for RLPx protocol
145 IP net.IP // len 4 for IPv4 or 16 for IPv6
146 UDP uint16 // for discovery protocol
147 TCP uint16 // for RLPx protocol
152 versionPrefix = []byte("bytom discovery")
153 versionPrefixSize = len(versionPrefix)
156 headSize = versionPrefixSize + nodeIDSize + sigSize // space of packet frame data
159 // Neighbors replies are sent across multiple packets to
160 // stay below the 1280 byte limit. We compute the maximum number
161 // of entries by stuffing a packet until it grows too large.
162 var maxNeighbors = func() int {
163 p := neighbors{Expiration: ^uint64(0)}
164 maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
166 p.Nodes = append(p.Nodes, maxSizeNode)
169 b := new(bytes.Buffer)
170 wire.WriteJSON(p, b, &size, &err)
172 // If this ever happens, it will be caught by the unit tests.
173 panic("cannot encode: " + err.Error())
175 if headSize+size+1 >= 1280 {
181 var maxTopicNodes = func() int {
183 maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
185 p.Nodes = append(p.Nodes, maxSizeNode)
188 b := new(bytes.Buffer)
189 wire.WriteJSON(p, b, &size, &err)
191 // If this ever happens, it will be caught by the unit tests.
192 panic("cannot encode: " + err.Error())
194 if headSize+size+1 >= 1280 {
200 func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
205 return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
208 func (e1 rpcEndpoint) equal(e2 rpcEndpoint) bool {
209 return e1.UDP == e2.UDP && e1.TCP == e2.TCP && e1.IP.Equal(e2.IP)
212 func nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) {
213 if err := netutil.CheckRelayIP(sender.IP, rn.IP); err != nil {
216 n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP)
217 err := n.validateComplete()
221 func nodeToRPC(n *Node) rpcNode {
222 return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
225 type ingressPacket struct {
227 remoteAddr *net.UDPAddr
230 data interface{} // one of the RPC structs
234 type conn interface {
235 ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
236 WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
241 // udp implements the RPC protocol.
244 priv *crypto.PrivKeyEd25519
245 ourEndpoint rpcEndpoint
250 // ListenUDP returns a new table that listens for UDP packets on laddr.
251 func ListenUDP(priv *crypto.PrivKeyEd25519, conn conn, realaddr *net.UDPAddr, nodeDBPath string, netrestrict *netutil.Netlist) (*Network, error) {
252 transport, err := listenUDP(priv, conn, realaddr)
256 net, err := newNetwork(transport, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), nodeDBPath, netrestrict)
260 log.Info("UDP listener up v5", "net", net.tab.self)
262 go transport.readLoop()
266 func listenUDP(priv *crypto.PrivKeyEd25519, conn conn, realaddr *net.UDPAddr) (*udp, error) {
267 return &udp{conn: conn, priv: priv, ourEndpoint: makeEndpoint(realaddr, uint16(realaddr.Port))}, nil
270 func (t *udp) localAddr() *net.UDPAddr {
271 return t.conn.LocalAddr().(*net.UDPAddr)
274 func (t *udp) Close() {
278 func (t *udp) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) {
279 hash, _ = t.sendPacket(remote.ID, remote.addr(), byte(ptype), data)
283 func (t *udp) sendPing(remote *Node, toaddr *net.UDPAddr, topics []Topic) (hash []byte) {
284 hash, _ = t.sendPacket(remote.ID, toaddr, byte(pingPacket), ping{
287 To: makeEndpoint(toaddr, uint16(toaddr.Port)), // TODO: maybe use known TCP port from DB
288 Expiration: uint64(time.Now().Add(expiration).Unix()),
294 func (t *udp) sendFindnode(remote *Node, target NodeID) {
295 t.sendPacket(remote.ID, remote.addr(), byte(findnodePacket), findnode{
297 Expiration: uint64(time.Now().Add(expiration).Unix()),
301 func (t *udp) sendNeighbours(remote *Node, results []*Node) {
302 // Send neighbors in chunks with at most maxNeighbors per packet
303 // to stay below the 1280 byte limit.
304 p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
305 for i, result := range results {
306 p.Nodes = append(p.Nodes, nodeToRPC(result))
307 if len(p.Nodes) == maxNeighbors || i == len(results)-1 {
308 t.sendPacket(remote.ID, remote.addr(), byte(neighborsPacket), p)
309 p.Nodes = p.Nodes[:0]
314 func (t *udp) sendFindnodeHash(remote *Node, target common.Hash) {
315 t.sendPacket(remote.ID, remote.addr(), byte(findnodeHashPacket), findnodeHash{
316 Target: common.Hash(target),
317 Expiration: uint64(time.Now().Add(expiration).Unix()),
321 func (t *udp) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) {
322 t.sendPacket(remote.ID, remote.addr(), byte(topicRegisterPacket), topicRegister{
329 func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
330 p := topicNodes{Echo: queryHash}
332 for _, result := range nodes {
333 if result.IP.Equal(t.net.tab.self.IP) || netutil.CheckRelayIP(remote.IP, result.IP) == nil {
334 p.Nodes = append(p.Nodes, nodeToRPC(result))
336 if len(p.Nodes) == maxTopicNodes {
337 t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
338 p.Nodes = p.Nodes[:0]
342 if !sent || len(p.Nodes) > 0 {
343 t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
347 func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) {
348 //fmt.Println("sendPacket", nodeEvent(ptype), toaddr.String(), toid.String())
349 packet, hash, err := encodePacket(t.priv, ptype, req)
354 log.Debug(fmt.Sprintf(">>> %v to %x@%v", nodeEvent(ptype), toid[:8], toaddr))
355 if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
356 log.Info(fmt.Sprint("UDP send failed:", err))
361 // zeroed padding space for encodePacket.
362 var headSpace = make([]byte, headSize)
364 func encodePacket(priv *crypto.PrivKeyEd25519, ptype byte, req interface{}) (p, hash []byte, err error) {
365 b := new(bytes.Buffer)
369 wire.WriteJSON(req, b, &size, &err)
371 log.Error(fmt.Sprint("error encoding packet:", err))
375 nodeID := priv.PubKey().Unwrap().(crypto.PubKeyEd25519)
376 sig := priv.Sign(common.BytesToHash(packet[headSize:]).Bytes())
377 copy(packet, versionPrefix)
378 copy(packet[versionPrefixSize:], nodeID[:])
379 copy(packet[versionPrefixSize+nodeIDSize:], sig.Bytes())
381 hash = common.BytesToHash(packet[versionPrefixSize:]).Bytes()
382 return packet, hash, nil
385 // readLoop runs in its own goroutine. It injects ingress UDP packets
386 // into the network loop.
387 func (t *udp) readLoop() {
389 // Discovery packets are defined to be no larger than 1280 bytes.
390 // Packets larger than this size will be cut at the end and treated
391 // as invalid because their hash won't match.
392 buf := make([]byte, 1280)
394 nbytes, from, err := t.conn.ReadFromUDP(buf)
395 if netutil.IsTemporaryError(err) {
396 // Ignore temporary read errors.
397 log.Debug(fmt.Sprintf("Temporary read error: %v", err))
399 } else if err != nil {
400 // Shut down the loop for permanent errors.
401 log.Debug(fmt.Sprintf("Read error: %v", err))
404 t.handlePacket(from, buf[:nbytes])
408 func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
409 pkt := ingressPacket{remoteAddr: from}
410 if err := decodePacket(buf, &pkt); err != nil {
411 log.Debug(fmt.Sprintf("Bad packet from %v: %v", from, err))
412 //fmt.Println("bad packet", err)
415 t.net.reqReadPacket(pkt)
419 func decodePacket(buffer []byte, pkt *ingressPacket) error {
420 if len(buffer) < headSize+1 {
421 return errPacketTooSmall
423 buf := make([]byte, len(buffer))
425 prefix, fromID, sigdata := buf[:versionPrefixSize], buf[versionPrefixSize:versionPrefixSize+nodeIDSize], buf[headSize:]
426 if !bytes.Equal(prefix, versionPrefix) {
430 pkt.hash = common.BytesToHash(buf[versionPrefixSize:]).Bytes()
431 pkt.remoteID = ByteID(fromID)
432 switch pkt.ev = nodeEvent(sigdata[0]); pkt.ev {
438 pkt.data = new(findnode)
439 case neighborsPacket:
440 pkt.data = new(neighbors)
441 case findnodeHashPacket:
442 pkt.data = new(findnodeHash)
443 case topicRegisterPacket:
444 pkt.data = new(topicRegister)
445 case topicQueryPacket:
446 pkt.data = new(topicQuery)
447 case topicNodesPacket:
448 pkt.data = new(topicNodes)
450 return fmt.Errorf("unknown packet type: %d", sigdata[0])
453 wire.ReadJSON(pkt.data, sigdata[1:], &err)
455 log.Error("wire readjson err:", err)