OSDN Git Service

2013.10.24
[uclinux-h8/uClinux-dist.git] / lib / classpath / external / jsr166 / java / util / concurrent / AbstractExecutorService.java
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11  * Provides default implementations of {@link ExecutorService}
12  * execution methods. This class implements the <tt>submit</tt>,
13  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
14  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
15  * to the {@link FutureTask} class provided in this package.  For example,
16  * the implementation of <tt>submit(Runnable)</tt> creates an
17  * associated <tt>RunnableFuture</tt> that is executed and
18  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
19  * to return <tt>RunnableFuture</tt> implementations other than
20  * <tt>FutureTask</tt>.
21  *
22  * <p> <b>Extension example</b>. Here is a sketch of a class
23  * that customizes {@link ThreadPoolExecutor} to use
24  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
25  * <pre>
26  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
27  *
28  *   static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
29  *
30  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
31  *       return new CustomTask&lt;V&gt;(c);
32  *   }
33  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
34  *       return new CustomTask&lt;V&gt;(r, v);
35  *   }
36  *   // ... add constructors, etc.
37  * }
38  * </pre>
39  * @since 1.5
40  * @author Doug Lea
41  */
42 public abstract class AbstractExecutorService implements ExecutorService {
43
44     /**
45      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
46      * value.
47      *
48      * @param runnable the runnable task being wrapped
49      * @param value the default value for the returned future
50      * @return a <tt>RunnableFuture</tt> which when run will run the
51      * underlying runnable and which, as a <tt>Future</tt>, will yield
52      * the given value as its result and provide for cancellation of
53      * the underlying task.
54      * @since 1.6
55      */
56     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
57         return new FutureTask<T>(runnable, value);
58     }
59
60     /**
61      * Returns a <tt>RunnableFuture</tt> for the given callable task.
62      *
63      * @param callable the callable task being wrapped
64      * @return a <tt>RunnableFuture</tt> which when run will call the
65      * underlying callable and which, as a <tt>Future</tt>, will yield
66      * the callable's result as its result and provide for
67      * cancellation of the underlying task.
68      * @since 1.6
69      */
70     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
71         return new FutureTask<T>(callable);
72     }
73
74     public Future<?> submit(Runnable task) {
75         if (task == null) throw new NullPointerException();
76         RunnableFuture<Object> ftask = newTaskFor(task, null);
77         execute(ftask);
78         return ftask;
79     }
80
81     public <T> Future<T> submit(Runnable task, T result) {
82         if (task == null) throw new NullPointerException();
83         RunnableFuture<T> ftask = newTaskFor(task, result);
84         execute(ftask);
85         return ftask;
86     }
87
88     public <T> Future<T> submit(Callable<T> task) {
89         if (task == null) throw new NullPointerException();
90         RunnableFuture<T> ftask = newTaskFor(task);
91         execute(ftask);
92         return ftask;
93     }
94
95     /**
96      * the main mechanics of invokeAny.
97      */
98     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
99                             boolean timed, long nanos)
100         throws InterruptedException, ExecutionException, TimeoutException {
101         if (tasks == null)
102             throw new NullPointerException();
103         int ntasks = tasks.size();
104         if (ntasks == 0)
105             throw new IllegalArgumentException();
106         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
107         ExecutorCompletionService<T> ecs =
108             new ExecutorCompletionService<T>(this);
109
110         // For efficiency, especially in executors with limited
111         // parallelism, check to see if previously submitted tasks are
112         // done before submitting more of them. This interleaving
113         // plus the exception mechanics account for messiness of main
114         // loop.
115
116         try {
117             // Record exceptions so that if we fail to obtain any
118             // result, we can throw the last exception we got.
119             ExecutionException ee = null;
120             long lastTime = (timed)? System.nanoTime() : 0;
121             Iterator<? extends Callable<T>> it = tasks.iterator();
122
123             // Start one task for sure; the rest incrementally
124             futures.add(ecs.submit(it.next()));
125             --ntasks;
126             int active = 1;
127
128             for (;;) {
129                 Future<T> f = ecs.poll();
130                 if (f == null) {
131                     if (ntasks > 0) {
132                         --ntasks;
133                         futures.add(ecs.submit(it.next()));
134                         ++active;
135                     }
136                     else if (active == 0)
137                         break;
138                     else if (timed) {
139                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
140                         if (f == null)
141                             throw new TimeoutException();
142                         long now = System.nanoTime();
143                         nanos -= now - lastTime;
144                         lastTime = now;
145                     }
146                     else
147                         f = ecs.take();
148                 }
149                 if (f != null) {
150                     --active;
151                     try {
152                         return f.get();
153                     } catch (InterruptedException ie) {
154                         throw ie;
155                     } catch (ExecutionException eex) {
156                         ee = eex;
157                     } catch (RuntimeException rex) {
158                         ee = new ExecutionException(rex);
159                     }
160                 }
161             }
162
163             if (ee == null)
164                 ee = new ExecutionException();
165             throw ee;
166
167         } finally {
168             for (Future<T> f : futures)
169                 f.cancel(true);
170         }
171     }
172
173     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
174         throws InterruptedException, ExecutionException {
175         try {
176             return doInvokeAny(tasks, false, 0);
177         } catch (TimeoutException cannotHappen) {
178             assert false;
179             return null;
180         }
181     }
182
183     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
184                            long timeout, TimeUnit unit)
185         throws InterruptedException, ExecutionException, TimeoutException {
186         return doInvokeAny(tasks, true, unit.toNanos(timeout));
187     }
188
189     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
190         throws InterruptedException {
191         if (tasks == null)
192             throw new NullPointerException();
193         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
194         boolean done = false;
195         try {
196             for (Callable<T> t : tasks) {
197                 RunnableFuture<T> f = newTaskFor(t);
198                 futures.add(f);
199                 execute(f);
200             }
201             for (Future<T> f : futures) {
202                 if (!f.isDone()) {
203                     try {
204                         f.get();
205                     } catch (CancellationException ignore) {
206                     } catch (ExecutionException ignore) {
207                     }
208                 }
209             }
210             done = true;
211             return futures;
212         } finally {
213             if (!done)
214                 for (Future<T> f : futures)
215                     f.cancel(true);
216         }
217     }
218
219     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
220                                          long timeout, TimeUnit unit)
221         throws InterruptedException {
222         if (tasks == null || unit == null)
223             throw new NullPointerException();
224         long nanos = unit.toNanos(timeout);
225         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
226         boolean done = false;
227         try {
228             for (Callable<T> t : tasks)
229                 futures.add(newTaskFor(t));
230
231             long lastTime = System.nanoTime();
232
233             // Interleave time checks and calls to execute in case
234             // executor doesn't have any/much parallelism.
235             Iterator<Future<T>> it = futures.iterator();
236             while (it.hasNext()) {
237                 execute((Runnable)(it.next()));
238                 long now = System.nanoTime();
239                 nanos -= now - lastTime;
240                 lastTime = now;
241                 if (nanos <= 0)
242                     return futures;
243             }
244
245             for (Future<T> f : futures) {
246                 if (!f.isDone()) {
247                     if (nanos <= 0)
248                         return futures;
249                     try {
250                         f.get(nanos, TimeUnit.NANOSECONDS);
251                     } catch (CancellationException ignore) {
252                     } catch (ExecutionException ignore) {
253                     } catch (TimeoutException toe) {
254                         return futures;
255                     }
256                     long now = System.nanoTime();
257                     nanos -= now - lastTime;
258                     lastTime = now;
259                 }
260             }
261             done = true;
262             return futures;
263         } finally {
264             if (!done)
265                 for (Future<T> f : futures)
266                     f.cancel(true);
267         }
268     }
269
270 }