OSDN Git Service

ANRdaemon: move trace result from /sdcard to /data am: d93aa41807
[android-x86/system-extras.git] / tests / net_test / net_test.py
index a87b71b..d7ea013 100755 (executable)
@@ -16,6 +16,8 @@
 
 import fcntl
 import os
+import random
+import re
 from socket import *  # pylint: disable=wildcard-import
 import struct
 import unittest
@@ -28,11 +30,14 @@ IPV6_RECVERR = 25
 IP_TRANSPARENT = 19
 IPV6_TRANSPARENT = 75
 IPV6_TCLASS = 67
-SO_BINDTODEVICE = 25
-SO_MARK = 36
 IPV6_FLOWLABEL_MGR = 32
 IPV6_FLOWINFO_SEND = 33
 
+SO_BINDTODEVICE = 25
+SO_MARK = 36
+SO_PROTOCOL = 38
+SO_DOMAIN = 39
+
 ETH_P_IP = 0x0800
 ETH_P_IPV6 = 0x86dd
 
@@ -51,6 +56,8 @@ IPV6_FL_S_NONE = 0
 IPV6_FL_S_EXCL = 1
 IPV6_FL_S_ANY = 255
 
+IFNAMSIZ = 16
+
 IPV4_PING = "\x08\x00\x00\x00\x0a\xce\x00\x03"
 IPV6_PING = "\x80\x00\x00\x00\x0a\xce\x00\x03"
 
@@ -63,6 +70,12 @@ IPV6_SEQ_DGRAM_HEADER = ("  sl  "
                          "st tx_queue rx_queue tr tm->when retrnsmt"
                          "   uid  timeout inode ref pointer drops\n")
 
+# Arbitrary packet payload.
+UDP_PAYLOAD = str(scapy.DNS(rd=1,
+                            id=random.randint(0, 65535),
+                            qd=scapy.DNSQR(qname="wWW.GoOGle.CoM",
+                                           qtype="AAAA")))
+
 # Unix group to use if we want to open sockets as non-root.
 AID_INET = 3003
 
@@ -140,11 +153,29 @@ def RawGRESocket(family):
   return s
 
 
+def DisableLinger(sock):
+  sock.setsockopt(SOL_SOCKET, SO_LINGER, struct.pack("ii", 1, 0))
+
+
+def CreateSocketPair(family, socktype, addr):
+  clientsock = socket(family, socktype, 0)
+  listensock = socket(family, socktype, 0)
+  listensock.bind((addr, 0))
+  addr = listensock.getsockname()
+  listensock.listen(1)
+  clientsock.connect(addr)
+  acceptedsock, _ = listensock.accept()
+  DisableLinger(clientsock)
+  DisableLinger(acceptedsock)
+  listensock.close()
+  return clientsock, acceptedsock
+
+
 def GetInterfaceIndex(ifname):
   s = IPv4PingSocket()
-  ifr = struct.pack("16si", ifname, 0)
+  ifr = struct.pack("%dsi" % IFNAMSIZ, ifname, 0)
   ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr)
-  return struct.unpack("16si", ifr)[1]
+  return struct.unpack("%dsi" % IFNAMSIZ, ifr)[1]
 
 
 def SetInterfaceHWAddr(ifname, hwaddr):
@@ -153,20 +184,20 @@ def SetInterfaceHWAddr(ifname, hwaddr):
   hwaddr = hwaddr.decode("hex")
   if len(hwaddr) != 6:
     raise ValueError("Unknown hardware address length %d" % len(hwaddr))
-  ifr = struct.pack("16sH6s", ifname, scapy.ARPHDR_ETHER, hwaddr)
+  ifr = struct.pack("%dsH6s" % IFNAMSIZ, ifname, scapy.ARPHDR_ETHER, hwaddr)
   fcntl.ioctl(s, SIOCSIFHWADDR, ifr)
 
 
 def SetInterfaceState(ifname, up):
   s = IPv4PingSocket()
-  ifr = struct.pack("16sH", ifname, 0)
+  ifr = struct.pack("%dsH" % IFNAMSIZ, ifname, 0)
   ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr)
-  _, flags = struct.unpack("16sH", ifr)
+  _, flags = struct.unpack("%dsH" % IFNAMSIZ, ifr)
   if up:
     flags |= scapy.IFF_UP
   else:
     flags &= ~scapy.IFF_UP
-  ifr = struct.pack("16sH", ifname, flags)
+  ifr = struct.pack("%dsH" % IFNAMSIZ, ifname, flags)
   ifr = fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr)
 
 
@@ -285,7 +316,6 @@ except ValueError:
 
 
 class RunAsUid(object):
-
   """Context guard to run a code block as a given UID."""
 
   def __init__(self, uid):
@@ -311,6 +341,54 @@ class NetworkTest(unittest.TestCase):
     msg = os.strerror(err_num)
     self.assertRaisesRegexp(EnvironmentError, msg, f, *args)
 
+  def ReadProcNetSocket(self, protocol):
+    # Read file.
+    filename = "/proc/net/%s" % protocol
+    lines = open(filename).readlines()
+
+    # Possibly check, and strip, header.
+    if protocol in ["icmp6", "raw6", "udp6"]:
+      self.assertEqual(IPV6_SEQ_DGRAM_HEADER, lines[0])
+    lines = lines[1:]
+
+    # Check contents.
+    if protocol.endswith("6"):
+      addrlen = 32
+    else:
+      addrlen = 8
+
+    if protocol.startswith("tcp"):
+      # Real sockets have 5 extra numbers, timewait sockets have none.
+      end_regexp = "(| +[0-9]+ [0-9]+ [0-9]+ [0-9]+ -?[0-9]+|)$"
+    elif re.match("icmp|udp|raw", protocol):
+      # Drops.
+      end_regexp = " +([0-9]+) *$"
+    else:
+      raise ValueError("Don't know how to parse %s" % filename)
+
+    regexp = re.compile(r" *(\d+): "                    # bucket
+                        "([0-9A-F]{%d}:[0-9A-F]{4}) "   # srcaddr, port
+                        "([0-9A-F]{%d}:[0-9A-F]{4}) "   # dstaddr, port
+                        "([0-9A-F][0-9A-F]) "           # state
+                        "([0-9A-F]{8}:[0-9A-F]{8}) "    # mem
+                        "([0-9A-F]{2}:[0-9A-F]{8}) "    # ?
+                        "([0-9A-F]{8}) +"               # ?
+                        "([0-9]+) +"                    # uid
+                        "([0-9]+) +"                    # timeout
+                        "([0-9]+) +"                    # inode
+                        "([0-9]+) +"                    # refcnt
+                        "([0-9a-f]+)"                   # sp
+                        "%s"                            # icmp has spaces
+                        % (addrlen, addrlen, end_regexp))
+    # Return a list of lists with only source / dest addresses for now.
+    # TODO: consider returning a dict or namedtuple instead.
+    out = []
+    for line in lines:
+      (_, src, dst, state, mem,
+       _, _, uid, _, _, refcnt, _, extra) = regexp.match(line).groups()
+      out.append([src, dst, state, mem, uid, refcnt, extra])
+    return out
+
 
 if __name__ == "__main__":
   unittest.main()