OSDN Git Service

2013.10.24
[uclinux-h8/uClinux-dist.git] / lib / classpath / external / jsr166 / java / util / concurrent / DelayQueue.java
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13  * An unbounded {@linkplain BlockingQueue blocking queue} of
14  * <tt>Delayed</tt> elements, in which an element can only be taken
15  * when its delay has expired.  The <em>head</em> of the queue is that
16  * <tt>Delayed</tt> element whose delay expired furthest in the
17  * past.  If no delay has expired there is no head and <tt>poll</tt>
18  * will return <tt>null</tt>. Expiration occurs when an element's
19  * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
20  * than or equal to zero.  Even though unexpired elements cannot be
21  * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
22  * treated as normal elements. For example, the <tt>size</tt> method
23  * returns the count of both expired and unexpired elements.
24  * This queue does not permit null elements.
25  *
26  * <p>This class and its iterator implement all of the
27  * <em>optional</em> methods of the {@link Collection} and {@link
28  * Iterator} interfaces.
29  *
30  * <p>This class is a member of the
31  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
32  * Java Collections Framework</a>.
33  *
34  * @since 1.5
35  * @author Doug Lea
36  * @param <E> the type of elements held in this collection
37  */
38
39 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
40     implements BlockingQueue<E> {
41
42     private transient final ReentrantLock lock = new ReentrantLock();
43     private transient final Condition available = lock.newCondition();
44     private final PriorityQueue<E> q = new PriorityQueue<E>();
45
46     /**
47      * Creates a new <tt>DelayQueue</tt> that is initially empty.
48      */
49     public DelayQueue() {}
50
51     /**
52      * Creates a <tt>DelayQueue</tt> initially containing the elements of the
53      * given collection of {@link Delayed} instances.
54      *
55      * @param c the collection of elements to initially contain
56      * @throws NullPointerException if the specified collection or any
57      *         of its elements are null
58      */
59     public DelayQueue(Collection<? extends E> c) {
60         this.addAll(c);
61     }
62
63     /**
64      * Inserts the specified element into this delay queue.
65      *
66      * @param e the element to add
67      * @return <tt>true</tt> (as specified by {@link Collection#add})
68      * @throws NullPointerException if the specified element is null
69      */
70     public boolean add(E e) {
71         return offer(e);
72     }
73
74     /**
75      * Inserts the specified element into this delay queue.
76      *
77      * @param e the element to add
78      * @return <tt>true</tt>
79      * @throws NullPointerException if the specified element is null
80      */
81     public boolean offer(E e) {
82         final ReentrantLock lock = this.lock;
83         lock.lock();
84         try {
85             E first = q.peek();
86             q.offer(e);
87             if (first == null || e.compareTo(first) < 0)
88                 available.signalAll();
89             return true;
90         } finally {
91             lock.unlock();
92         }
93     }
94
95     /**
96      * Inserts the specified element into this delay queue. As the queue is
97      * unbounded this method will never block.
98      *
99      * @param e the element to add
100      * @throws NullPointerException {@inheritDoc}
101      */
102     public void put(E e) {
103         offer(e);
104     }
105
106     /**
107      * Inserts the specified element into this delay queue. As the queue is
108      * unbounded this method will never block.
109      *
110      * @param e the element to add
111      * @param timeout This parameter is ignored as the method never blocks
112      * @param unit This parameter is ignored as the method never blocks
113      * @return <tt>true</tt>
114      * @throws NullPointerException {@inheritDoc}
115      */
116     public boolean offer(E e, long timeout, TimeUnit unit) {
117         return offer(e);
118     }
119
120     /**
121      * Retrieves and removes the head of this queue, or returns <tt>null</tt>
122      * if this queue has no elements with an expired delay.
123      *
124      * @return the head of this queue, or <tt>null</tt> if this
125      *         queue has no elements with an expired delay
126      */
127     public E poll() {
128         final ReentrantLock lock = this.lock;
129         lock.lock();
130         try {
131             E first = q.peek();
132             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
133                 return null;
134             else {
135                 E x = q.poll();
136                 assert x != null;
137                 if (q.size() != 0)
138                     available.signalAll();
139                 return x;
140             }
141         } finally {
142             lock.unlock();
143         }
144     }
145
146     /**
147      * Retrieves and removes the head of this queue, waiting if necessary
148      * until an element with an expired delay is available on this queue.
149      *
150      * @return the head of this queue
151      * @throws InterruptedException {@inheritDoc}
152      */
153     public E take() throws InterruptedException {
154         final ReentrantLock lock = this.lock;
155         lock.lockInterruptibly();
156         try {
157             for (;;) {
158                 E first = q.peek();
159                 if (first == null) {
160                     available.await();
161                 } else {
162                     long delay =  first.getDelay(TimeUnit.NANOSECONDS);
163                     if (delay > 0) {
164                         long tl = available.awaitNanos(delay);
165                     } else {
166                         E x = q.poll();
167                         assert x != null;
168                         if (q.size() != 0)
169                             available.signalAll(); // wake up other takers
170                         return x;
171
172                     }
173                 }
174             }
175         } finally {
176             lock.unlock();
177         }
178     }
179
180     /**
181      * Retrieves and removes the head of this queue, waiting if necessary
182      * until an element with an expired delay is available on this queue,
183      * or the specified wait time expires.
184      *
185      * @return the head of this queue, or <tt>null</tt> if the
186      *         specified waiting time elapses before an element with
187      *         an expired delay becomes available
188      * @throws InterruptedException {@inheritDoc}
189      */
190     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
191         long nanos = unit.toNanos(timeout);
192         final ReentrantLock lock = this.lock;
193         lock.lockInterruptibly();
194         try {
195             for (;;) {
196                 E first = q.peek();
197                 if (first == null) {
198                     if (nanos <= 0)
199                         return null;
200                     else
201                         nanos = available.awaitNanos(nanos);
202                 } else {
203                     long delay = first.getDelay(TimeUnit.NANOSECONDS);
204                     if (delay > 0) {
205                         if (nanos <= 0)
206                             return null;
207                         if (delay > nanos)
208                             delay = nanos;
209                         long timeLeft = available.awaitNanos(delay);
210                         nanos -= delay - timeLeft;
211                     } else {
212                         E x = q.poll();
213                         assert x != null;
214                         if (q.size() != 0)
215                             available.signalAll();
216                         return x;
217                     }
218                 }
219             }
220         } finally {
221             lock.unlock();
222         }
223     }
224
225     /**
226      * Retrieves, but does not remove, the head of this queue, or
227      * returns <tt>null</tt> if this queue is empty.  Unlike
228      * <tt>poll</tt>, if no expired elements are available in the queue,
229      * this method returns the element that will expire next,
230      * if one exists.
231      *
232      * @return the head of this queue, or <tt>null</tt> if this
233      *         queue is empty.
234      */
235     public E peek() {
236         final ReentrantLock lock = this.lock;
237         lock.lock();
238         try {
239             return q.peek();
240         } finally {
241             lock.unlock();
242         }
243     }
244
245     public int size() {
246         final ReentrantLock lock = this.lock;
247         lock.lock();
248         try {
249             return q.size();
250         } finally {
251             lock.unlock();
252         }
253     }
254
255     /**
256      * @throws UnsupportedOperationException {@inheritDoc}
257      * @throws ClassCastException            {@inheritDoc}
258      * @throws NullPointerException          {@inheritDoc}
259      * @throws IllegalArgumentException      {@inheritDoc}
260      */
261     public int drainTo(Collection<? super E> c) {
262         if (c == null)
263             throw new NullPointerException();
264         if (c == this)
265             throw new IllegalArgumentException();
266         final ReentrantLock lock = this.lock;
267         lock.lock();
268         try {
269             int n = 0;
270             for (;;) {
271                 E first = q.peek();
272                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
273                     break;
274                 c.add(q.poll());
275                 ++n;
276             }
277             if (n > 0)
278                 available.signalAll();
279             return n;
280         } finally {
281             lock.unlock();
282         }
283     }
284
285     /**
286      * @throws UnsupportedOperationException {@inheritDoc}
287      * @throws ClassCastException            {@inheritDoc}
288      * @throws NullPointerException          {@inheritDoc}
289      * @throws IllegalArgumentException      {@inheritDoc}
290      */
291     public int drainTo(Collection<? super E> c, int maxElements) {
292         if (c == null)
293             throw new NullPointerException();
294         if (c == this)
295             throw new IllegalArgumentException();
296         if (maxElements <= 0)
297             return 0;
298         final ReentrantLock lock = this.lock;
299         lock.lock();
300         try {
301             int n = 0;
302             while (n < maxElements) {
303                 E first = q.peek();
304                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
305                     break;
306                 c.add(q.poll());
307                 ++n;
308             }
309             if (n > 0)
310                 available.signalAll();
311             return n;
312         } finally {
313             lock.unlock();
314         }
315     }
316
317     /**
318      * Atomically removes all of the elements from this delay queue.
319      * The queue will be empty after this call returns.
320      * Elements with an unexpired delay are not waited for; they are
321      * simply discarded from the queue.
322      */
323     public void clear() {
324         final ReentrantLock lock = this.lock;
325         lock.lock();
326         try {
327             q.clear();
328         } finally {
329             lock.unlock();
330         }
331     }
332
333     /**
334      * Always returns <tt>Integer.MAX_VALUE</tt> because
335      * a <tt>DelayQueue</tt> is not capacity constrained.
336      *
337      * @return <tt>Integer.MAX_VALUE</tt>
338      */
339     public int remainingCapacity() {
340         return Integer.MAX_VALUE;
341     }
342
343     /**
344      * Returns an array containing all of the elements in this queue.
345      * The returned array elements are in no particular order.
346      *
347      * <p>The returned array will be "safe" in that no references to it are
348      * maintained by this queue.  (In other words, this method must allocate
349      * a new array).  The caller is thus free to modify the returned array.
350      *
351      * <p>This method acts as bridge between array-based and collection-based
352      * APIs.
353      *
354      * @return an array containing all of the elements in this queue
355      */
356     public Object[] toArray() {
357         final ReentrantLock lock = this.lock;
358         lock.lock();
359         try {
360             return q.toArray();
361         } finally {
362             lock.unlock();
363         }
364     }
365
366     /**
367      * Returns an array containing all of the elements in this queue; the
368      * runtime type of the returned array is that of the specified array.
369      * The returned array elements are in no particular order.
370      * If the queue fits in the specified array, it is returned therein.
371      * Otherwise, a new array is allocated with the runtime type of the
372      * specified array and the size of this queue.
373      *
374      * <p>If this queue fits in the specified array with room to spare
375      * (i.e., the array has more elements than this queue), the element in
376      * the array immediately following the end of the queue is set to
377      * <tt>null</tt>.
378      *
379      * <p>Like the {@link #toArray()} method, this method acts as bridge between
380      * array-based and collection-based APIs.  Further, this method allows
381      * precise control over the runtime type of the output array, and may,
382      * under certain circumstances, be used to save allocation costs.
383      *
384      * <p>The following code can be used to dump a delay queue into a newly
385      * allocated array of <tt>Delayed</tt>:
386      *
387      * <pre>
388      *     Delayed[] a = q.toArray(new Delayed[0]);</pre>
389      *
390      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
391      * <tt>toArray()</tt>.
392      *
393      * @param a the array into which the elements of the queue are to
394      *          be stored, if it is big enough; otherwise, a new array of the
395      *          same runtime type is allocated for this purpose
396      * @return an array containing all of the elements in this queue
397      * @throws ArrayStoreException if the runtime type of the specified array
398      *         is not a supertype of the runtime type of every element in
399      *         this queue
400      * @throws NullPointerException if the specified array is null
401      */
402     public <T> T[] toArray(T[] a) {
403         final ReentrantLock lock = this.lock;
404         lock.lock();
405         try {
406             return q.toArray(a);
407         } finally {
408             lock.unlock();
409         }
410     }
411
412     /**
413      * Removes a single instance of the specified element from this
414      * queue, if it is present, whether or not it has expired.
415      */
416     public boolean remove(Object o) {
417         final ReentrantLock lock = this.lock;
418         lock.lock();
419         try {
420             return q.remove(o);
421         } finally {
422             lock.unlock();
423         }
424     }
425
426     /**
427      * Returns an iterator over all the elements (both expired and
428      * unexpired) in this queue. The iterator does not return the
429      * elements in any particular order.  The returned
430      * <tt>Iterator</tt> is a "weakly consistent" iterator that will
431      * never throw {@link ConcurrentModificationException}, and
432      * guarantees to traverse elements as they existed upon
433      * construction of the iterator, and may (but is not guaranteed
434      * to) reflect any modifications subsequent to construction.
435      *
436      * @return an iterator over the elements in this queue
437      */
438     public Iterator<E> iterator() {
439         return new Itr(toArray());
440     }
441
442     /**
443      * Snapshot iterator that works off copy of underlying q array.
444      */
445     private class Itr implements Iterator<E> {
446         final Object[] array; // Array of all elements
447         int cursor;           // index of next element to return;
448         int lastRet;          // index of last element, or -1 if no such
449
450         Itr(Object[] array) {
451             lastRet = -1;
452             this.array = array;
453         }
454
455         public boolean hasNext() {
456             return cursor < array.length;
457         }
458
459         public E next() {
460             if (cursor >= array.length)
461                 throw new NoSuchElementException();
462             lastRet = cursor;
463             return (E)array[cursor++];
464         }
465
466         public void remove() {
467             if (lastRet < 0)
468                 throw new IllegalStateException();
469             Object x = array[lastRet];
470             lastRet = -1;
471             // Traverse underlying queue to find == element,
472             // not just a .equals element.
473             lock.lock();
474             try {
475                 for (Iterator it = q.iterator(); it.hasNext(); ) {
476                     if (it.next() == x) {
477                         it.remove();
478                         return;
479                     }
480                 }
481             } finally {
482                 lock.unlock();
483             }
484         }
485     }
486
487 }