OSDN Git Service

Minor updates to README file.
[joypy/Thun.git] / implementations / Python / joys.py
1 import multiprocessing as mp
2 from multiprocessing.connection import wait
3 from joy import (
4     default_defs,
5     initialize,
6     joy,
7     repl,
8     get_n_items,
9     isnt_stack,
10     inscribe,
11     )
12
13
class ForkException(Exception):
    '''Raised by fork when one of the forked programs reports an error.'''
15
16
def fork_joy(send, stack, expr, dictionary):
    '''
    Run a Joy expression and report the outcome through a pipe.

    This is the target of the processes started by fork.  Exactly one
    message is sent on success or on a reported failure:

        (False, result)     the expression ran; result is the single
                            item taken from the resulting stack with
                            get_n_items(1, ...)
        (True, repr(err))   evaluating the expression, taking the result
                            or sending the result raised err

    send       -- the writable end of a pipe (must offer send() and
                  close()); it is closed before this function returns.
    stack      -- the stack to evaluate expr against.
    expr       -- the expression to evaluate.
    dictionary -- the dictionary passed to joy().
    '''
    try:
        try:
            stack, dictionary = joy(stack, expr, dictionary)
            result, _ = get_n_items(1, stack)
            # Sending is inside the try on purpose: if the result cannot
            # be sent (e.g. it cannot be pickled) the failure is reported
            # to the reader instead of the reader only seeing EOF.
            send.send((False, result))
        except Exception as err:
            send.send((True, repr(err)))
    finally:
        # Let the reading side see EOF as soon as we are done.
        send.close()
25
26
@inscribe
def fork(stack, expr, dictionary):
    '''
    Take two quoted programs from the stack and
    run them in parallel.

        fork ≡ [i] app2

    Each program is run in its own process against the rest of the
    stack, and one result item from each is put back on the stack.
    If either program reports an error a ForkException is raised
    carrying the repr of that error.
    '''
    q, p, stack = get_n_items(2, stack)
    isnt_stack(q)
    isnt_stack(p)
    q_pipe_recv, q_pipe_send = mp.Pipe(False)
    p_pipe_recv, p_pipe_send = mp.Pipe(False)

    P = mp.Process(
        target=fork_joy,
        args=(p_pipe_send, stack, p, dictionary),
        )
    Q = mp.Process(
        target=fork_joy,
        args=(q_pipe_send, stack, q, dictionary),
        )
    P.start()
    Q.start()

    # Ensure "wait() will promptly report the readable end as being ready."
    # See docs for multiprocessing.connection.wait().
    q_pipe_send.close()
    p_pipe_send.close()

    try:
        ready = wait([q_pipe_recv, p_pipe_recv])
        # We have one or two results or errors
        if len(ready) == 1:
            read_me = ready[0]
            # True when Q reported first, so P is the one still pending.
            q_first = read_me is q_pipe_recv
            if q_first:
                wait_me, wait_proc = p_pipe_recv, P
            else:
                wait_me, wait_proc = q_pipe_recv, Q
            stack = one_result(stack, q_first, read_me, wait_me, wait_proc)
        else:  # both results/errors
            p_err, p_result = p_pipe_recv.recv()
            q_err, q_result = q_pipe_recv.recv()
            if p_err:
                raise ForkException(p_result)
            if q_err:
                raise ForkException(q_result)
            stack = (q_result, (p_result, stack))
    except BaseException:
        # Whatever went wrong (reported error, EOF from a dead child,
        # interrupt...), don't leave a child computing a result that
        # nobody will read.  kill() is harmless on an exited process.
        P.kill()
        Q.kill()
        raise
    finally:
        # close() is idempotent, so it doesn't matter whether
        # one_result() already closed these.
        q_pipe_recv.close()
        p_pipe_recv.close()
        # Reap the children so they don't linger as zombies.  On the
        # success path both have already sent their result, on the
        # failure path both have been killed, so this doesn't block long.
        P.join()
        Q.join()

    return stack, expr, dictionary
77
78
def one_result(stack, order, read_me, wait_me, wait_proc):
    '''
    Finish a fork when only one of the two programs has reported so far.

    stack     -- the stack the two results are pushed onto.
    order     -- truthy if the message on read_me belongs on top of the
                 returned stack, falsy if the message on wait_me does.
    read_me   -- readable pipe end that already has an (err, result)
                 message available.
    wait_me   -- readable pipe end of the program still pending.
    wait_proc -- the process feeding wait_me (must offer kill() and
                 join()); only used if something goes wrong.

    Returns the new stack with both results on it.

    Raises ForkException with the reported message if either program
    reports an error.  On any failure, including a pipe that reaches
    EOF without a message, wait_proc is killed and joined.  Both pipe
    ends are closed on every path.
    '''
    try:
        err, result = read_me.recv()
        if err:
            raise ForkException(result)
        # Block until the pending program reports (or its pipe hits EOF).
        wait([wait_me])
        err, second_result = wait_me.recv()
        if err:
            raise ForkException(second_result)
    except BaseException:
        # Don't leave the other program running (or unreaped) once the
        # fork as a whole has failed.  kill() is harmless if the process
        # has already exited.
        wait_proc.kill()
        wait_proc.join()
        raise
    finally:
        read_me.close()
        wait_me.close()
    if order:
        return (result, (second_result, stack))
    return (second_result, (result, stack))
96
97
if __name__ == '__main__':
    # Select the 'fork' start method for the processes created by fork().
    # NOTE(review): presumably so children inherit the stack and
    # dictionary instead of having them pickled -- confirm.
    mp.set_start_method('fork')

    # Build the dictionary and load the default definitions into it
    # before handing it to the REPL.
    dictionary = initialize()
    default_defs(dictionary)

    try:
        stack = repl(dictionary=dictionary)
    except SystemExit:
        # A SystemExit coming out of the REPL just ends the session.
        pass