From: Simon Forman Date: Thu, 22 Sep 2022 20:20:30 +0000 (-0700) Subject: Wait for both results together. X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=92338037e7b0e2fd07f89964c0fdefd1170f5b92;p=joypy%2FThun.git Wait for both results together. If one causes an error before the other is finished SIGKILL the other job. (I do not know if this will kill the sub-tasks if any. The Python docs say that with terminate() "descendant processes of the process will not be terminated – they will simply become orphaned." but it doesn't say whether that's true with kill(). --- diff --git a/implementations/Python/joys.py b/implementations/Python/joys.py index c59ffb6..1ef827f 100644 --- a/implementations/Python/joys.py +++ b/implementations/Python/joys.py @@ -1,4 +1,5 @@ import multiprocessing as mp +from multiprocessing.connection import wait from joy import ( default_defs, initialize, @@ -48,20 +49,52 @@ def fork(stack, expr, dictionary): ) P.start() Q.start() - P.join() - Q.join() - p_err, p_result = p_pipe_recv.recv() - q_err, q_result = q_pipe_recv.recv() - if p_err: - raise ForkException(p_result) - if q_err: - raise ForkException(q_result) - stack = (q_result, (p_result, stack)) + # Ensure "wait() will promptly report the readable end as being ready." + # See docs for multiprocessing.connection.wait(). + q_pipe_send.close() + p_pipe_send.close() + + readers = [q_pipe_recv, p_pipe_recv] + ready = wait(readers) + # We have one or two results or errors + if len(ready) == 1: + read_me = ready[0] + order = read_me is q_pipe_recv + wait_me = readers[order] + wait_proc = (Q, P)[order] + stack = one_result(stack, order, read_me, wait_me, wait_proc) + else: # both results/errors + p_err, p_result = p_pipe_recv.recv() + q_err, q_result = q_pipe_recv.recv() + if p_err: + raise ForkException(p_result) + if q_err: + raise ForkException(q_result) + stack = (q_result, (p_result, stack)) return stack, expr, dictionary +def one_result(stack, order, read_me, wait_me, wait_proc): + err, result = read_me.recv() + read_me.close() + if err: + wait_me.close() + wait_proc.kill() + raise ForkException(result) + wait([wait_me]) + err, second_result = wait_me.recv() + wait_me.close() + if err: + raise ForkException(second_result) + if order: + stack = (result, (second_result, stack)) + else: + stack = (second_result, (result, stack)) + return stack + + if __name__ == '__main__': mp.set_start_method('fork')