OSDN Git Service

2013.10.24
[uclinux-h8/uClinux-dist.git] / lib / classpath / external / jsr166 / java / util / concurrent / ArrayBlockingQueue.java
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13  * array.  This queue orders elements FIFO (first-in-first-out).  The
14  * <em>head</em> of the queue is that element that has been on the
15  * queue the longest time.  The <em>tail</em> of the queue is that
16  * element that has been on the queue the shortest time. New elements
17  * are inserted at the tail of the queue, and the queue retrieval
18  * operations obtain elements at the head of the queue.
19  *
20  * <p>This is a classic &quot;bounded buffer&quot;, in which a
21  * fixed-sized array holds elements inserted by producers and
22  * extracted by consumers.  Once created, the capacity cannot be
23  * increased.  Attempts to <tt>put</tt> an element into a full queue
24  * will result in the operation blocking; attempts to <tt>take</tt> an
25  * element from an empty queue will similarly block.
26  *
27  * <p> This class supports an optional fairness policy for ordering
28  * waiting producer and consumer threads.  By default, this ordering
29  * is not guaranteed. However, a queue constructed with fairness set
30  * to <tt>true</tt> grants threads access in FIFO order. Fairness
31  * generally decreases throughput but reduces variability and avoids
32  * starvation.
33  *
34  * <p>This class and its iterator implement all of the
35  * <em>optional</em> methods of the {@link Collection} and {@link
36  * Iterator} interfaces.
37  *
38  * <p>This class is a member of the
39  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40  * Java Collections Framework</a>.
41  *
42  * @since 1.5
43  * @author Doug Lea
44  * @param <E> the type of elements held in this collection
45  */
46 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47         implements BlockingQueue<E>, java.io.Serializable {
48
49     /**
50      * Serialization ID. This class relies on default serialization
51      * even for the items array, which is default-serialized, even if
52      * it is empty. Otherwise it could not be declared final, which is
53      * necessary here.
54      */
55     private static final long serialVersionUID = -817911632652898426L;
56
57     /** The queued items  */
58     private final E[] items;
59     /** items index for next take, poll or remove */
60     private int takeIndex;
61     /** items index for next put, offer, or add. */
62     private int putIndex;
63     /** Number of items in the queue */
64     private int count;
65
66     /*
67      * Concurrency control uses the classic two-condition algorithm
68      * found in any textbook.
69      */
70
71     /** Main lock guarding all access */
72     private final ReentrantLock lock;
73     /** Condition for waiting takes */
74     private final Condition notEmpty;
75     /** Condition for waiting puts */
76     private final Condition notFull;
77
78     // Internal helper methods
79
80     /**
81      * Circularly increment i.
82      */
83     final int inc(int i) {
84         return (++i == items.length)? 0 : i;
85     }
86
87     /**
88      * Inserts element at current put position, advances, and signals.
89      * Call only when holding lock.
90      */
91     private void insert(E x) {
92         items[putIndex] = x;
93         putIndex = inc(putIndex);
94         ++count;
95         notEmpty.signal();
96     }
97
98     /**
99      * Extracts element at current take position, advances, and signals.
100      * Call only when holding lock.
101      */
102     private E extract() {
103         final E[] items = this.items;
104         E x = items[takeIndex];
105         items[takeIndex] = null;
106         takeIndex = inc(takeIndex);
107         --count;
108         notFull.signal();
109         return x;
110     }
111
112     /**
113      * Utility for remove and iterator.remove: Delete item at position i.
114      * Call only when holding lock.
115      */
116     void removeAt(int i) {
117         final E[] items = this.items;
118         // if removing front item, just advance
119         if (i == takeIndex) {
120             items[takeIndex] = null;
121             takeIndex = inc(takeIndex);
122         } else {
123             // slide over all others up through putIndex.
124             for (;;) {
125                 int nexti = inc(i);
126                 if (nexti != putIndex) {
127                     items[i] = items[nexti];
128                     i = nexti;
129                 } else {
130                     items[i] = null;
131                     putIndex = i;
132                     break;
133                 }
134             }
135         }
136         --count;
137         notFull.signal();
138     }
139
140     /**
141      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
142      * capacity and default access policy.
143      *
144      * @param capacity the capacity of this queue
145      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
146      */
147     public ArrayBlockingQueue(int capacity) {
148         this(capacity, false);
149     }
150
151     /**
152      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
153      * capacity and the specified access policy.
154      *
155      * @param capacity the capacity of this queue
156      * @param fair if <tt>true</tt> then queue accesses for threads blocked
157      *        on insertion or removal, are processed in FIFO order;
158      *        if <tt>false</tt> the access order is unspecified.
159      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
160      */
161     public ArrayBlockingQueue(int capacity, boolean fair) {
162         if (capacity <= 0)
163             throw new IllegalArgumentException();
164         this.items = (E[]) new Object[capacity];
165         lock = new ReentrantLock(fair);
166         notEmpty = lock.newCondition();
167         notFull =  lock.newCondition();
168     }
169
170     /**
171      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
172      * capacity, the specified access policy and initially containing the
173      * elements of the given collection,
174      * added in traversal order of the collection's iterator.
175      *
176      * @param capacity the capacity of this queue
177      * @param fair if <tt>true</tt> then queue accesses for threads blocked
178      *        on insertion or removal, are processed in FIFO order;
179      *        if <tt>false</tt> the access order is unspecified.
180      * @param c the collection of elements to initially contain
181      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
182      *         <tt>c.size()</tt>, or less than 1.
183      * @throws NullPointerException if the specified collection or any
184      *         of its elements are null
185      */
186     public ArrayBlockingQueue(int capacity, boolean fair,
187                               Collection<? extends E> c) {
188         this(capacity, fair);
189         if (capacity < c.size())
190             throw new IllegalArgumentException();
191
192         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
193             add(it.next());
194     }
195
196     /**
197      * Inserts the specified element at the tail of this queue if it is
198      * possible to do so immediately without exceeding the queue's capacity,
199      * returning <tt>true</tt> upon success and throwing an
200      * <tt>IllegalStateException</tt> if this queue is full.
201      *
202      * @param e the element to add
203      * @return <tt>true</tt> (as specified by {@link Collection#add})
204      * @throws IllegalStateException if this queue is full
205      * @throws NullPointerException if the specified element is null
206      */
207     public boolean add(E e) {
208         return super.add(e);
209     }
210
211     /**
212      * Inserts the specified element at the tail of this queue if it is
213      * possible to do so immediately without exceeding the queue's capacity,
214      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
215      * is full.  This method is generally preferable to method {@link #add},
216      * which can fail to insert an element only by throwing an exception.
217      *
218      * @throws NullPointerException if the specified element is null
219      */
220     public boolean offer(E e) {
221         if (e == null) throw new NullPointerException();
222         final ReentrantLock lock = this.lock;
223         lock.lock();
224         try {
225             if (count == items.length)
226                 return false;
227             else {
228                 insert(e);
229                 return true;
230             }
231         } finally {
232             lock.unlock();
233         }
234     }
235
236     /**
237      * Inserts the specified element at the tail of this queue, waiting
238      * for space to become available if the queue is full.
239      *
240      * @throws InterruptedException {@inheritDoc}
241      * @throws NullPointerException {@inheritDoc}
242      */
243     public void put(E e) throws InterruptedException {
244         if (e == null) throw new NullPointerException();
245         final E[] items = this.items;
246         final ReentrantLock lock = this.lock;
247         lock.lockInterruptibly();
248         try {
249             try {
250                 while (count == items.length)
251                     notFull.await();
252             } catch (InterruptedException ie) {
253                 notFull.signal(); // propagate to non-interrupted thread
254                 throw ie;
255             }
256             insert(e);
257         } finally {
258             lock.unlock();
259         }
260     }
261
262     /**
263      * Inserts the specified element at the tail of this queue, waiting
264      * up to the specified wait time for space to become available if
265      * the queue is full.
266      *
267      * @throws InterruptedException {@inheritDoc}
268      * @throws NullPointerException {@inheritDoc}
269      */
270     public boolean offer(E e, long timeout, TimeUnit unit)
271         throws InterruptedException {
272
273         if (e == null) throw new NullPointerException();
274         long nanos = unit.toNanos(timeout);
275         final ReentrantLock lock = this.lock;
276         lock.lockInterruptibly();
277         try {
278             for (;;) {
279                 if (count != items.length) {
280                     insert(e);
281                     return true;
282                 }
283                 if (nanos <= 0)
284                     return false;
285                 try {
286                     nanos = notFull.awaitNanos(nanos);
287                 } catch (InterruptedException ie) {
288                     notFull.signal(); // propagate to non-interrupted thread
289                     throw ie;
290                 }
291             }
292         } finally {
293             lock.unlock();
294         }
295     }
296
297     public E poll() {
298         final ReentrantLock lock = this.lock;
299         lock.lock();
300         try {
301             if (count == 0)
302                 return null;
303             E x = extract();
304             return x;
305         } finally {
306             lock.unlock();
307         }
308     }
309
310     public E take() throws InterruptedException {
311         final ReentrantLock lock = this.lock;
312         lock.lockInterruptibly();
313         try {
314             try {
315                 while (count == 0)
316                     notEmpty.await();
317             } catch (InterruptedException ie) {
318                 notEmpty.signal(); // propagate to non-interrupted thread
319                 throw ie;
320             }
321             E x = extract();
322             return x;
323         } finally {
324             lock.unlock();
325         }
326     }
327
328     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
329         long nanos = unit.toNanos(timeout);
330         final ReentrantLock lock = this.lock;
331         lock.lockInterruptibly();
332         try {
333             for (;;) {
334                 if (count != 0) {
335                     E x = extract();
336                     return x;
337                 }
338                 if (nanos <= 0)
339                     return null;
340                 try {
341                     nanos = notEmpty.awaitNanos(nanos);
342                 } catch (InterruptedException ie) {
343                     notEmpty.signal(); // propagate to non-interrupted thread
344                     throw ie;
345                 }
346
347             }
348         } finally {
349             lock.unlock();
350         }
351     }
352
353     public E peek() {
354         final ReentrantLock lock = this.lock;
355         lock.lock();
356         try {
357             return (count == 0) ? null : items[takeIndex];
358         } finally {
359             lock.unlock();
360         }
361     }
362
363     // this doc comment is overridden to remove the reference to collections
364     // greater in size than Integer.MAX_VALUE
365     /**
366      * Returns the number of elements in this queue.
367      *
368      * @return the number of elements in this queue
369      */
370     public int size() {
371         final ReentrantLock lock = this.lock;
372         lock.lock();
373         try {
374             return count;
375         } finally {
376             lock.unlock();
377         }
378     }
379
380     // this doc comment is a modified copy of the inherited doc comment,
381     // without the reference to unlimited queues.
382     /**
383      * Returns the number of additional elements that this queue can ideally
384      * (in the absence of memory or resource constraints) accept without
385      * blocking. This is always equal to the initial capacity of this queue
386      * less the current <tt>size</tt> of this queue.
387      *
388      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
389      * an element will succeed by inspecting <tt>remainingCapacity</tt>
390      * because it may be the case that another thread is about to
391      * insert or remove an element.
392      */
393     public int remainingCapacity() {
394         final ReentrantLock lock = this.lock;
395         lock.lock();
396         try {
397             return items.length - count;
398         } finally {
399             lock.unlock();
400         }
401     }
402
403     /**
404      * Removes a single instance of the specified element from this queue,
405      * if it is present.  More formally, removes an element <tt>e</tt> such
406      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
407      * elements.
408      * Returns <tt>true</tt> if this queue contained the specified element
409      * (or equivalently, if this queue changed as a result of the call).
410      *
411      * @param o element to be removed from this queue, if present
412      * @return <tt>true</tt> if this queue changed as a result of the call
413      */
414     public boolean remove(Object o) {
415         if (o == null) return false;
416         final E[] items = this.items;
417         final ReentrantLock lock = this.lock;
418         lock.lock();
419         try {
420             int i = takeIndex;
421             int k = 0;
422             for (;;) {
423                 if (k++ >= count)
424                     return false;
425                 if (o.equals(items[i])) {
426                     removeAt(i);
427                     return true;
428                 }
429                 i = inc(i);
430             }
431
432         } finally {
433             lock.unlock();
434         }
435     }
436
437     /**
438      * Returns <tt>true</tt> if this queue contains the specified element.
439      * More formally, returns <tt>true</tt> if and only if this queue contains
440      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
441      *
442      * @param o object to be checked for containment in this queue
443      * @return <tt>true</tt> if this queue contains the specified element
444      */
445     public boolean contains(Object o) {
446         if (o == null) return false;
447         final E[] items = this.items;
448         final ReentrantLock lock = this.lock;
449         lock.lock();
450         try {
451             int i = takeIndex;
452             int k = 0;
453             while (k++ < count) {
454                 if (o.equals(items[i]))
455                     return true;
456                 i = inc(i);
457             }
458             return false;
459         } finally {
460             lock.unlock();
461         }
462     }
463
464     /**
465      * Returns an array containing all of the elements in this queue, in
466      * proper sequence.
467      *
468      * <p>The returned array will be "safe" in that no references to it are
469      * maintained by this queue.  (In other words, this method must allocate
470      * a new array).  The caller is thus free to modify the returned array.
471      *
472      * <p>This method acts as bridge between array-based and collection-based
473      * APIs.
474      *
475      * @return an array containing all of the elements in this queue
476      */
477     public Object[] toArray() {
478         final E[] items = this.items;
479         final ReentrantLock lock = this.lock;
480         lock.lock();
481         try {
482             Object[] a = new Object[count];
483             int k = 0;
484             int i = takeIndex;
485             while (k < count) {
486                 a[k++] = items[i];
487                 i = inc(i);
488             }
489             return a;
490         } finally {
491             lock.unlock();
492         }
493     }
494
495     /**
496      * Returns an array containing all of the elements in this queue, in
497      * proper sequence; the runtime type of the returned array is that of
498      * the specified array.  If the queue fits in the specified array, it
499      * is returned therein.  Otherwise, a new array is allocated with the
500      * runtime type of the specified array and the size of this queue.
501      *
502      * <p>If this queue fits in the specified array with room to spare
503      * (i.e., the array has more elements than this queue), the element in
504      * the array immediately following the end of the queue is set to
505      * <tt>null</tt>.
506      *
507      * <p>Like the {@link #toArray()} method, this method acts as bridge between
508      * array-based and collection-based APIs.  Further, this method allows
509      * precise control over the runtime type of the output array, and may,
510      * under certain circumstances, be used to save allocation costs.
511      *
512      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
513      * The following code can be used to dump the queue into a newly
514      * allocated array of <tt>String</tt>:
515      *
516      * <pre>
517      *     String[] y = x.toArray(new String[0]);</pre>
518      *
519      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
520      * <tt>toArray()</tt>.
521      *
522      * @param a the array into which the elements of the queue are to
523      *          be stored, if it is big enough; otherwise, a new array of the
524      *          same runtime type is allocated for this purpose
525      * @return an array containing all of the elements in this queue
526      * @throws ArrayStoreException if the runtime type of the specified array
527      *         is not a supertype of the runtime type of every element in
528      *         this queue
529      * @throws NullPointerException if the specified array is null
530      */
531     public <T> T[] toArray(T[] a) {
532         final E[] items = this.items;
533         final ReentrantLock lock = this.lock;
534         lock.lock();
535         try {
536             if (a.length < count)
537                 a = (T[])java.lang.reflect.Array.newInstance(
538                     a.getClass().getComponentType(),
539                     count
540                     );
541
542             int k = 0;
543             int i = takeIndex;
544             while (k < count) {
545                 a[k++] = (T)items[i];
546                 i = inc(i);
547             }
548             if (a.length > count)
549                 a[count] = null;
550             return a;
551         } finally {
552             lock.unlock();
553         }
554     }
555
556     public String toString() {
557         final ReentrantLock lock = this.lock;
558         lock.lock();
559         try {
560             return super.toString();
561         } finally {
562             lock.unlock();
563         }
564     }
565
566     /**
567      * Atomically removes all of the elements from this queue.
568      * The queue will be empty after this call returns.
569      */
570     public void clear() {
571         final E[] items = this.items;
572         final ReentrantLock lock = this.lock;
573         lock.lock();
574         try {
575             int i = takeIndex;
576             int k = count;
577             while (k-- > 0) {
578                 items[i] = null;
579                 i = inc(i);
580             }
581             count = 0;
582             putIndex = 0;
583             takeIndex = 0;
584             notFull.signalAll();
585         } finally {
586             lock.unlock();
587         }
588     }
589
590     /**
591      * @throws UnsupportedOperationException {@inheritDoc}
592      * @throws ClassCastException            {@inheritDoc}
593      * @throws NullPointerException          {@inheritDoc}
594      * @throws IllegalArgumentException      {@inheritDoc}
595      */
596     public int drainTo(Collection<? super E> c) {
597         if (c == null)
598             throw new NullPointerException();
599         if (c == this)
600             throw new IllegalArgumentException();
601         final E[] items = this.items;
602         final ReentrantLock lock = this.lock;
603         lock.lock();
604         try {
605             int i = takeIndex;
606             int n = 0;
607             int max = count;
608             while (n < max) {
609                 c.add(items[i]);
610                 items[i] = null;
611                 i = inc(i);
612                 ++n;
613             }
614             if (n > 0) {
615                 count = 0;
616                 putIndex = 0;
617                 takeIndex = 0;
618                 notFull.signalAll();
619             }
620             return n;
621         } finally {
622             lock.unlock();
623         }
624     }
625
626     /**
627      * @throws UnsupportedOperationException {@inheritDoc}
628      * @throws ClassCastException            {@inheritDoc}
629      * @throws NullPointerException          {@inheritDoc}
630      * @throws IllegalArgumentException      {@inheritDoc}
631      */
632     public int drainTo(Collection<? super E> c, int maxElements) {
633         if (c == null)
634             throw new NullPointerException();
635         if (c == this)
636             throw new IllegalArgumentException();
637         if (maxElements <= 0)
638             return 0;
639         final E[] items = this.items;
640         final ReentrantLock lock = this.lock;
641         lock.lock();
642         try {
643             int i = takeIndex;
644             int n = 0;
645             int sz = count;
646             int max = (maxElements < count)? maxElements : count;
647             while (n < max) {
648                 c.add(items[i]);
649                 items[i] = null;
650                 i = inc(i);
651                 ++n;
652             }
653             if (n > 0) {
654                 count -= n;
655                 takeIndex = i;
656                 notFull.signalAll();
657             }
658             return n;
659         } finally {
660             lock.unlock();
661         }
662     }
663
664
665     /**
666      * Returns an iterator over the elements in this queue in proper sequence.
667      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
668      * will never throw {@link ConcurrentModificationException},
669      * and guarantees to traverse elements as they existed upon
670      * construction of the iterator, and may (but is not guaranteed to)
671      * reflect any modifications subsequent to construction.
672      *
673      * @return an iterator over the elements in this queue in proper sequence
674      */
675     public Iterator<E> iterator() {
676         final ReentrantLock lock = this.lock;
677         lock.lock();
678         try {
679             return new Itr();
680         } finally {
681             lock.unlock();
682         }
683     }
684
685     /**
686      * Iterator for ArrayBlockingQueue
687      */
688     private class Itr implements Iterator<E> {
689         /**
690          * Index of element to be returned by next,
691          * or a negative number if no such.
692          */
693         private int nextIndex;
694
695         /**
696          * nextItem holds on to item fields because once we claim
697          * that an element exists in hasNext(), we must return it in
698          * the following next() call even if it was in the process of
699          * being removed when hasNext() was called.
700          */
701         private E nextItem;
702
703         /**
704          * Index of element returned by most recent call to next.
705          * Reset to -1 if this element is deleted by a call to remove.
706          */
707         private int lastRet;
708
709         Itr() {
710             lastRet = -1;
711             if (count == 0)
712                 nextIndex = -1;
713             else {
714                 nextIndex = takeIndex;
715                 nextItem = items[takeIndex];
716             }
717         }
718
719         public boolean hasNext() {
720             /*
721              * No sync. We can return true by mistake here
722              * only if this iterator passed across threads,
723              * which we don't support anyway.
724              */
725             return nextIndex >= 0;
726         }
727
728         /**
729          * Checks whether nextIndex is valid; if so setting nextItem.
730          * Stops iterator when either hits putIndex or sees null item.
731          */
732         private void checkNext() {
733             if (nextIndex == putIndex) {
734                 nextIndex = -1;
735                 nextItem = null;
736             } else {
737                 nextItem = items[nextIndex];
738                 if (nextItem == null)
739                     nextIndex = -1;
740             }
741         }
742
743         public E next() {
744             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
745             lock.lock();
746             try {
747                 if (nextIndex < 0)
748                     throw new NoSuchElementException();
749                 lastRet = nextIndex;
750                 E x = nextItem;
751                 nextIndex = inc(nextIndex);
752                 checkNext();
753                 return x;
754             } finally {
755                 lock.unlock();
756             }
757         }
758
759         public void remove() {
760             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
761             lock.lock();
762             try {
763                 int i = lastRet;
764                 if (i == -1)
765                     throw new IllegalStateException();
766                 lastRet = -1;
767
768                 int ti = takeIndex;
769                 removeAt(i);
770                 // back up cursor (reset to front if was first element)
771                 nextIndex = (i == ti) ? takeIndex : i;
772                 checkNext();
773             } finally {
774                 lock.unlock();
775             }
776         }
777     }
778 }