TCP_CLOSING = 11
TCP_NEW_SYN_RECV = 12
+ALL_NON_TIME_WAIT = 0xffffffff & ~(1 << TCP_TIME_WAIT)
+
class SockDiag(netlink.NetlinkSocket):
out = self._Dump(SOCK_DIAG_BY_FAMILY, diag_req, InetDiagMsg, "")
return out
- def DumpSockets(self, family, protocol, ext, states, sock_id):
- """Dumps sockets matching the specified parameters."""
+ def DumpAllInetSockets(self, protocol, sock_id=None, ext=0,
+ states=ALL_NON_TIME_WAIT):
+ """Dumps IPv4 or IPv6 sockets matching the specified parameters."""
+ # DumpSockets(AF_UNSPEC) does not result in dumping all inet sockets, it
+ # results in ENOENT.
if sock_id is None:
sock_id = self._EmptyInetDiagSockId()
- diag_req = InetDiagReqV2((family, protocol, ext, states, sock_id))
- return self.Dump(diag_req)
-
- def DumpAllInetSockets(self, protocol, sock_id=None, ext=0, states=0xffffffff):
- # DumpSockets(AF_UNSPEC) does not result in dumping all inet sockets, it
- # results in ENOENT.
sockets = []
for family in [AF_INET, AF_INET6]:
- sockets += self.DumpSockets(family, protocol, ext, states, None)
+ diag_req = InetDiagReqV2((family, protocol, ext, states, sock_id))
+ sockets += self.Dump(diag_req)
+
return sockets
@staticmethod
if __name__ == "__main__":
n = SockDiag()
n.DEBUG = True
+ bytecode = ""
sock_id = n._EmptyInetDiagSockId()
sock_id.dport = 443
- family = AF_INET6
- protocol = IPPROTO_TCP
- ext = 0
- states = 0xffffffff
ext = 1 << (INET_DIAG_TOS - 1) | 1 << (INET_DIAG_TCLASS - 1)
- diag_msgs = n.DumpSockets(family, protocol, ext, states, sock_id)
+ states = 0xffffffff
+ diag_msgs = n.DumpAllInetSockets(IPPROTO_TCP,
+ sock_id=sock_id, ext=ext, states=states)
print diag_msgs
NUM_SOCKETS = 100
-ALL_NON_TIME_WAIT = 0xffffffff & ~(1 << sock_diag.TCP_TIME_WAIT)
-
# TODO: Backport SOCK_DESTROY and delete this.
HAVE_SOCK_DESTROY = net_test.LINUX_VERSION >= (4, 4)
def testFindsAllMySockets(self):
self.socketpairs = self._CreateLotsOfSockets()
- sockets = self.sock_diag.DumpAllInetSockets(IPPROTO_TCP,
- states=ALL_NON_TIME_WAIT)
+ sockets = self.sock_diag.DumpAllInetSockets(IPPROTO_TCP)
self.assertGreaterEqual(len(sockets), NUM_SOCKETS)
# Find the cookies for all of our sockets.