OSDN Git Service

Wait for both results together.
authorSimon Forman <sforman@hushmail.com>
Thu, 22 Sep 2022 20:20:30 +0000 (13:20 -0700)
committerSimon Forman <sforman@hushmail.com>
Thu, 22 Sep 2022 20:20:30 +0000 (13:20 -0700)
If one causes an error before the other is finished SIGKILL the other
job.  (I do not know if this will kill the sub-tasks if any.  The Python
docs say that with terminate() "descendant processes of the process will
not be terminated – they will simply become orphaned." but it doesn't
say whether that's true with kill().

implementations/Python/joys.py

index c59ffb6..1ef827f 100644 (file)
@@ -1,4 +1,5 @@
 import multiprocessing as mp
+from multiprocessing.connection import wait
 from joy import (
     default_defs,
     initialize,
@@ -48,20 +49,52 @@ def fork(stack, expr, dictionary):
         )
     P.start()
     Q.start()
-    P.join()
-    Q.join()
-    p_err, p_result = p_pipe_recv.recv()
-    q_err, q_result = q_pipe_recv.recv()
-    if p_err:
-        raise ForkException(p_result)
-    if q_err:
-        raise ForkException(q_result)
 
-    stack = (q_result, (p_result, stack))
+    # Ensure "wait() will promptly report the readable end as being ready."
+    # See docs for multiprocessing.connection.wait().
+    q_pipe_send.close()
+    p_pipe_send.close()
+
+    readers = [q_pipe_recv, p_pipe_recv]
+    ready = wait(readers)
+    # We have one or two results or errors
+    if len(ready) == 1:
+        read_me = ready[0]
+        order = read_me is q_pipe_recv
+        wait_me = readers[order]
+        wait_proc = (Q, P)[order]
+        stack = one_result(stack, order, read_me, wait_me, wait_proc)
+    else:  # both results/errors
+        p_err, p_result = p_pipe_recv.recv()
+        q_err, q_result = q_pipe_recv.recv()
+        if p_err:
+            raise ForkException(p_result)
+        if q_err:
+            raise ForkException(q_result)
+        stack = (q_result, (p_result, stack))
 
     return stack, expr, dictionary
 
 
+def one_result(stack, order, read_me, wait_me, wait_proc):
+    err, result = read_me.recv()
+    read_me.close()
+    if err:
+        wait_me.close()
+        wait_proc.kill()
+        raise ForkException(result)
+    wait([wait_me])
+    err, second_result = wait_me.recv()
+    wait_me.close()
+    if err:
+        raise ForkException(second_result)
+    if order:
+        stack = (result, (second_result, stack))
+    else:
+        stack = (second_result, (result, stack))
+    return stack
+
+
 if __name__ == '__main__':
     mp.set_start_method('fork')