2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
20 * <p>This is a classic "bounded buffer", in which a
21 * fixed-sized array holds elements inserted by producers and
22 * extracted by consumers. Once created, the capacity cannot be
23 * increased. Attempts to <tt>put</tt> an element into a full queue
24 * will result in the operation blocking; attempts to <tt>take</tt> an
25 * element from an empty queue will similarly block.
27 * <p> This class supports an optional fairness policy for ordering
28 * waiting producer and consumer threads. By default, this ordering
29 * is not guaranteed. However, a queue constructed with fairness set
30 * to <tt>true</tt> grants threads access in FIFO order. Fairness
31 * generally decreases throughput but reduces variability and avoids starvation.
34 * <p>This class and its iterator implement all of the
35 * <em>optional</em> methods of the {@link Collection} and {@link
36 * Iterator} interfaces.
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 * Java Collections Framework</a>.
44 * @param <E> the type of elements held in this collection
46 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47 implements BlockingQueue<E>, java.io.Serializable {
50 * Serialization ID. This class relies on default serialization
51 * even for the items array, which is default-serialized, even if
52 * it is empty. Otherwise it could not be declared final, which is necessary here.
55 private static final long serialVersionUID = -817911632652898426L;
57 /** The queued items */
58 private final E[] items;
59 /** items index for next take, poll or remove */
60 private int takeIndex;
61 /** items index for next put, offer, or add. */
63 /** Number of items in the queue */
67 * Concurrency control uses the classic two-condition algorithm
68 * found in any textbook.
71 /** Main lock guarding all access */
72 private final ReentrantLock lock;
73 /** Condition for waiting takes */
74 private final Condition notEmpty;
75 /** Condition for waiting puts */
76 private final Condition notFull;
78 // Internal helper methods
81 * Circularly increment i.
83 final int inc(int i) {
84 return (++i == items.length)? 0 : i; // step forward one slot, wrapping to 0 on reaching items.length
88 * Inserts element at current put position, advances, and signals.
89 * Call only when holding lock.
91 private void insert(E x) {
93 putIndex = inc(putIndex); // advance the put position circularly
99 * Extracts element at current take position, advances, and signals.
100 * Call only when holding lock.
102 private E extract() {
103 final E[] items = this.items; // local alias of the backing array
104 E x = items[takeIndex]; // element at the current take position
105 items[takeIndex] = null; // clear the slot so the array drops its reference
106 takeIndex = inc(takeIndex); // advance the take position circularly
113 * Utility for remove and iterator.remove: Delete item at position i.
114 * Call only when holding lock.
116 void removeAt(int i) {
117 final E[] items = this.items;
118 // if removing front item, just advance
119 if (i == takeIndex) {
120 items[takeIndex] = null; // clear the vacated head slot
121 takeIndex = inc(takeIndex);
123 // slide over all others up through putIndex.
126 if (nexti != putIndex) {
127 items[i] = items[nexti]; // copy the element at nexti into slot i (nexti is assigned on a line not visible here)
141 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
142 * capacity and default access policy.
144 * @param capacity the capacity of this queue
145 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
147 public ArrayBlockingQueue(int capacity) {
148 this(capacity, false); // delegate to the (capacity, fair) constructor with fair = false
152 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
153 * capacity and the specified access policy.
155 * @param capacity the capacity of this queue
156 * @param fair if <tt>true</tt> then queue accesses for threads blocked
157 * on insertion or removal, are processed in FIFO order;
158 * if <tt>false</tt> the access order is unspecified.
159 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
161 public ArrayBlockingQueue(int capacity, boolean fair) {
163 throw new IllegalArgumentException();
164 this.items = (E[]) new Object[capacity]; // backing array with exactly capacity slots (unchecked cast from Object[])
165 lock = new ReentrantLock(fair); // the fairness flag is handed to the lock
166 notEmpty = lock.newCondition(); // both conditions are created from the same lock
167 notFull = lock.newCondition();
171 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
172 * capacity, the specified access policy and initially containing the
173 * elements of the given collection,
174 * added in traversal order of the collection's iterator.
176 * @param capacity the capacity of this queue
177 * @param fair if <tt>true</tt> then queue accesses for threads blocked
178 * on insertion or removal, are processed in FIFO order;
179 * if <tt>false</tt> the access order is unspecified.
180 * @param c the collection of elements to initially contain
181 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
182 * <tt>c.size()</tt>, or less than 1.
183 * @throws NullPointerException if the specified collection or any
184 * of its elements are null
186 public ArrayBlockingQueue(int capacity, boolean fair,
187 Collection<? extends E> c) {
188 this(capacity, fair); // set up the array, lock and conditions first
189 if (capacity < c.size()) // dereferencing c here throws NullPointerException when c is null
190 throw new IllegalArgumentException();
192 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
197 * Inserts the specified element at the tail of this queue if it is
198 * possible to do so immediately without exceeding the queue's capacity,
199 * returning <tt>true</tt> upon success and throwing an
200 * <tt>IllegalStateException</tt> if this queue is full.
202 * @param e the element to add
203 * @return <tt>true</tt> (as specified by {@link Collection#add})
204 * @throws IllegalStateException if this queue is full
205 * @throws NullPointerException if the specified element is null
207 public boolean add(E e) {
212 * Inserts the specified element at the tail of this queue if it is
213 * possible to do so immediately without exceeding the queue's capacity,
214 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
215 * is full. This method is generally preferable to method {@link #add},
216 * which can fail to insert an element only by throwing an exception.
218 * @throws NullPointerException if the specified element is null
220 public boolean offer(E e) {
221 if (e == null) throw new NullPointerException(); // null elements are rejected up front
222 final ReentrantLock lock = this.lock;
225 if (count == items.length) // count has reached the array length, i.e. the queue is full
237 * Inserts the specified element at the tail of this queue, waiting
238 * for space to become available if the queue is full.
240 * @throws InterruptedException {@inheritDoc}
241 * @throws NullPointerException {@inheritDoc}
243 public void put(E e) throws InterruptedException {
244 if (e == null) throw new NullPointerException(); // null elements are rejected up front
245 final E[] items = this.items;
246 final ReentrantLock lock = this.lock;
247 lock.lockInterruptibly(); // acquiring the lock can itself be interrupted
250 while (count == items.length) // keep looping for as long as the queue is full
252 } catch (InterruptedException ie) {
253 notFull.signal(); // propagate to non-interrupted thread
263 * Inserts the specified element at the tail of this queue, waiting
264 * up to the specified wait time for space to become available if the queue is full.
267 * @throws InterruptedException {@inheritDoc}
268 * @throws NullPointerException {@inheritDoc}
270 public boolean offer(E e, long timeout, TimeUnit unit)
271 throws InterruptedException {
273 if (e == null) throw new NullPointerException();
274 long nanos = unit.toNanos(timeout); // the timeout is tracked in nanoseconds from here on
275 final ReentrantLock lock = this.lock;
276 lock.lockInterruptibly();
279 if (count != items.length) { // at least one slot is free
286 nanos = notFull.awaitNanos(nanos); // awaitNanos returns an estimate of the wait time still remaining
287 } catch (InterruptedException ie) {
288 notFull.signal(); // propagate to non-interrupted thread
298 final ReentrantLock lock = this.lock;
310 public E take() throws InterruptedException {
311 final ReentrantLock lock = this.lock;
312 lock.lockInterruptibly(); // acquiring the lock can itself be interrupted
317 } catch (InterruptedException ie) {
318 notEmpty.signal(); // propagate to non-interrupted thread
328 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
329 long nanos = unit.toNanos(timeout); // the timeout is tracked in nanoseconds from here on
330 final ReentrantLock lock = this.lock;
331 lock.lockInterruptibly();
341 nanos = notEmpty.awaitNanos(nanos); // awaitNanos returns an estimate of the wait time still remaining
342 } catch (InterruptedException ie) {
343 notEmpty.signal(); // propagate to non-interrupted thread
354 final ReentrantLock lock = this.lock;
357 return (count == 0) ? null : items[takeIndex];
363 // this doc comment is overridden to remove the reference to collections
364 // greater in size than Integer.MAX_VALUE
366 * Returns the number of elements in this queue.
368 * @return the number of elements in this queue
371 final ReentrantLock lock = this.lock;
380 // this doc comment is a modified copy of the inherited doc comment,
381 // without the reference to unlimited queues.
383 * Returns the number of additional elements that this queue can ideally
384 * (in the absence of memory or resource constraints) accept without
385 * blocking. This is always equal to the initial capacity of this queue
386 * less the current <tt>size</tt> of this queue.
388 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
389 * an element will succeed by inspecting <tt>remainingCapacity</tt>
390 * because it may be the case that another thread is about to
391 * insert or remove an element.
393 public int remainingCapacity() {
394 final ReentrantLock lock = this.lock;
397 return items.length - count; // fixed array length minus the current element count
404 * Removes a single instance of the specified element from this queue,
405 * if it is present. More formally, removes an element <tt>e</tt> such
406 * that <tt>o.equals(e)</tt>, if this queue contains one or more such elements.
408 * Returns <tt>true</tt> if this queue contained the specified element
409 * (or equivalently, if this queue changed as a result of the call).
411 * @param o element to be removed from this queue, if present
412 * @return <tt>true</tt> if this queue changed as a result of the call
414 public boolean remove(Object o) {
415 if (o == null) return false; // a null argument is reported as "nothing removed"
416 final E[] items = this.items;
417 final ReentrantLock lock = this.lock;
425 if (o.equals(items[i])) { // the match is decided by the argument's own equals method
438 * Returns <tt>true</tt> if this queue contains the specified element.
439 * More formally, returns <tt>true</tt> if and only if this queue contains
440 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
442 * @param o object to be checked for containment in this queue
443 * @return <tt>true</tt> if this queue contains the specified element
445 public boolean contains(Object o) {
446 if (o == null) return false; // a null argument is reported as "not contained"
447 final E[] items = this.items;
448 final ReentrantLock lock = this.lock;
453 while (k++ < count) {
454 if (o.equals(items[i])) // the match is decided by the argument's own equals method
465 * Returns an array containing all of the elements in this queue, in proper sequence.
468 * <p>The returned array will be "safe" in that no references to it are
469 * maintained by this queue. (In other words, this method must allocate
470 * a new array). The caller is thus free to modify the returned array.
472 * <p>This method acts as a bridge between array-based and collection-based APIs.
475 * @return an array containing all of the elements in this queue
477 public Object[] toArray() {
478 final E[] items = this.items;
479 final ReentrantLock lock = this.lock;
482 Object[] a = new Object[count]; // fresh array sized to the current element count
496 * Returns an array containing all of the elements in this queue, in
497 * proper sequence; the runtime type of the returned array is that of
498 * the specified array. If the queue fits in the specified array, it
499 * is returned therein. Otherwise, a new array is allocated with the
500 * runtime type of the specified array and the size of this queue.
502 * <p>If this queue fits in the specified array with room to spare
503 * (i.e., the array has more elements than this queue), the element in
504 * the array immediately following the end of the queue is set to <tt>null</tt>.
507 * <p>Like the {@link #toArray()} method, this method acts as a bridge between
508 * array-based and collection-based APIs. Further, this method allows
509 * precise control over the runtime type of the output array, and may,
510 * under certain circumstances, be used to save allocation costs.
512 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
513 * The following code can be used to dump the queue into a newly
514 * allocated array of <tt>String</tt>:
517 * String[] y = x.toArray(new String[0]);</pre>
519 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
520 * <tt>toArray()</tt>.
522 * @param a the array into which the elements of the queue are to
523 * be stored, if it is big enough; otherwise, a new array of the
524 * same runtime type is allocated for this purpose
525 * @return an array containing all of the elements in this queue
526 * @throws ArrayStoreException if the runtime type of the specified array
527 * is not a supertype of the runtime type of every element in this queue
529 * @throws NullPointerException if the specified array is null
531 public <T> T[] toArray(T[] a) {
532 final E[] items = this.items;
533 final ReentrantLock lock = this.lock;
536 if (a.length < count) // supplied array is too small: replace it with one of the same component type
537 a = (T[])java.lang.reflect.Array.newInstance(
538 a.getClass().getComponentType(),
545 a[k++] = (T)items[i]; // unchecked cast from the element to the caller's component type
548 if (a.length > count) // the array has room to spare beyond the queue's elements
556 public String toString() {
557 final ReentrantLock lock = this.lock;
560 return super.toString(); // the formatting itself is delegated to the superclass
567 * Atomically removes all of the elements from this queue.
568 * The queue will be empty after this call returns.
570 public void clear() {
571 final E[] items = this.items;
572 final ReentrantLock lock = this.lock;
591 * @throws UnsupportedOperationException {@inheritDoc}
592 * @throws ClassCastException {@inheritDoc}
593 * @throws NullPointerException {@inheritDoc}
594 * @throws IllegalArgumentException {@inheritDoc}
596 public int drainTo(Collection<? super E> c) {
598 throw new NullPointerException();
600 throw new IllegalArgumentException();
601 final E[] items = this.items;
602 final ReentrantLock lock = this.lock;
627 * @throws UnsupportedOperationException {@inheritDoc}
628 * @throws ClassCastException {@inheritDoc}
629 * @throws NullPointerException {@inheritDoc}
630 * @throws IllegalArgumentException {@inheritDoc}
632 public int drainTo(Collection<? super E> c, int maxElements) {
634 throw new NullPointerException();
636 throw new IllegalArgumentException();
637 if (maxElements <= 0) // a non-positive limit is handled before the lock is fetched
639 final E[] items = this.items;
640 final ReentrantLock lock = this.lock;
646 int max = (maxElements < count)? maxElements : count; // the smaller of maxElements and count
666 * Returns an iterator over the elements in this queue in proper sequence.
667 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
668 * will never throw {@link ConcurrentModificationException},
669 * and guarantees to traverse elements as they existed upon
670 * construction of the iterator, and may (but is not guaranteed to)
671 * reflect any modifications subsequent to construction.
673 * @return an iterator over the elements in this queue in proper sequence
675 public Iterator<E> iterator() {
676 final ReentrantLock lock = this.lock;
686 * Iterator for ArrayBlockingQueue
688 private class Itr implements Iterator<E> {
690 * Index of element to be returned by next,
691 * or a negative number if there is no such element.
693 private int nextIndex;
696 * nextItem holds on to item fields because once we claim
697 * that an element exists in hasNext(), we must return it in
698 * the following next() call even if it was in the process of
699 * being removed when hasNext() was called.
704 * Index of element returned by most recent call to next.
705 * Reset to -1 if this element is deleted by a call to remove.
714 nextIndex = takeIndex;
715 nextItem = items[takeIndex];
719 public boolean hasNext() {
721 * No sync. We can return true by mistake here
722 * only if this iterator is passed across threads,
723 * which we don't support anyway.
725 return nextIndex >= 0;
729 * Checks whether nextIndex is valid; if so setting nextItem.
730 * Stops iterator when either hits putIndex or sees null item.
732 private void checkNext() {
733 if (nextIndex == putIndex) {
737 nextItem = items[nextIndex];
738 if (nextItem == null)
744 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
748 throw new NoSuchElementException();
751 nextIndex = inc(nextIndex);
759 public void remove() {
760 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
765 throw new IllegalStateException();
770 // back up cursor (reset to front if was first element)
771 nextIndex = (i == ti) ? takeIndex : i;