From 0631be28f6593aa9cd5916b6e482bf4ca4d97f34 Mon Sep 17 00:00:00 2001 From: Julian Lettner Date: Sat, 16 Feb 2019 00:40:40 +0000 Subject: [PATCH] [lit][NFC] Cleanup lit worker process handling Move code that is executed on worker process to separate file. This makes the use of the pickled arguments stored in global variables in the worker a bit clearer. (Still not pretty though.) Extract handling of parallelism groups to it's own function. Use BoundedSemaphore instead of Semaphore. BoundedSemaphore raises for unmatched release() calls. Cleanup imports. Differential Revision: https://reviews.llvm.org/D58196 git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@354187 91177308-0d34-0410-b5e6-96231b3b80d8 --- utils/lit/lit/run.py | 119 ++++++------------------------------------------ utils/lit/lit/util.py | 14 ++++++ utils/lit/lit/worker.py | 82 +++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 106 deletions(-) create mode 100644 utils/lit/lit/worker.py diff --git a/utils/lit/lit/run.py b/utils/lit/lit/run.py index a6de83eb3c2..5483f7ffecb 100644 --- a/utils/lit/lit/run.py +++ b/utils/lit/lit/run.py @@ -1,28 +1,9 @@ -import os -import sys -import threading +import multiprocessing import time -import traceback -try: - import Queue as queue -except ImportError: - import queue - -try: - import win32api -except ImportError: - win32api = None -import multiprocessing import lit.Test - -def abort_now(): - """Abort the current process without doing any exception teardown""" - sys.stdout.flush() - if win32api: - win32api.TerminateProcess(win32api.GetCurrentProcess(), 3) - else: - os.kill(0, 9) +import lit.util +import lit.worker class _Display(object): def __init__(self, display, provider, maxFailures): @@ -48,12 +29,11 @@ class Run(object): # For example, some ASan tests require lots of virtual memory and run # faster with less parallelism on OS X. self.parallelism_semaphores = \ - {k: multiprocessing.Semaphore(v) for k, v in + {k: multiprocessing.BoundedSemaphore(v) for k, v in self.lit_config.parallelism_groups.items()} def execute_test(self, test): - return _execute_test_impl(test, self.lit_config, - self.parallelism_semaphores) + return lit.worker._execute_test(test, self.lit_config) def execute_tests_in_pool(self, jobs, max_time): # We need to issue many wait calls, so compute the final deadline and @@ -67,22 +47,22 @@ class Run(object): # interrupts the workers before we make it into our task callback, they # will each raise a KeyboardInterrupt exception and print to stderr at # the same time. - pool = multiprocessing.Pool(jobs, worker_initializer, + pool = multiprocessing.Pool(jobs, lit.worker.initializer, (self.lit_config, self.parallelism_semaphores)) # Install a console-control signal handler on Windows. - if win32api is not None: + if lit.util.win32api is not None: def console_ctrl_handler(type): print('\nCtrl-C detected, terminating.') pool.terminate() pool.join() - abort_now() + lit.util.abort_now() return True - win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) + lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) try: - async_results = [pool.apply_async(worker_run_one_test, + async_results = [pool.apply_async(lit.worker.run_one_test, args=(test_index, test), callback=self.consume_test_result) for test_index, test in enumerate(self.tests)] @@ -143,11 +123,9 @@ class Run(object): self.failure_count = 0 self.hit_max_failures = False if jobs == 1: - global child_lit_config - child_lit_config = self.lit_config for test_index, test in enumerate(self.tests): - result = worker_run_one_test(test_index, test) - self.consume_test_result(result) + lit.worker._execute_test(test, self.lit_config) + self.consume_test_result((test_index, test)) if self.hit_max_failures: break else: @@ -159,7 +137,7 @@ class Run(object): test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) def consume_test_result(self, pool_result): - """Test completion callback for worker_run_one_test + """Test completion callback for lit.worker.run_one_test Updates the test result status in the parent process. Each task in the pool returns the test index and the result, and we use the index to look @@ -186,74 +164,3 @@ class Run(object): if self.lit_config.maxFailures and \ self.failure_count == self.lit_config.maxFailures: self.hit_max_failures = True - -def _execute_test_impl(test, lit_config, parallelism_semaphores): - """Execute one test""" - pg = test.config.parallelism_group - if callable(pg): - pg = pg(test) - - result = None - semaphore = None - try: - if pg: - semaphore = parallelism_semaphores[pg] - if semaphore: - semaphore.acquire() - start_time = time.time() - result = test.config.test_format.execute(test, lit_config) - # Support deprecated result from execute() which returned the result - # code and additional output as a tuple. - if isinstance(result, tuple): - code, output = result - result = lit.Test.Result(code, output) - elif not isinstance(result, lit.Test.Result): - raise ValueError("unexpected result from test execution") - result.elapsed = time.time() - start_time - except KeyboardInterrupt: - raise - except: - if lit_config.debug: - raise - output = 'Exception during script execution:\n' - output += traceback.format_exc() - output += '\n' - result = lit.Test.Result(lit.Test.UNRESOLVED, output) - finally: - if semaphore: - semaphore.release() - - test.setResult(result) - -child_lit_config = None -child_parallelism_semaphores = None - -def worker_initializer(lit_config, parallelism_semaphores): - """Copy expensive repeated data into worker processes""" - global child_lit_config - child_lit_config = lit_config - global child_parallelism_semaphores - child_parallelism_semaphores = parallelism_semaphores - -def worker_run_one_test(test_index, test): - """Run one test in a multiprocessing.Pool - - Side effects in this function and functions it calls are not visible in the - main lit process. - - Arguments and results of this function are pickled, so they should be cheap - to copy. For efficiency, we copy all data needed to execute all tests into - each worker and store it in the child_* global variables. This reduces the - cost of each task. - - Returns an index and a Result, which the parent process uses to update - the display. - """ - try: - _execute_test_impl(test, child_lit_config, child_parallelism_semaphores) - return (test_index, test) - except KeyboardInterrupt as e: - # If a worker process gets an interrupt, abort it immediately. - abort_now() - except: - traceback.print_exc() diff --git a/utils/lit/lit/util.py b/utils/lit/lit/util.py index dee70b3c542..58b5563bae5 100644 --- a/utils/lit/lit/util.py +++ b/utils/lit/lit/util.py @@ -424,3 +424,17 @@ def killProcessAndChildren(pid): psutilProc.kill() except psutil.NoSuchProcess: pass + + +try: + import win32api +except ImportError: + win32api = None + +def abort_now(): + """Abort the current process without doing any exception teardown""" + sys.stdout.flush() + if win32api: + win32api.TerminateProcess(win32api.GetCurrentProcess(), 3) + else: + os.kill(0, 9) diff --git a/utils/lit/lit/worker.py b/utils/lit/lit/worker.py new file mode 100644 index 00000000000..f97b6a04306 --- /dev/null +++ b/utils/lit/lit/worker.py @@ -0,0 +1,82 @@ +# The functions in this module are meant to run on a separate worker process. +# Exception: in single process mode _execute_test is called directly. +import time +import traceback + +import lit.Test +import lit.util + +_lit_config = None +_parallelism_semaphores = None + +def initializer(lit_config, parallelism_semaphores): + """Copy expensive repeated data into worker processes""" + global _lit_config + global _parallelism_semaphores + _lit_config = lit_config + _parallelism_semaphores = parallelism_semaphores + +def run_one_test(test_index, test): + """Run one test in a multiprocessing.Pool + + Side effects in this function and functions it calls are not visible in the + main lit process. + + Arguments and results of this function are pickled, so they should be cheap + to copy. For efficiency, we copy all data needed to execute all tests into + each worker and store it in the worker_* global variables. This reduces the + cost of each task. + + Returns an index and a Result, which the parent process uses to update + the display. + """ + try: + _execute_test_in_parallelism_group(test, _lit_config, + _parallelism_semaphores) + return (test_index, test) + except KeyboardInterrupt: + # If a worker process gets an interrupt, abort it immediately. + lit.util.abort_now() + except: + traceback.print_exc() + +def _execute_test_in_parallelism_group(test, lit_config, parallelism_semaphores): + """Execute one test inside the appropriate parallelism group""" + pg = test.config.parallelism_group + if callable(pg): + pg = pg(test) + + if pg: + semaphore = parallelism_semaphores[pg] + try: + semaphore.acquire() + _execute_test(test, lit_config) + finally: + semaphore.release() + else: + _execute_test(test, lit_config) + +def _execute_test(test, lit_config): + """Execute one test""" + try: + start_time = time.time() + result = test.config.test_format.execute(test, lit_config) + # Support deprecated result from execute() which returned the result + # code and additional output as a tuple. + if isinstance(result, tuple): + code, output = result + result = lit.Test.Result(code, output) + elif not isinstance(result, lit.Test.Result): + raise ValueError("unexpected result from test execution") + result.elapsed = time.time() - start_time + except KeyboardInterrupt: + raise + except: + if lit_config.debug: + raise + output = 'Exception during script execution:\n' + output += traceback.format_exc() + output += '\n' + result = lit.Test.Result(lit.Test.UNRESOLVED, output) + + test.setResult(result) -- 2.11.0