import fcntl
import os
import random
+import re
from socket import * # pylint: disable=wildcard-import
import struct
import unittest
msg = os.strerror(err_num)
self.assertRaisesRegexp(EnvironmentError, msg, f, *args)
+ def ReadProcNetSocket(self, protocol):
+ # Read file.
+ filename = "/proc/net/%s" % protocol
+ lines = open(filename).readlines()
+
+ # Possibly check, and strip, header.
+ if protocol in ["icmp6", "raw6", "udp6"]:
+ self.assertEqual(IPV6_SEQ_DGRAM_HEADER, lines[0])
+ lines = lines[1:]
+
+ # Check contents.
+ if protocol.endswith("6"):
+ addrlen = 32
+ else:
+ addrlen = 8
+
+ if protocol.startswith("tcp"):
+ # Real sockets have 5 extra numbers, timewait sockets have none.
+ end_regexp = "(| +[0-9]+ [0-9]+ [0-9]+ [0-9]+ -?[0-9]+|)$"
+ elif re.match("icmp|udp|raw", protocol):
+ # Drops.
+ end_regexp = " +([0-9]+) *$"
+ else:
+ raise ValueError("Don't know how to parse %s" % filename)
+
+ regexp = re.compile(r" *(\d+): " # bucket
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
+ "([0-9A-F][0-9A-F]) " # state
+ "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
+ "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
+ "([0-9A-F]{8}) +" # ?
+ "([0-9]+) +" # uid
+ "([0-9]+) +" # timeout
+ "([0-9]+) +" # inode
+ "([0-9]+) +" # refcnt
+ "([0-9a-f]+)" # sp
+ "%s" # icmp has spaces
+ % (addrlen, addrlen, end_regexp))
+ # Return a list of lists with only source / dest addresses for now.
+ # TODO: consider returning a dict or namedtuple instead.
+ out = []
+ for line in lines:
+ (_, src, dst, state, mem,
+ _, _, uid, _, _, refcnt, _, extra) = regexp.match(line).groups()
+ out.append([src, dst, state, mem, uid, refcnt, extra])
+ return out
+
if __name__ == "__main__":
unittest.main()
import os
import posix
import random
-import re
from socket import * # pylint: disable=wildcard-import
import threading
import time
self.assertEqual(len(data), len(rcvd))
self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
- def ReadProcNetSocket(self, protocol):
- # Read file.
- lines = open("/proc/net/%s" % protocol).readlines()
-
- # Possibly check, and strip, header.
- if protocol in ["icmp6", "raw6", "udp6"]:
- self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
- lines = lines[1:]
-
- # Check contents.
- if protocol.endswith("6"):
- addrlen = 32
- else:
- addrlen = 8
- regexp = re.compile(r" *(\d+): " # bucket
- "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
- "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
- "([0-9A-F][0-9A-F]) " # state
- "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
- "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
- "([0-9A-F]{8}) +" # ?
- "([0-9]+) +" # uid
- "([0-9]+) +" # ?
- "([0-9]+) +" # inode
- "([0-9]+) +" # refcnt
- "([0-9a-f]+) +" # sp
- "([0-9]+) *$" # drops, icmp has spaces
- % (addrlen, addrlen))
- # Return a list of lists with only source / dest addresses for now.
- out = []
- for line in lines:
- (_, src, dst, state, mem,
- _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups()
- out.append([src, dst, state, mem, uid, refcnt, drops])
- return out
-
def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
txmem=0, rxmem=0):
expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),