NO_DEBUG_INFO_TESTCASE = True
- _TIMEOUT_SECONDS = 120 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
- _DEFAULT_TIMEOUT = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
- _READ_TIMEOUT = 5 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
- _WAIT_TIMEOUT = 5 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
+ # Default time out in seconds. The timeout is increased tenfold under Asan.
+ DEFAULT_TIMEOUT = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
+ # Default sleep time in seconds. The sleep time is doubled under Asan.
+ DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1)
_GDBREMOTE_KILL_PACKET = "$k#6b"
return (named_pipe_path, named_pipe, named_pipe_fd)
- def get_stub_port_from_named_socket(self, read_timeout_seconds):
+ def get_stub_port_from_named_socket(self):
# Wait for something to read with a max timeout.
(ready_readers, _, _) = select.select(
- [self.named_pipe_fd], [], [], read_timeout_seconds)
+ [self.named_pipe_fd], [], [], self.DEFAULT_TIMEOUT)
self.assertIsNotNone(
ready_readers,
"write side of pipe has not written anything - stub isn't writing to pipe.")
# If we're receiving the stub's listening port from the named pipe, do
# that here.
if self.named_pipe:
- self.port = self.get_stub_port_from_named_socket(self._READ_TIMEOUT)
+ self.port = self.get_stub_port_from_named_socket()
return server
def expect_socket_recv(
self,
sock,
- expected_content_regex,
- timeout_seconds):
+ expected_content_regex
+ ):
response = ""
- timeout_time = time.time() + timeout_seconds
+ timeout_time = time.time() + self.DEFAULT_TIMEOUT
while not expected_content_regex.match(
response) and time.time() < timeout_time:
- can_read, _, _ = select.select([sock], [], [], timeout_seconds)
+ can_read, _, _ = select.select([sock], [], [], self.DEFAULT_TIMEOUT)
if can_read and sock in can_read:
recv_bytes = sock.recv(4096)
if recv_bytes:
self.assertTrue(expected_content_regex.match(response))
- def expect_socket_send(self, sock, content, timeout_seconds):
+ def expect_socket_send(self, sock, content):
request_bytes_remaining = content
- timeout_time = time.time() + timeout_seconds
+ timeout_time = time.time() + self.DEFAULT_TIMEOUT
while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
- _, can_write, _ = select.select([], [sock], [], timeout_seconds)
+ _, can_write, _ = select.select([], [sock], [], self.DEFAULT_TIMEOUT)
if can_write and sock in can_write:
written_byte_count = sock.send(request_bytes_remaining.encode())
request_bytes_remaining = request_bytes_remaining[
written_byte_count:]
self.assertEqual(len(request_bytes_remaining), 0)
- def do_handshake(self, stub_socket, timeout_seconds=None):
- if not timeout_seconds:
- timeout_seconds = self._WAIT_TIMEOUT
-
+ def do_handshake(self, stub_socket):
# Write the ack.
- self.expect_socket_send(stub_socket, "+", timeout_seconds)
+ self.expect_socket_send(stub_socket, "+")
# Send the start no ack mode packet.
NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
# Receive the ack and "OK"
self.expect_socket_recv(stub_socket, re.compile(
- r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
+ r"^\+\$OK#[0-9a-fA-F]{2}$"))
# Send the final ack.
- self.expect_socket_send(stub_socket, "+", timeout_seconds)
+ self.expect_socket_send(stub_socket, "+")
def add_no_ack_remote_stream(self):
self.test_sequence.add_log_lines(
return [parse_reg_info_response(reg_info_response)
for reg_info_response in reg_info_responses]
- def expect_gdbremote_sequence(self, timeout_seconds=None):
- if not timeout_seconds:
- timeout_seconds = self._TIMEOUT_SECONDS
+ def expect_gdbremote_sequence(self):
return expect_lldb_gdbserver_replay(
self,
self.sock,
self.test_sequence,
self._pump_queues,
- timeout_seconds,
+ self.DEFAULT_TIMEOUT,
self.logger)
_KNOWN_REGINFO_KEYS = [
thread_ids.extend(new_thread_infos)
return thread_ids
- def wait_for_thread_count(self, thread_count, timeout_seconds=None):
- if not timeout_seconds:
- timeout_seconds = self._WAIT_TIMEOUT
+ def wait_for_thread_count(self, thread_count):
start_time = time.time()
- timeout_time = start_time + timeout_seconds
+ timeout_time = start_time + self.DEFAULT_TIMEOUT
actual_thread_count = 0
while actual_thread_count < thread_count:
if time.time() > timeout_time:
raise Exception(
'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
- timeout_seconds, thread_count, actual_thread_count))
+ self.DEFAULT_TIMEOUT, thread_count, actual_thread_count))
return threads
self.run_process_then_stop(run_seconds=1)
# Wait at most x seconds for 3 threads to be present.
- threads = self.wait_for_thread_count(3, timeout_seconds=self._WAIT_TIMEOUT)
+ threads = self.wait_for_thread_count(3)
self.assertEqual(len(threads), 3)
# verify we can $H to each thead, and $qC matches the thread we set.
# context = self.run_process_then_stop(run_seconds=1)
# Wait at most x seconds for all threads to be present.
- # threads = self.wait_for_thread_count(NUM_THREADS, timeout_seconds=5)
+ # threads = self.wait_for_thread_count(NUM_THREADS)
# self.assertEquals(len(threads), NUM_THREADS)
signaled_tids = {}
2: "thread_id"}}],
True)
- context = self.expect_gdbremote_sequence(timeout_seconds=self._DEFAULT_TIMEOUT)
+ context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
signo = context.get("signo")
self.assertEqual(int(signo, 16), segfault_signo)
True)
# Run the sequence.
- context = self.expect_gdbremote_sequence(
- timeout_seconds=self._DEFAULT_TIMEOUT)
+ context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Ensure the stop signal is the signal we delivered.
self.assertIsNotNone(context)
# Wait for 3 threads to be present.
- threads = self.wait_for_thread_count(3, timeout_seconds=self._WAIT_TIMEOUT)
+ threads = self.wait_for_thread_count(3)
self.assertEqual(len(threads), 3)
expected_reg_values = []