OSDN Git Service

2013.10.24
[uclinux-h8/uClinux-dist.git] / lib / classpath / external / jsr166 / java / util / concurrent / ScheduledThreadPoolExecutor.java
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12  * A {@link ThreadPoolExecutor} that can additionally schedule
13  * commands to run after a given delay, or to execute
14  * periodically. This class is preferable to {@link java.util.Timer}
15  * when multiple worker threads are needed, or when the additional
16  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
17  * this class extends) are required.
18  *
19  * <p> Delayed tasks execute no sooner than they are enabled, but
20  * without any real-time guarantees about when, after they are
21  * enabled, they will commence. Tasks scheduled for exactly the same
22  * execution time are enabled in first-in-first-out (FIFO) order of
23  * submission.
24  *
25  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
26  * of the inherited tuning methods are not useful for it. In
27  * particular, because it acts as a fixed-sized pool using
28  * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
29  * to <tt>maximumPoolSize</tt> have no useful effect.
30  *
31  * <p><b>Extension notes:</b> This class overrides {@link
32  * AbstractExecutorService} <tt>submit</tt> methods to generate
33  * internal objects to control per-task delays and scheduling. To
34  * preserve functionality, any further overrides of these methods in
35  * subclasses must invoke superclass versions, which effectively
36  * disables additional task customization. However, this class
37  * provides alternative protected extension method
38  * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
39  * <tt>Callable</tt>) that can be used to customize the concrete task
40  * types used to execute commands entered via <tt>execute</tt>,
41  * <tt>submit</tt>, <tt>schedule</tt>, <tt>scheduleAtFixedRate</tt>,
42  * and <tt>scheduleWithFixedDelay</tt>.  By default, a
43  * <tt>ScheduledThreadPoolExecutor</tt> uses a task type extending
44  * {@link FutureTask}. However, this may be modified or replaced using
45  * subclasses of the form:
46  *
47  * <pre>
48  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
49  *
50  *   static class CustomTask&lt;V&gt; implements RunnableScheduledFuture&lt;V&gt; { ... }
51  *
52  *   protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
53  *                Runnable r, RunnableScheduledFuture&lt;V&gt; task) {
54  *       return new CustomTask&lt;V&gt;(r, task);
55  *   }
56  *
57  *   protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
58  *                Callable&lt;V&gt; c, RunnableScheduledFuture&lt;V&gt; task) {
59  *       return new CustomTask&lt;V&gt;(c, task);
60  *   }
61  *   // ... add constructors, etc.
62  * }
63  * </pre>
64  * @since 1.5
65  * @author Doug Lea
66  */
67 public class ScheduledThreadPoolExecutor
68         extends ThreadPoolExecutor
69         implements ScheduledExecutorService {
70
71     /**
72      * False if should cancel/suppress periodic tasks on shutdown.
73      */
74     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
75
76     /**
77      * False if should cancel non-periodic tasks on shutdown.
78      */
79     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
80
81     /**
82      * Sequence number to break scheduling ties, and in turn to
83      * guarantee FIFO order among tied entries.
84      */
85     private static final AtomicLong sequencer = new AtomicLong(0);
86
87     /** Base of nanosecond timings, to avoid wrapping */
88     private static final long NANO_ORIGIN = System.nanoTime();
89
90     /**
91      * Returns nanosecond time offset by origin
92      */
93     final long now() {
94         return System.nanoTime() - NANO_ORIGIN;
95     }
96
97     private class ScheduledFutureTask<V>
98             extends FutureTask<V> implements RunnableScheduledFuture<V> {
99
100         /** Sequence number to break ties FIFO */
101         private final long sequenceNumber;
102         /** The time the task is enabled to execute in nanoTime units */
103         private long time;
104         /**
105          * Period in nanoseconds for repeating tasks.  A positive
106          * value indicates fixed-rate execution.  A negative value
107          * indicates fixed-delay execution.  A value of 0 indicates a
108          * non-repeating task.
109          */
110         private final long period;
111
112         /**
113          * Creates a one-shot action with given nanoTime-based trigger time.
114          */
115         ScheduledFutureTask(Runnable r, V result, long ns) {
116             super(r, result);
117             this.time = ns;
118             this.period = 0;
119             this.sequenceNumber = sequencer.getAndIncrement();
120         }
121
122         /**
123          * Creates a periodic action with given nano time and period.
124          */
125         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
126             super(r, result);
127             this.time = ns;
128             this.period = period;
129             this.sequenceNumber = sequencer.getAndIncrement();
130         }
131
132         /**
133          * Creates a one-shot action with given nanoTime-based trigger.
134          */
135         ScheduledFutureTask(Callable<V> callable, long ns) {
136             super(callable);
137             this.time = ns;
138             this.period = 0;
139             this.sequenceNumber = sequencer.getAndIncrement();
140         }
141
142         public long getDelay(TimeUnit unit) {
143             long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
144             return d;
145         }
146
147         public int compareTo(Delayed other) {
148             if (other == this) // compare zero ONLY if same object
149                 return 0;
150             if (other instanceof ScheduledFutureTask) {
151                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
152                 long diff = time - x.time;
153                 if (diff < 0)
154                     return -1;
155                 else if (diff > 0)
156                     return 1;
157                 else if (sequenceNumber < x.sequenceNumber)
158                     return -1;
159                 else
160                     return 1;
161             }
162             long d = (getDelay(TimeUnit.NANOSECONDS) -
163                       other.getDelay(TimeUnit.NANOSECONDS));
164             return (d == 0)? 0 : ((d < 0)? -1 : 1);
165         }
166
167         /**
168          * Returns true if this is a periodic (not a one-shot) action.
169          *
170          * @return true if periodic
171          */
172         public boolean isPeriodic() {
173             return period != 0;
174         }
175
176         /**
177          * Runs a periodic task.
178          */
179         private void runPeriodic() {
180             boolean ok = ScheduledFutureTask.super.runAndReset();
181             boolean down = isShutdown();
182             // Reschedule if not cancelled and not shutdown or policy allows
183             if (ok && (!down ||
184                        (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
185                         !isTerminating()))) {
186                 long p = period;
187                 if (p > 0)
188                     time += p;
189                 else
190                     time = now() - p;
191                 // Classpath local: ecj from eclipse 3.1 does not
192                 // compile this.
193                 // ScheduledThreadPoolExecutor.super.getQueue().add(this);
194                 ScheduledThreadPoolExecutor.super.getQueue().add((Runnable) this);
195             }
196             // This might have been the final executed delayed
197             // task.  Wake up threads to check.
198             else if (down)
199                 interruptIdleWorkers();
200         }
201
202         /**
203          * Overrides FutureTask version so as to reset/requeue if periodic.
204          */
205         public void run() {
206             if (isPeriodic())
207                 runPeriodic();
208             else
209                 ScheduledFutureTask.super.run();
210         }
211     }
212
213     /**
214      * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
215      */
216     private void delayedExecute(Runnable command) {
217         if (isShutdown()) {
218             reject(command);
219             return;
220         }
221         // Prestart a thread if necessary. We cannot prestart it
222         // running the task because the task (probably) shouldn't be
223         // run yet, so thread will just idle until delay elapses.
224         if (getPoolSize() < getCorePoolSize())
225             prestartCoreThread();
226
227         super.getQueue().add(command);
228     }
229
230     /**
231      * Cancels and clears the queue of all tasks that should not be run
232      * due to shutdown policy.
233      */
234     private void cancelUnwantedTasks() {
235         boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
236         boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
237         if (!keepDelayed && !keepPeriodic)
238             super.getQueue().clear();
239         else if (keepDelayed || keepPeriodic) {
240             Object[] entries = super.getQueue().toArray();
241             for (int i = 0; i < entries.length; ++i) {
242                 Object e = entries[i];
243                 if (e instanceof RunnableScheduledFuture) {
244                     RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
245                     if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
246                         t.cancel(false);
247                 }
248             }
249             entries = null;
250             purge();
251         }
252     }
253
254     public boolean remove(Runnable task) {
255         if (!(task instanceof RunnableScheduledFuture))
256             return false;
257         return getQueue().remove(task);
258     }
259
260     /**
261      * Modifies or replaces the task used to execute a runnable.
262      * This method can be used to override the concrete
263      * class used for managing internal tasks.
264      * The default implementation simply returns the given task.
265      *
266      * @param runnable the submitted Runnable
267      * @param task the task created to execute the runnable
268      * @return a task that can execute the runnable
269      * @since 1.6
270      */
271     protected <V> RunnableScheduledFuture<V> decorateTask(
272         Runnable runnable, RunnableScheduledFuture<V> task) {
273         return task;
274     }
275
276     /**
277      * Modifies or replaces the task used to execute a callable.
278      * This method can be used to override the concrete
279      * class used for managing internal tasks.
280      * The default implementation simply returns the given task.
281      *
282      * @param callable the submitted Callable
283      * @param task the task created to execute the callable
284      * @return a task that can execute the callable
285      * @since 1.6
286      */
287     protected <V> RunnableScheduledFuture<V> decorateTask(
288         Callable<V> callable, RunnableScheduledFuture<V> task) {
289         return task;
290     }
291
292     /**
293      * Creates a new ScheduledThreadPoolExecutor with the given core
294      * pool size.
295      *
296      * @param corePoolSize the number of threads to keep in the pool,
297      * even if they are idle
298      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
299      */
300     public ScheduledThreadPoolExecutor(int corePoolSize) {
301         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
302               new DelayedWorkQueue());
303     }
304
305     /**
306      * Creates a new ScheduledThreadPoolExecutor with the given
307      * initial parameters.
308      *
309      * @param corePoolSize the number of threads to keep in the pool,
310      * even if they are idle
311      * @param threadFactory the factory to use when the executor
312      * creates a new thread
313      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
314      * @throws NullPointerException if threadFactory is null
315      */
316     public ScheduledThreadPoolExecutor(int corePoolSize,
317                              ThreadFactory threadFactory) {
318         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
319               new DelayedWorkQueue(), threadFactory);
320     }
321
322     /**
323      * Creates a new ScheduledThreadPoolExecutor with the given
324      * initial parameters.
325      *
326      * @param corePoolSize the number of threads to keep in the pool,
327      * even if they are idle
328      * @param handler the handler to use when execution is blocked
329      * because the thread bounds and queue capacities are reached
330      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
331      * @throws NullPointerException if handler is null
332      */
333     public ScheduledThreadPoolExecutor(int corePoolSize,
334                               RejectedExecutionHandler handler) {
335         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
336               new DelayedWorkQueue(), handler);
337     }
338
339     /**
340      * Creates a new ScheduledThreadPoolExecutor with the given
341      * initial parameters.
342      *
343      * @param corePoolSize the number of threads to keep in the pool,
344      * even if they are idle
345      * @param threadFactory the factory to use when the executor
346      * creates a new thread
347      * @param handler the handler to use when execution is blocked
348      * because the thread bounds and queue capacities are reached.
349      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
350      * @throws NullPointerException if threadFactory or handler is null
351      */
352     public ScheduledThreadPoolExecutor(int corePoolSize,
353                               ThreadFactory threadFactory,
354                               RejectedExecutionHandler handler) {
355         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
356               new DelayedWorkQueue(), threadFactory, handler);
357     }
358
359     public ScheduledFuture<?> schedule(Runnable command,
360                                        long delay,
361                                        TimeUnit unit) {
362         if (command == null || unit == null)
363             throw new NullPointerException();
364         long triggerTime = now() + unit.toNanos(delay);
365         RunnableScheduledFuture<?> t = decorateTask(command,
366             new ScheduledFutureTask<Boolean>(command, null, triggerTime));
367         delayedExecute(t);
368         return t;
369     }
370
371     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
372                                            long delay,
373                                            TimeUnit unit) {
374         if (callable == null || unit == null)
375             throw new NullPointerException();
376         if (delay < 0) delay = 0;
377         long triggerTime = now() + unit.toNanos(delay);
378         RunnableScheduledFuture<V> t = decorateTask(callable,
379             new ScheduledFutureTask<V>(callable, triggerTime));
380         delayedExecute(t);
381         return t;
382     }
383
384     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
385                                                   long initialDelay,
386                                                   long period,
387                                                   TimeUnit unit) {
388         if (command == null || unit == null)
389             throw new NullPointerException();
390         if (period <= 0)
391             throw new IllegalArgumentException();
392         if (initialDelay < 0) initialDelay = 0;
393         long triggerTime = now() + unit.toNanos(initialDelay);
394         RunnableScheduledFuture<?> t = decorateTask(command,
395             new ScheduledFutureTask<Object>(command,
396                                             null,
397                                             triggerTime,
398                                             unit.toNanos(period)));
399         delayedExecute(t);
400         return t;
401     }
402
403     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
404                                                      long initialDelay,
405                                                      long delay,
406                                                      TimeUnit unit) {
407         if (command == null || unit == null)
408             throw new NullPointerException();
409         if (delay <= 0)
410             throw new IllegalArgumentException();
411         if (initialDelay < 0) initialDelay = 0;
412         long triggerTime = now() + unit.toNanos(initialDelay);
413         RunnableScheduledFuture<?> t = decorateTask(command,
414             new ScheduledFutureTask<Boolean>(command,
415                                              null,
416                                              triggerTime,
417                                              unit.toNanos(-delay)));
418         delayedExecute(t);
419         return t;
420     }
421
422
423     /**
424      * Executes command with zero required delay. This has effect
425      * equivalent to <tt>schedule(command, 0, anyUnit)</tt>.  Note
426      * that inspections of the queue and of the list returned by
427      * <tt>shutdownNow</tt> will access the zero-delayed
428      * {@link ScheduledFuture}, not the <tt>command</tt> itself.
429      *
430      * @param command the task to execute
431      * @throws RejectedExecutionException at discretion of
432      * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
433      * for execution because the executor has been shut down.
434      * @throws NullPointerException if command is null
435      */
436     public void execute(Runnable command) {
437         if (command == null)
438             throw new NullPointerException();
439         schedule(command, 0, TimeUnit.NANOSECONDS);
440     }
441
442     // Override AbstractExecutorService methods
443
444     public Future<?> submit(Runnable task) {
445         return schedule(task, 0, TimeUnit.NANOSECONDS);
446     }
447
448     public <T> Future<T> submit(Runnable task, T result) {
449         return schedule(Executors.callable(task, result),
450                         0, TimeUnit.NANOSECONDS);
451     }
452
453     public <T> Future<T> submit(Callable<T> task) {
454         return schedule(task, 0, TimeUnit.NANOSECONDS);
455     }
456
457     /**
458      * Sets the policy on whether to continue executing existing periodic
459      * tasks even when this executor has been <tt>shutdown</tt>. In
460      * this case, these tasks will only terminate upon
461      * <tt>shutdownNow</tt>, or after setting the policy to
462      * <tt>false</tt> when already shutdown. This value is by default
463      * false.
464      *
465      * @param value if true, continue after shutdown, else don't.
466      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
467      */
468     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
469         continueExistingPeriodicTasksAfterShutdown = value;
470         if (!value && isShutdown())
471             cancelUnwantedTasks();
472     }
473
474     /**
475      * Gets the policy on whether to continue executing existing
476      * periodic tasks even when this executor has been
477      * <tt>shutdown</tt>. In this case, these tasks will only
478      * terminate upon <tt>shutdownNow</tt> or after setting the policy
479      * to <tt>false</tt> when already shutdown. This value is by
480      * default false.
481      *
482      * @return true if will continue after shutdown
483      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
484      */
485     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
486         return continueExistingPeriodicTasksAfterShutdown;
487     }
488
489     /**
490      * Sets the policy on whether to execute existing delayed
491      * tasks even when this executor has been <tt>shutdown</tt>. In
492      * this case, these tasks will only terminate upon
493      * <tt>shutdownNow</tt>, or after setting the policy to
494      * <tt>false</tt> when already shutdown. This value is by default
495      * true.
496      *
497      * @param value if true, execute after shutdown, else don't.
498      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
499      */
500     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
501         executeExistingDelayedTasksAfterShutdown = value;
502         if (!value && isShutdown())
503             cancelUnwantedTasks();
504     }
505
506     /**
507      * Gets the policy on whether to execute existing delayed
508      * tasks even when this executor has been <tt>shutdown</tt>. In
509      * this case, these tasks will only terminate upon
510      * <tt>shutdownNow</tt>, or after setting the policy to
511      * <tt>false</tt> when already shutdown. This value is by default
512      * true.
513      *
514      * @return true if will execute after shutdown
515      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
516      */
517     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
518         return executeExistingDelayedTasksAfterShutdown;
519     }
520
521
522     /**
523      * Initiates an orderly shutdown in which previously submitted
524      * tasks are executed, but no new tasks will be accepted. If the
525      * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
526      * been set <tt>false</tt>, existing delayed tasks whose delays
527      * have not yet elapsed are cancelled. And unless the
528      * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
529      * been set <tt>true</tt>, future executions of existing periodic
530      * tasks will be cancelled.
531      */
532     public void shutdown() {
533         cancelUnwantedTasks();
534         super.shutdown();
535     }
536
537     /**
538      * Attempts to stop all actively executing tasks, halts the
539      * processing of waiting tasks, and returns a list of the tasks
540      * that were awaiting execution.
541      *
542      * <p>There are no guarantees beyond best-effort attempts to stop
543      * processing actively executing tasks.  This implementation
544      * cancels tasks via {@link Thread#interrupt}, so any task that
545      * fails to respond to interrupts may never terminate.
546      *
547      * @return list of tasks that never commenced execution.  Each
548      * element of this list is a {@link ScheduledFuture},
549      * including those tasks submitted using <tt>execute</tt>, which
550      * are for scheduling purposes used as the basis of a zero-delay
551      * <tt>ScheduledFuture</tt>.
552      * @throws SecurityException {@inheritDoc}
553      */
554     public List<Runnable> shutdownNow() {
555         return super.shutdownNow();
556     }
557
558     /**
559      * Returns the task queue used by this executor.  Each element of
560      * this queue is a {@link ScheduledFuture}, including those
561      * tasks submitted using <tt>execute</tt> which are for scheduling
562      * purposes used as the basis of a zero-delay
563      * <tt>ScheduledFuture</tt>. Iteration over this queue is
564      * <em>not</em> guaranteed to traverse tasks in the order in
565      * which they will execute.
566      *
567      * @return the task queue
568      */
569     public BlockingQueue<Runnable> getQueue() {
570         return super.getQueue();
571     }
572
573     /**
574      * An annoying wrapper class to convince javac to use a
575      * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
576      */
577     private static class DelayedWorkQueue
578         extends AbstractCollection<Runnable>
579         implements BlockingQueue<Runnable> {
580
581         private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
582         public Runnable poll() { return dq.poll(); }
583         public Runnable peek() { return dq.peek(); }
584         public Runnable take() throws InterruptedException { return dq.take(); }
585         public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
586             return dq.poll(timeout, unit);
587         }
588
589         public boolean add(Runnable x) {
590             return dq.add((RunnableScheduledFuture)x);
591         }
592         public boolean offer(Runnable x) {
593             return dq.offer((RunnableScheduledFuture)x);
594         }
595         public void put(Runnable x) {
596             dq.put((RunnableScheduledFuture)x);
597         }
598         public boolean offer(Runnable x, long timeout, TimeUnit unit) {
599             return dq.offer((RunnableScheduledFuture)x, timeout, unit);
600         }
601
602         public Runnable remove() { return dq.remove(); }
603         public Runnable element() { return dq.element(); }
604         public void clear() { dq.clear(); }
605         public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
606         public int drainTo(Collection<? super Runnable> c, int maxElements) {
607             return dq.drainTo(c, maxElements);
608         }
609
610         public int remainingCapacity() { return dq.remainingCapacity(); }
611         public boolean remove(Object x) { return dq.remove(x); }
612         public boolean contains(Object x) { return dq.contains(x); }
613         public int size() { return dq.size(); }
614         public boolean isEmpty() { return dq.isEmpty(); }
615         public Object[] toArray() { return dq.toArray(); }
616         public <T> T[] toArray(T[] array) { return dq.toArray(array); }
617         public Iterator<Runnable> iterator() {
618             return new Iterator<Runnable>() {
619                 private Iterator<RunnableScheduledFuture> it = dq.iterator();
620                 public boolean hasNext() { return it.hasNext(); }
621                 public Runnable next() { return it.next(); }
622                 public void remove() { it.remove(); }
623             };
624         }
625     }
626 }