OSDN Git Service

2013.10.24
[uclinux-h8/uClinux-dist.git] / lib / classpath / external / jsr166 / java / util / concurrent / ExecutorCompletionService.java
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6
7 package java.util.concurrent;
8
9 /**
10  * A {@link CompletionService} that uses a supplied {@link Executor}
11  * to execute tasks.  This class arranges that submitted tasks are,
12  * upon completion, placed on a queue accessible using <tt>take</tt>.
13  * The class is lightweight enough to be suitable for transient use
14  * when processing groups of tasks.
15  *
16  * <p>
17  *
18  * <b>Usage Examples.</b>
19  *
20  * Suppose you have a set of solvers for a certain problem, each
21  * returning a value of some type <tt>Result</tt>, and would like to
22  * run them concurrently, processing the results of each of them that
23  * return a non-null value, in some method <tt>use(Result r)</tt>. You
24  * could write this as:
25  *
26  * <pre>
27  *   void solve(Executor e,
28  *              Collection&lt;Callable&lt;Result&gt;&gt; solvers)
29  *     throws InterruptedException, ExecutionException {
30  *       CompletionService&lt;Result&gt; ecs
31  *           = new ExecutorCompletionService&lt;Result&gt;(e);
32  *       for (Callable&lt;Result&gt; s : solvers)
33  *           ecs.submit(s);
34  *       int n = solvers.size();
35  *       for (int i = 0; i &lt; n; ++i) {
36  *           Result r = ecs.take().get();
37  *           if (r != null)
38  *               use(r);
39  *       }
40  *   }
41  * </pre>
42  *
43  * Suppose instead that you would like to use the first non-null result
44  * of the set of tasks, ignoring any that encounter exceptions,
45  * and cancelling all other tasks when the first one is ready:
46  *
47  * <pre>
48  *   void solve(Executor e,
49  *              Collection&lt;Callable&lt;Result&gt;&gt; solvers)
50  *     throws InterruptedException {
51  *       CompletionService&lt;Result&gt; ecs
52  *           = new ExecutorCompletionService&lt;Result&gt;(e);
53  *       int n = solvers.size();
54  *       List&lt;Future&lt;Result&gt;&gt; futures
55  *           = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
56  *       Result result = null;
57  *       try {
58  *           for (Callable&lt;Result&gt; s : solvers)
59  *               futures.add(ecs.submit(s));
60  *           for (int i = 0; i &lt; n; ++i) {
61  *               try {
62  *                   Result r = ecs.take().get();
63  *                   if (r != null) {
64  *                       result = r;
65  *                       break;
66  *                   }
67  *               } catch (ExecutionException ignore) {}
68  *           }
69  *       }
70  *       finally {
71  *           for (Future&lt;Result&gt; f : futures)
72  *               f.cancel(true);
73  *       }
74  *
75  *       if (result != null)
76  *           use(result);
77  *   }
78  * </pre>
79  */
80 public class ExecutorCompletionService<V> implements CompletionService<V> {
81     private final Executor executor;
82     private final AbstractExecutorService aes;
83     private final BlockingQueue<Future<V>> completionQueue;
84
85     /**
86      * FutureTask extension to enqueue upon completion
87      */
88     private class QueueingFuture extends FutureTask<Void> {
89         QueueingFuture(RunnableFuture<V> task) {
90             super(task, null);
91             this.task = task;
92         }
93         protected void done() { completionQueue.add(task); }
94         private final Future<V> task;
95     }
96
97     private RunnableFuture<V> newTaskFor(Callable<V> task) {
98         if (aes == null)
99             return new FutureTask<V>(task);
100         else
101             return aes.newTaskFor(task);
102     }
103
104     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
105         if (aes == null)
106             return new FutureTask<V>(task, result);
107         else
108             return aes.newTaskFor(task, result);
109     }
110
111     /**
112      * Creates an ExecutorCompletionService using the supplied
113      * executor for base task execution and a
114      * {@link LinkedBlockingQueue} as a completion queue.
115      *
116      * @param executor the executor to use
117      * @throws NullPointerException if executor is <tt>null</tt>
118      */
119     public ExecutorCompletionService(Executor executor) {
120         if (executor == null)
121             throw new NullPointerException();
122         this.executor = executor;
123         this.aes = (executor instanceof AbstractExecutorService) ?
124             (AbstractExecutorService) executor : null;
125         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
126     }
127
128     /**
129      * Creates an ExecutorCompletionService using the supplied
130      * executor for base task execution and the supplied queue as its
131      * completion queue.
132      *
133      * @param executor the executor to use
134      * @param completionQueue the queue to use as the completion queue
135      * normally one dedicated for use by this service
136      * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
137      */
138     public ExecutorCompletionService(Executor executor,
139                                      BlockingQueue<Future<V>> completionQueue) {
140         if (executor == null || completionQueue == null)
141             throw new NullPointerException();
142         this.executor = executor;
143         this.aes = (executor instanceof AbstractExecutorService) ?
144             (AbstractExecutorService) executor : null;
145         this.completionQueue = completionQueue;
146     }
147
148     public Future<V> submit(Callable<V> task) {
149         if (task == null) throw new NullPointerException();
150         RunnableFuture<V> f = newTaskFor(task);
151         executor.execute(new QueueingFuture(f));
152         return f;
153     }
154
155     public Future<V> submit(Runnable task, V result) {
156         if (task == null) throw new NullPointerException();
157         RunnableFuture<V> f = newTaskFor(task, result);
158         executor.execute(new QueueingFuture(f));
159         return f;
160     }
161
162     public Future<V> take() throws InterruptedException {
163         return completionQueue.take();
164     }
165
166     public Future<V> poll() {
167         return completionQueue.poll();
168     }
169
170     public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
171         return completionQueue.poll(timeout, unit);
172     }
173
174 }