import fcntl
import os
+import random
+import re
from socket import * # pylint: disable=wildcard-import
import struct
import unittest
IP_TRANSPARENT = 19
IPV6_TRANSPARENT = 75
IPV6_TCLASS = 67
-SO_BINDTODEVICE = 25
-SO_MARK = 36
IPV6_FLOWLABEL_MGR = 32
IPV6_FLOWINFO_SEND = 33
+SO_BINDTODEVICE = 25
+SO_MARK = 36
+SO_PROTOCOL = 38
+SO_DOMAIN = 39
+
ETH_P_IP = 0x0800
ETH_P_IPV6 = 0x86dd
IPV6_FL_S_EXCL = 1
IPV6_FL_S_ANY = 255
+IFNAMSIZ = 16
+
IPV4_PING = "\x08\x00\x00\x00\x0a\xce\x00\x03"
IPV6_PING = "\x80\x00\x00\x00\x0a\xce\x00\x03"
"st tx_queue rx_queue tr tm->when retrnsmt"
" uid timeout inode ref pointer drops\n")
+# Arbitrary packet payload.
+UDP_PAYLOAD = str(scapy.DNS(rd=1,
+ id=random.randint(0, 65535),
+ qd=scapy.DNSQR(qname="wWW.GoOGle.CoM",
+ qtype="AAAA")))
+
# Unix group to use if we want to open sockets as non-root.
AID_INET = 3003
return s
+def DisableLinger(sock):
+ sock.setsockopt(SOL_SOCKET, SO_LINGER, struct.pack("ii", 1, 0))
+
+
+def CreateSocketPair(family, socktype, addr):
+ clientsock = socket(family, socktype, 0)
+ listensock = socket(family, socktype, 0)
+ listensock.bind((addr, 0))
+ addr = listensock.getsockname()
+ listensock.listen(1)
+ clientsock.connect(addr)
+ acceptedsock, _ = listensock.accept()
+ DisableLinger(clientsock)
+ DisableLinger(acceptedsock)
+ listensock.close()
+ return clientsock, acceptedsock
+
+
def GetInterfaceIndex(ifname):
s = IPv4PingSocket()
- ifr = struct.pack("16si", ifname, 0)
+ ifr = struct.pack("%dsi" % IFNAMSIZ, ifname, 0)
ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr)
- return struct.unpack("16si", ifr)[1]
+ return struct.unpack("%dsi" % IFNAMSIZ, ifr)[1]
def SetInterfaceHWAddr(ifname, hwaddr):
hwaddr = hwaddr.decode("hex")
if len(hwaddr) != 6:
raise ValueError("Unknown hardware address length %d" % len(hwaddr))
- ifr = struct.pack("16sH6s", ifname, scapy.ARPHDR_ETHER, hwaddr)
+ ifr = struct.pack("%dsH6s" % IFNAMSIZ, ifname, scapy.ARPHDR_ETHER, hwaddr)
fcntl.ioctl(s, SIOCSIFHWADDR, ifr)
def SetInterfaceState(ifname, up):
s = IPv4PingSocket()
- ifr = struct.pack("16sH", ifname, 0)
+ ifr = struct.pack("%dsH" % IFNAMSIZ, ifname, 0)
ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr)
- _, flags = struct.unpack("16sH", ifr)
+ _, flags = struct.unpack("%dsH" % IFNAMSIZ, ifr)
if up:
flags |= scapy.IFF_UP
else:
flags &= ~scapy.IFF_UP
- ifr = struct.pack("16sH", ifname, flags)
+ ifr = struct.pack("%dsH" % IFNAMSIZ, ifname, flags)
ifr = fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr)
class RunAsUid(object):
-
"""Context guard to run a code block as a given UID."""
def __init__(self, uid):
msg = os.strerror(err_num)
self.assertRaisesRegexp(EnvironmentError, msg, f, *args)
+ def ReadProcNetSocket(self, protocol):
+ # Read file.
+ filename = "/proc/net/%s" % protocol
+ lines = open(filename).readlines()
+
+ # Possibly check, and strip, header.
+ if protocol in ["icmp6", "raw6", "udp6"]:
+ self.assertEqual(IPV6_SEQ_DGRAM_HEADER, lines[0])
+ lines = lines[1:]
+
+ # Check contents.
+ if protocol.endswith("6"):
+ addrlen = 32
+ else:
+ addrlen = 8
+
+ if protocol.startswith("tcp"):
+ # Real sockets have 5 extra numbers, timewait sockets have none.
+ end_regexp = "(| +[0-9]+ [0-9]+ [0-9]+ [0-9]+ -?[0-9]+|)$"
+ elif re.match("icmp|udp|raw", protocol):
+ # Drops.
+ end_regexp = " +([0-9]+) *$"
+ else:
+ raise ValueError("Don't know how to parse %s" % filename)
+
+ regexp = re.compile(r" *(\d+): " # bucket
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
+ "([0-9A-F][0-9A-F]) " # state
+ "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
+ "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
+ "([0-9A-F]{8}) +" # ?
+ "([0-9]+) +" # uid
+ "([0-9]+) +" # timeout
+ "([0-9]+) +" # inode
+ "([0-9]+) +" # refcnt
+ "([0-9a-f]+)" # sp
+ "%s" # icmp has spaces
+ % (addrlen, addrlen, end_regexp))
+ # Return a list of lists with only source / dest addresses for now.
+ # TODO: consider returning a dict or namedtuple instead.
+ out = []
+ for line in lines:
+ (_, src, dst, state, mem,
+ _, _, uid, _, _, refcnt, _, extra) = regexp.match(line).groups()
+ out.append([src, dst, state, mem, uid, refcnt, extra])
+ return out
+
if __name__ == "__main__":
unittest.main()