from layout_package import dump_render_tree_thread
from layout_package import json_layout_results_generator
+from layout_package import message_broker
from layout_package import printing
from layout_package import test_expectations
from layout_package import test_failures
from layout_package import test_results
from layout_package import test_results_uploader
-from test_types import image_diff
-from test_types import text_diff
-from test_types import test_type_base
from webkitpy.common.system import user
from webkitpy.thirdparty import simplejson
+from webkitpy.tool import grammar
import port
# Builder base URL where we have the archived test results.
BUILDER_BASE_URL = "http://build.chromium.org/buildbot/layout_test_results/"
+LAYOUT_TESTS_DIRECTORY = "LayoutTests" + os.sep
+
TestExpectationsFile = test_expectations.TestExpectationsFile
-class TestInfo:
+class TestInput:
"""Groups information about a test for easy passing of data."""
- def __init__(self, port, filename, timeout):
- """Generates the URI and stores the filename and timeout for this test.
+ def __init__(self, filename, timeout):
+ """Holds the input parameters for a test.
Args:
filename: Full path to the test.
- timeout: Timeout for running the test in TestShell.
+ timeout: Timeout in msecs the driver should use while running the test
"""
+ # FIXME: filename should really be test_name as a relative path.
self.filename = filename
- self._port = port
- self.uri = port.filename_to_uri(filename)
self.timeout = timeout
- self._image_checksum = -1
-
- def image_hash(self):
- # Read the image_hash lazily to reduce startup time.
- # This class is accessed across threads, but only one thread should
- # ever be dealing with any given TestInfo so no locking is needed.
- #
- # Note that we use -1 to indicate that we haven't read the value,
- # because expected_checksum() returns a string or None.
- if self._image_checksum == -1:
- self._image_checksum = self._port.expected_checksum(self.filename)
- return self._image_checksum
+ # The image_hash is used to avoid doing an image dump if the
+ # checksums match. The image_hash is set later, and only if it is needed
+ # for the test.
+ self.image_hash = None
class ResultSummary(object):
# in DumpRenderTree.
DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
- def __init__(self, port, options, printer):
+ def __init__(self, port, options, printer, message_broker):
"""Initialize test runner data structures.
Args:
port: an object implementing port-specific
options: a dictionary of command line options
printer: a Printer object to record updates to.
+ message_broker: object used to communicate with workers.
"""
self._port = port
self._options = options
self._printer = printer
+ self._message_broker = message_broker
# disable wss server. need to install pyOpenSSL on buildbots.
# self._websocket_secure_server = websocket_server.PyWebSocket(
# options.results_directory, use_tls=True, port=9323)
- # a list of TestType objects
- self._test_types = [text_diff.TestTextDiff]
- if options.pixel_tests:
- self._test_types.append(image_diff.ImageDiff)
-
# a set of test files, and the same tests as a list
self._test_files = set()
self._test_files_list = None
last_unexpected_results: list of unexpected results to retest, if any
"""
- paths = [arg for arg in args if arg and arg != '']
+ paths = [self._strip_test_dir_prefix(arg) for arg in args if arg and arg != '']
paths += last_unexpected_results
if self._options.test_list:
paths += read_test_files(self._options.test_list)
self._test_files = self._port.tests(paths)
+ def _strip_test_dir_prefix(self, path):
+ if path.startswith(LAYOUT_TESTS_DIRECTORY):
+ return path[len(LAYOUT_TESTS_DIRECTORY):]
+ return path
+
def lint(self):
# Creating the expecations for each platform/configuration pair does
# all the test list parsing and ensures it's correct syntax (e.g. no
# If we reached the end and we don't have enough tests, we run some
# from the beginning.
- if (self._options.run_chunk and
- (slice_end - slice_start < chunk_len)):
- extra = 1 + chunk_len - (slice_end - slice_start)
+ if slice_end - slice_start < chunk_len:
+ extra = chunk_len - (slice_end - slice_start)
extra_msg = (' last chunk is partial, appending [0:%d]' %
extra)
self._printer.print_expected(extra_msg)
def _get_dir_for_test_file(self, test_file):
"""Returns the highest-level directory by which to shard the given
test file."""
- index = test_file.rfind(os.sep + 'LayoutTests' + os.sep)
+ index = test_file.rfind(os.sep + LAYOUT_TESTS_DIRECTORY)
- test_file = test_file[index + len('LayoutTests/'):]
+ test_file = test_file[index + len(LAYOUT_TESTS_DIRECTORY):]
test_file_parts = test_file.split(os.sep, 1)
directory = test_file_parts[0]
test_file = test_file_parts[1]
return return_value
- def _get_test_info_for_file(self, test_file):
- """Returns the appropriate TestInfo object for the file. Mostly this
+ def _get_test_input_for_file(self, test_file):
+ """Returns the appropriate TestInput object for the file. Mostly this
is used for looking up the timeout value (in ms) to use for the given
test."""
- if self._expectations.has_modifier(test_file, test_expectations.SLOW):
- return TestInfo(self._port, test_file,
- self._options.slow_time_out_ms)
- return TestInfo(self._port, test_file, self._options.time_out_ms)
+ if self._test_is_slow(test_file):
+ return TestInput(test_file, self._options.slow_time_out_ms)
+ return TestInput(test_file, self._options.time_out_ms)
def _test_requires_lock(self, test_file):
"""Return True if the test needs to be locked when
split_path = test_file.split(os.sep)
return 'http' in split_path or 'websocket' in split_path
- def _get_test_file_queue(self, test_files):
- """Create the thread safe queue of lists of (test filenames, test URIs)
- tuples. Each TestShellThread pulls a list from this queue and runs
- those tests in order before grabbing the next available list.
+ def _test_is_slow(self, test_file):
+ return self._expectations.has_modifier(test_file,
+ test_expectations.SLOW)
- Shard the lists by directory. This helps ensure that tests that depend
- on each other (aka bad tests!) continue to run together as most
- cross-tests dependencies tend to occur within the same directory.
+ def _shard_tests(self, test_files, use_real_shards):
+ """Groups tests into batches.
+ This helps ensure that tests that depend on each other (aka bad tests!)
+ continue to run together as most cross-tests dependencies tend to
+ occur within the same directory. If use_real_shards is false, we
+ put each (non-HTTP/websocket) test into its own shard for maximum
+ concurrency instead of trying to do any sort of real sharding.
Return:
- The Queue of lists of TestInfo objects.
+ A list of lists of TestInput objects.
"""
+ # FIXME: when we added http locking, we changed how this works such
+ # that we always lump all of the HTTP threads into a single shard.
+ # That will slow down experimental-fully-parallel, but it's unclear
+ # what the best alternative is completely revamping how we track
+ # when to grab the lock.
test_lists = []
tests_to_http_lock = []
- if (self._options.experimental_fully_parallel or
- self._is_single_threaded()):
+ if not use_real_shards:
for test_file in test_files:
- test_info = self._get_test_info_for_file(test_file)
+ test_input = self._get_test_input_for_file(test_file)
if self._test_requires_lock(test_file):
- tests_to_http_lock.append(test_info)
+ tests_to_http_lock.append(test_input)
else:
- test_lists.append((".", [test_info]))
+ test_lists.append((".", [test_input]))
else:
tests_by_dir = {}
for test_file in test_files:
directory = self._get_dir_for_test_file(test_file)
- test_info = self._get_test_info_for_file(test_file)
+ test_input = self._get_test_input_for_file(test_file)
if self._test_requires_lock(test_file):
- tests_to_http_lock.append(test_info)
+ tests_to_http_lock.append(test_input)
else:
tests_by_dir.setdefault(directory, [])
- tests_by_dir[directory].append(test_info)
+ tests_by_dir[directory].append(test_input)
# Sort by the number of tests in the dir so that the ones with the
# most tests get run first in order to maximize parallelization.
# Number of tests is a good enough, but not perfect, approximation
tests_to_http_lock.reverse()
test_lists.insert(0, ("tests_to_http_lock", tests_to_http_lock))
- filename_queue = Queue.Queue()
- for item in test_lists:
- filename_queue.put(item)
- return filename_queue
-
- def _get_test_args(self, index):
- """Returns the tuple of arguments for tests and for DumpRenderTree."""
- test_args = test_type_base.TestArguments()
- test_args.png_path = None
- if self._options.pixel_tests:
- png_path = os.path.join(self._options.results_directory,
- "png_result%s.png" % index)
- test_args.png_path = png_path
- test_args.new_baseline = self._options.new_baseline
- test_args.reset_results = self._options.reset_results
-
- return test_args
+ return test_lists
def _contains_tests(self, subdir):
for test_file in self._test_files:
return True
return False
- def _instantiate_dump_render_tree_threads(self, test_files,
- result_summary):
- """Instantitates and starts the TestShellThread(s).
-
- Return:
- The list of threads.
- """
- filename_queue = self._get_test_file_queue(test_files)
-
- # Instantiate TestShellThreads and start them.
- threads = []
- for i in xrange(int(self._options.child_processes)):
- # Create separate TestTypes instances for each thread.
- test_types = []
- for test_type in self._test_types:
- test_types.append(test_type(self._port,
- self._options.results_directory))
-
- test_args = self._get_test_args(i)
- thread = dump_render_tree_thread.TestShellThread(self._port,
- self._options, filename_queue, self._result_queue,
- test_types, test_args)
- if self._is_single_threaded():
- thread.run_in_main_thread(self, result_summary)
- else:
- thread.start()
- threads.append(thread)
-
- return threads
-
- def _is_single_threaded(self):
- """Returns whether we should run all the tests in the main thread."""
- return int(self._options.child_processes) == 1
+ def _num_workers(self):
+ return int(self._options.child_processes)
def _run_tests(self, file_list, result_summary):
"""Runs the tests in the file_list.
in the form {filename:filename, test_run_time:test_run_time}
result_summary: summary object to populate with the results
"""
- # FIXME: We should use webkitpy.tool.grammar.pluralize here.
- plural = ""
- if not self._is_single_threaded():
- plural = "s"
- self._printer.print_update('Starting %s%s ...' %
- (self._port.driver_name(), plural))
- threads = self._instantiate_dump_render_tree_threads(file_list,
- result_summary)
+
+ self._printer.print_update('Sharding tests ...')
+ num_workers = self._num_workers()
+ test_lists = self._shard_tests(file_list,
+ num_workers > 1 and not self._options.experimental_fully_parallel)
+ filename_queue = Queue.Queue()
+ for item in test_lists:
+ filename_queue.put(item)
+
+ self._printer.print_update('Starting %s ...' %
+ grammar.pluralize('worker', num_workers))
+ message_broker = self._message_broker
+ self._current_filename_queue = filename_queue
+ self._current_result_summary = result_summary
+
+ if not self._options.dry_run:
+ threads = message_broker.start_workers(self)
+ else:
+ threads = {}
+
self._printer.print_update("Starting testing ...")
+ keyboard_interrupted = False
+ if not self._options.dry_run:
+ try:
+ message_broker.run_message_loop()
+ except KeyboardInterrupt:
+ _log.info("Interrupted, exiting")
+ message_broker.cancel_workers()
+ keyboard_interrupted = True
+ except:
+ # Unexpected exception; don't try to clean up workers.
+ _log.info("Exception raised, exiting")
+ raise
- keyboard_interrupted = self._wait_for_threads_to_finish(threads,
- result_summary)
- (thread_timings, test_timings, individual_test_timings) = \
+ thread_timings, test_timings, individual_test_timings = \
self._collect_timing_info(threads)
return (keyboard_interrupted, thread_timings, test_timings,
individual_test_timings)
- def _wait_for_threads_to_finish(self, threads, result_summary):
- keyboard_interrupted = False
- try:
- # Loop through all the threads waiting for them to finish.
- some_thread_is_alive = True
- while some_thread_is_alive:
- some_thread_is_alive = False
- t = time.time()
- for thread in threads:
- exception_info = thread.exception_info()
- if exception_info is not None:
- # Re-raise the thread's exception here to make it
- # clear that testing was aborted. Otherwise,
- # the tests that did not run would be assumed
- # to have passed.
- raise exception_info[0], exception_info[1], exception_info[2]
-
- if thread.isAlive():
- some_thread_is_alive = True
- next_timeout = thread.next_timeout()
- if (next_timeout and t > next_timeout):
- _log_wedged_thread(thread)
- thread.clear_next_timeout()
-
- self.update_summary(result_summary)
-
- if some_thread_is_alive:
- time.sleep(0.01)
-
- except KeyboardInterrupt:
- keyboard_interrupted = True
- for thread in threads:
- thread.cancel()
-
- return keyboard_interrupted
+ def update(self):
+ self.update_summary(self._current_result_summary)
def _collect_timing_info(self, threads):
test_timings = {}
if not result_summary:
return None
- # Do not start when http locking is enabled.
- if not self._options.wait_for_httpd:
- if self.needs_http():
- self._printer.print_update('Starting HTTP server ...')
- self._port.start_http_server()
-
- if self.needs_websocket():
- self._printer.print_update('Starting WebSocket server ...')
- self._port.start_websocket_server()
- # self._websocket_secure_server.Start()
-
return result_summary
def run(self, result_summary):
self._expectations, result_summary, retry_summary)
self._printer.print_unexpected_results(unexpected_results)
- if self._options.record_results:
- # Write the same data to log files.
- self._write_json_files(unexpected_results, result_summary,
- individual_test_timings)
-
- # Upload generated JSON files to appengine server.
- self._upload_json_files()
+ if (self._options.record_results and not self._options.dry_run and
+ not keyboard_interrupted):
+ # Write the same data to log files and upload generated JSON files
+ # to appengine server.
+ self._upload_json_files(unexpected_results, result_summary,
+ individual_test_timings)
# Write the summary to disk (results.html) and display it if requested.
- wrote_results = self._write_results_html_file(result_summary)
- if self._options.show_results and wrote_results:
- self._show_results_html_file()
+ if not self._options.dry_run:
+ wrote_results = self._write_results_html_file(result_summary)
+ if self._options.show_results and wrote_results:
+ self._show_results_html_file()
# Now that we've completed all the processing we can, we re-raise
# a KeyboardInterrupt if necessary so the caller can handle it.
sys.stdout.flush()
_log.debug("flushing stderr")
sys.stderr.flush()
- if not self._options.wait_for_httpd:
- _log.debug("stopping http server")
- self._port.stop_http_server()
- _log.debug("stopping websocket server")
- self._port.stop_websocket_server()
_log.debug("stopping helper")
self._port.stop_helper()
return failed_results
- def _write_json_files(self, unexpected_results, result_summary,
+ def _upload_json_files(self, unexpected_results, result_summary,
individual_test_timings):
"""Writes the results of the test run as JSON files into the results
- dir.
+ dir and upload the files to the appengine server.
There are three different files written into the results dir:
unexpected_results.json: A short list of any unexpected results.
with codecs.open(expectations_path, "w", "utf-8") as file:
file.write(u"ADD_EXPECTATIONS(%s);" % expectations_json)
- json_layout_results_generator.JSONLayoutResultsGenerator(
+ generator = json_layout_results_generator.JSONLayoutResultsGenerator(
self._port, self._options.builder_name, self._options.build_name,
self._options.build_number, self._options.results_directory,
BUILDER_BASE_URL, individual_test_timings,
self._expectations, result_summary, self._test_files_list,
not self._options.upload_full_results,
- self._options.test_results_server)
+ self._options.test_results_server,
+ "layout-tests",
+ self._options.master_name)
_log.debug("Finished writing JSON files.")
- def _upload_json_files(self):
- if not self._options.test_results_server:
- return
-
- _log.info("Uploading JSON files for builder: %s",
- self._options.builder_name)
-
- attrs = [("builder", self._options.builder_name), ("testtype", "layout-tests")]
- # FIXME: master_name should be required if test_results_server is set.
- # Throw an error if master_name isn't set.
- if self._options.master_name:
- attrs.append(("master", self._options.master_name))
-
json_files = ["expectations.json"]
if self._options.upload_full_results:
json_files.append("results.json")
else:
json_files.append("incremental_results.json")
- files = [(file, os.path.join(self._options.results_directory, file))
- for file in json_files]
-
- # FIXME: Remove this. This is temporary debug logging.
- if self._options.builder_name.startswith("Webkit Linux"):
- for filename in files:
- _log.debug(filename[1])
- with codecs.open(filename[1], "r") as results_file:
- _log.debug("%s:\n%s" % (filename[0], results_file.read()))
-
- uploader = test_results_uploader.TestResultsUploader(
- self._options.test_results_server)
- try:
- # Set uploading timeout in case appengine server is having problem.
- # 120 seconds are more than enough to upload test results.
- uploader.upload(attrs, files, 120)
- except Exception, err:
- _log.error("Upload failed: %s" % err)
- return
-
- _log.info("JSON files uploaded.")
+ generator.upload_json_files(json_files)
def _print_config(self):
"""Prints the configuration for the test run."""
(self._options.time_out_ms,
self._options.slow_time_out_ms))
- if self._is_single_threaded():
+ if self._num_workers() == 1:
p.print_config("Running one %s" % self._port.driver_name())
else:
p.print_config("Running %s %ss in parallel" %
(self._options.child_processes,
self._port.driver_name()))
+ p.print_config('Command line: ' +
+ ' '.join(self._port.driver_cmd_line()))
+ p.print_config("Worker model: %s" % self._options.worker_model)
p.print_config("")
def _print_expected_results_of_type(self, result_summary,
for test_tuple in individual_test_timings:
filename = test_tuple.filename
is_timeout_crash_or_slow = False
- if self._expectations.has_modifier(filename,
- test_expectations.SLOW):
+ if self._test_is_slow(filename):
is_timeout_crash_or_slow = True
slow_tests.append(test_tuple)
printer.cleanup()
return 0
+ broker = message_broker.get(port, options)
+
# We wrap any parts of the run that are slow or likely to raise exceptions
# in a try/finally to ensure that we clean up the logging configuration.
num_unexpected_results = -1
try:
- test_runner = TestRunner(port, options, printer)
+ test_runner = TestRunner(port, options, printer, broker)
test_runner._print_config()
printer.print_update("Collecting tests ...")
_log.debug("Testing completed, Exit status: %d" %
num_unexpected_results)
finally:
+ broker.cleanup()
printer.cleanup()
return num_unexpected_results
def _set_up_derived_options(port_obj, options):
"""Sets the options values that depend on other options values."""
+ if options.worker_model == 'inline':
+ if options.child_processes and int(options.child_processes) > 1:
+ _log.warning("--worker-model=inline overrides --child-processes")
+ options.child_processes = "1"
if not options.child_processes:
- # FIXME: Investigate perf/flakiness impact of using cpu_count + 1.
options.child_processes = os.environ.get("WEBKIT_TEST_CHILD_PROCESSES",
str(port_obj.default_child_processes()))
default=False,
help="Don't check the system dependencies (themes)"),
optparse.make_option("--use-drt", action="store_true",
- default=False,
+ default=None,
help="Use DumpRenderTree instead of test_shell"),
optparse.make_option("--accelerated-compositing",
action="store_true",
optparse.make_option("--no-record-results", action="store_false",
default=True, dest="record_results",
help="Don't record the results."),
- optparse.make_option("--wait-for-httpd", action="store_true",
- default=False, dest="wait_for_httpd",
- help="Wait for http locks."),
# old-run-webkit-tests also has HTTP toggle options:
# --[no-]http Run (or do not run) http tests
# (default: run)
optparse.make_option("--no-build", dest="build",
action="store_false", help="Don't check to see if the "
"DumpRenderTree build is up-to-date."),
+ optparse.make_option("-n", "--dry-run", action="store_true",
+ default=False,
+ help="Do everything but actually run the tests or upload results."),
# old-run-webkit-tests has --valgrind instead of wrapper.
optparse.make_option("--wrapper",
help="wrapper command to insert before invocations of "
optparse.make_option("--child-processes",
help="Number of DumpRenderTrees to run in parallel."),
# FIXME: Display default number of child processes that will run.
+ optparse.make_option("--worker-model", action="store",
+ default="threads", help=("controls worker model. Valid values are "
+ "'inline' and 'threads' (default).")),
optparse.make_option("--experimental-fully-parallel",
action="store_true", default=False,
help="run all tests in parallel"),
# Number of times to run the set of tests (e.g. ABCABCABC)
optparse.make_option("--print-last-failures", action="store_true",
default=False, help="Print the tests in the last run that "
- "had unexpected failures (or passes)."),
+ "had unexpected failures (or passes) and then exit."),
optparse.make_option("--retest-last-failures", action="store_true",
default=False, help="re-test the tests in the last run that "
"had unexpected failures (or passes)."),
old_run_webkit_tests_compat)
option_parser = optparse.OptionParser(option_list=option_list)
- options, args = option_parser.parse_args(args)
-
- return options, args
-
-
-def _log_wedged_thread(thread):
- """Log information about the given thread state."""
- id = thread.id()
- stack = dump_render_tree_thread.find_thread_stack(id)
- assert(stack is not None)
- _log.error("")
- _log.error("thread %s (%d) is wedged" % (thread.getName(), id))
- dump_render_tree_thread.log_stack(stack)
- _log.error("")
+ return option_parser.parse_args(args)
def main():
port_obj = port.get(options.platform, options)
return run(port_obj, options, args)
+
if '__main__' == __name__:
try:
sys.exit(main())