2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
7 package java.util.concurrent;
11 * Provides default implementations of {@link ExecutorService}
12 * execution methods. This class implements the <tt>submit</tt>,
13 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
14 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
15 * to the {@link FutureTask} class provided in this package. For example,
16 * the implementation of <tt>submit(Runnable)</tt> creates an
17 * associated <tt>RunnableFuture</tt> that is executed and
18 * returned. Subclasses may override the <tt>newTaskFor</tt> methods
19 * to return <tt>RunnableFuture</tt> implementations other than
20 * <tt>FutureTask</tt>.
22 * <p> <b>Extension example</b>. Here is a sketch of a class
23 * that customizes {@link ThreadPoolExecutor} to use
24 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
26 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
28 * static class CustomTask<V> implements RunnableFuture<V> {...}
30 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
31 * return new CustomTask<V>(c);
33 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
34 * return new CustomTask<V>(r, v);
36 * // ... add constructors, etc.
42 public abstract class AbstractExecutorService implements ExecutorService {
45 * Returns a <tt>RunnableFuture</tt> for the given runnable and default
48 * @param runnable the runnable task being wrapped
49 * @param value the default value for the returned future
50 * @return a <tt>RunnableFuture</tt> which when run will run the
51 * underlying runnable and which, as a <tt>Future</tt>, will yield
52 * the given value as its result and provide for cancellation of
53 * the underlying task.
56 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
57 return new FutureTask<T>(runnable, value);
61 * Returns a <tt>RunnableFuture</tt> for the given callable task.
63 * @param callable the callable task being wrapped
64 * @return a <tt>RunnableFuture</tt> which when run will call the
65 * underlying callable and which, as a <tt>Future</tt>, will yield
66 * the callable's result as its result and provide for
67 * cancellation of the underlying task.
70 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
71 return new FutureTask<T>(callable);
74 public Future<?> submit(Runnable task) {
75 if (task == null) throw new NullPointerException();
76 RunnableFuture<Object> ftask = newTaskFor(task, null);
81 public <T> Future<T> submit(Runnable task, T result) {
82 if (task == null) throw new NullPointerException();
83 RunnableFuture<T> ftask = newTaskFor(task, result);
88 public <T> Future<T> submit(Callable<T> task) {
89 if (task == null) throw new NullPointerException();
90 RunnableFuture<T> ftask = newTaskFor(task);
96 * the main mechanics of invokeAny.
98 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
99 boolean timed, long nanos)
100 throws InterruptedException, ExecutionException, TimeoutException {
102 throw new NullPointerException();
103 int ntasks = tasks.size();
105 throw new IllegalArgumentException();
106 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
107 ExecutorCompletionService<T> ecs =
108 new ExecutorCompletionService<T>(this);
110 // For efficiency, especially in executors with limited
111 // parallelism, check to see if previously submitted tasks are
112 // done before submitting more of them. This interleaving
113 // plus the exception mechanics account for messiness of main
117 // Record exceptions so that if we fail to obtain any
118 // result, we can throw the last exception we got.
119 ExecutionException ee = null;
120 long lastTime = (timed)? System.nanoTime() : 0;
121 Iterator<? extends Callable<T>> it = tasks.iterator();
123 // Start one task for sure; the rest incrementally
124 futures.add(ecs.submit(it.next()));
129 Future<T> f = ecs.poll();
133 futures.add(ecs.submit(it.next()));
136 else if (active == 0)
139 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
141 throw new TimeoutException();
142 long now = System.nanoTime();
143 nanos -= now - lastTime;
153 } catch (InterruptedException ie) {
155 } catch (ExecutionException eex) {
157 } catch (RuntimeException rex) {
158 ee = new ExecutionException(rex);
164 ee = new ExecutionException();
168 for (Future<T> f : futures)
173 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
174 throws InterruptedException, ExecutionException {
176 return doInvokeAny(tasks, false, 0);
177 } catch (TimeoutException cannotHappen) {
183 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
184 long timeout, TimeUnit unit)
185 throws InterruptedException, ExecutionException, TimeoutException {
186 return doInvokeAny(tasks, true, unit.toNanos(timeout));
189 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
190 throws InterruptedException {
192 throw new NullPointerException();
193 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
194 boolean done = false;
196 for (Callable<T> t : tasks) {
197 RunnableFuture<T> f = newTaskFor(t);
201 for (Future<T> f : futures) {
205 } catch (CancellationException ignore) {
206 } catch (ExecutionException ignore) {
214 for (Future<T> f : futures)
219 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
220 long timeout, TimeUnit unit)
221 throws InterruptedException {
222 if (tasks == null || unit == null)
223 throw new NullPointerException();
224 long nanos = unit.toNanos(timeout);
225 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
226 boolean done = false;
228 for (Callable<T> t : tasks)
229 futures.add(newTaskFor(t));
231 long lastTime = System.nanoTime();
233 // Interleave time checks and calls to execute in case
234 // executor doesn't have any/much parallelism.
235 Iterator<Future<T>> it = futures.iterator();
236 while (it.hasNext()) {
237 execute((Runnable)(it.next()));
238 long now = System.nanoTime();
239 nanos -= now - lastTime;
245 for (Future<T> f : futures) {
250 f.get(nanos, TimeUnit.NANOSECONDS);
251 } catch (CancellationException ignore) {
252 } catch (ExecutionException ignore) {
253 } catch (TimeoutException toe) {
256 long now = System.nanoTime();
257 nanos -= now - lastTime;
265 for (Future<T> f : futures)