2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
7 package java.util.concurrent;
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.ConcurrentModificationException;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.ReentrantLock;
18 // removed link to collections framework docs
22 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
23 * array. This queue orders elements FIFO (first-in-first-out). The
24 * <em>head</em> of the queue is that element that has been on the
25 * queue the longest time. The <em>tail</em> of the queue is that
26 * element that has been on the queue the shortest time. New elements
27 * are inserted at the tail of the queue, and the queue retrieval
28 * operations obtain elements at the head of the queue.
30 * <p>This is a classic "bounded buffer", in which a
31 * fixed-sized array holds elements inserted by producers and
32 * extracted by consumers. Once created, the capacity cannot be
33 * increased. Attempts to <tt>put</tt> an element into a full queue
34 * will result in the operation blocking; attempts to <tt>take</tt> an
35 * element from an empty queue will similarly block.
37 * <p> This class supports an optional fairness policy for ordering
38 * waiting producer and consumer threads. By default, this ordering
39 * is not guaranteed. However, a queue constructed with fairness set
40 * to <tt>true</tt> grants threads access in FIFO order. Fairness
41 * generally decreases throughput but reduces variability and avoids starvation.
44 * <p>This class and its iterator implement all of the
45 * <em>optional</em> methods of the {@link Collection} and {@link
46 * Iterator} interfaces.
50 * @param <E> the type of elements held in this collection
52 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
53 implements BlockingQueue<E>, java.io.Serializable {
56 * Serialization ID. This class relies on default serialization
57 * even for the items array, which is default-serialized, even if
58 * it is empty. Otherwise it could not be declared final, which is necessary here.
61 private static final long serialVersionUID = -817911632652898426L;
63 /** The queued items */
64 private final E[] items;
65 /** items index for next take, poll or remove */
66 private int takeIndex;
67 /** items index for next put, offer, or add. */
69 /** Number of items in the queue */
73 * Concurrency control uses the classic two-condition algorithm
74 * found in any textbook.
77 /** Main lock guarding all access */
78 private final ReentrantLock lock;
79 /** Condition for waiting takes */
80 private final Condition notEmpty;
81 /** Condition for waiting puts */
82 private final Condition notFull;
84 // Internal helper methods
87 * Circularly increment i.
89 final int inc(int i) {
90 return (++i == items.length)? 0 : i;
94 * Inserts element at current put position, advances, and signals.
95 * Call only when holding lock.
97 private void insert(E x) {
99 putIndex = inc(putIndex);
105 * Extracts element at current take position, advances, and signals.
106 * Call only when holding lock.
108 private E extract() {
109 final E[] items = this.items;
110 E x = items[takeIndex];
111 items[takeIndex] = null;
112 takeIndex = inc(takeIndex);
119 * Utility for remove and iterator.remove: Delete item at position i.
120 * Call only when holding lock.
122 void removeAt(int i) {
123 final E[] items = this.items;
124 // if removing front item, just advance
125 if (i == takeIndex) {
126 items[takeIndex] = null;
127 takeIndex = inc(takeIndex);
129 // slide over all others up through putIndex.
132 if (nexti != putIndex) {
133 items[i] = items[nexti];
147 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
148 * capacity and default access policy.
150 * @param capacity the capacity of this queue
151 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
153 public ArrayBlockingQueue(int capacity) {
154 this(capacity, false);
158 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
159 * capacity and the specified access policy.
161 * @param capacity the capacity of this queue
162 * @param fair if <tt>true</tt> then queue accesses for threads blocked
163 * on insertion or removal are processed in FIFO order;
164 * if <tt>false</tt> the access order is unspecified.
165 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
167 public ArrayBlockingQueue(int capacity, boolean fair) {
169 throw new IllegalArgumentException();
170 this.items = (E[]) new Object[capacity];
171 lock = new ReentrantLock(fair);
172 notEmpty = lock.newCondition();
173 notFull = lock.newCondition();
177 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
178 * capacity, the specified access policy and initially containing the
179 * elements of the given collection,
180 * added in traversal order of the collection's iterator.
182 * @param capacity the capacity of this queue
183 * @param fair if <tt>true</tt> then queue accesses for threads blocked
184 * on insertion or removal are processed in FIFO order;
185 * if <tt>false</tt> the access order is unspecified.
186 * @param c the collection of elements to initially contain
187 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
188 * <tt>c.size()</tt>, or less than 1.
189 * @throws NullPointerException if the specified collection or any
190 * of its elements are null
192 public ArrayBlockingQueue(int capacity, boolean fair,
193 Collection<? extends E> c) {
194 this(capacity, fair);
195 if (capacity < c.size())
196 throw new IllegalArgumentException();
203 * Inserts the specified element at the tail of this queue if it is
204 * possible to do so immediately without exceeding the queue's capacity,
205 * returning <tt>true</tt> upon success and throwing an
206 * <tt>IllegalStateException</tt> if this queue is full.
208 * @param e the element to add
209 * @return <tt>true</tt> (as specified by {@link Collection#add})
210 * @throws IllegalStateException if this queue is full
211 * @throws NullPointerException if the specified element is null
213 public boolean add(E e) {
218 * Inserts the specified element at the tail of this queue if it is
219 * possible to do so immediately without exceeding the queue's capacity,
220 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
221 * is full. This method is generally preferable to method {@link #add},
222 * which can fail to insert an element only by throwing an exception.
224 * @throws NullPointerException if the specified element is null
226 public boolean offer(E e) {
227 if (e == null) throw new NullPointerException();
228 final ReentrantLock lock = this.lock;
231 if (count == items.length)
243 * Inserts the specified element at the tail of this queue, waiting
244 * for space to become available if the queue is full.
246 * @throws InterruptedException {@inheritDoc}
247 * @throws NullPointerException {@inheritDoc}
249 public void put(E e) throws InterruptedException {
250 if (e == null) throw new NullPointerException();
251 final E[] items = this.items;
252 final ReentrantLock lock = this.lock;
253 lock.lockInterruptibly();
256 while (count == items.length)
258 } catch (InterruptedException ie) {
259 notFull.signal(); // propagate to non-interrupted thread
269 * Inserts the specified element at the tail of this queue, waiting
270 * up to the specified wait time for space to become available if the queue is full.
273 * @throws InterruptedException {@inheritDoc}
274 * @throws NullPointerException {@inheritDoc}
276 public boolean offer(E e, long timeout, TimeUnit unit)
277 throws InterruptedException {
279 if (e == null) throw new NullPointerException();
280 long nanos = unit.toNanos(timeout);
281 final ReentrantLock lock = this.lock;
282 lock.lockInterruptibly();
285 if (count != items.length) {
292 nanos = notFull.awaitNanos(nanos);
293 } catch (InterruptedException ie) {
294 notFull.signal(); // propagate to non-interrupted thread
304 final ReentrantLock lock = this.lock;
316 public E take() throws InterruptedException {
317 final ReentrantLock lock = this.lock;
318 lock.lockInterruptibly();
323 } catch (InterruptedException ie) {
324 notEmpty.signal(); // propagate to non-interrupted thread
334 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
335 long nanos = unit.toNanos(timeout);
336 final ReentrantLock lock = this.lock;
337 lock.lockInterruptibly();
347 nanos = notEmpty.awaitNanos(nanos);
348 } catch (InterruptedException ie) {
349 notEmpty.signal(); // propagate to non-interrupted thread
360 final ReentrantLock lock = this.lock;
363 return (count == 0) ? null : items[takeIndex];
369 // this doc comment is overridden to remove the reference to collections
370 // greater in size than Integer.MAX_VALUE
372 * Returns the number of elements in this queue.
374 * @return the number of elements in this queue
377 final ReentrantLock lock = this.lock;
386 // this doc comment is a modified copy of the inherited doc comment,
387 // without the reference to unlimited queues.
389 * Returns the number of additional elements that this queue can ideally
390 * (in the absence of memory or resource constraints) accept without
391 * blocking. This is always equal to the initial capacity of this queue
392 * less the current <tt>size</tt> of this queue.
394 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
395 * an element will succeed by inspecting <tt>remainingCapacity</tt>
396 * because it may be the case that another thread is about to
397 * insert or remove an element.
399 public int remainingCapacity() {
400 final ReentrantLock lock = this.lock;
403 return items.length - count;
410 * Removes a single instance of the specified element from this queue,
411 * if it is present. More formally, removes an element <tt>e</tt> such
412 * that <tt>o.equals(e)</tt>, if this queue contains one or more such elements.
414 * Returns <tt>true</tt> if this queue contained the specified element
415 * (or equivalently, if this queue changed as a result of the call).
417 * @param o element to be removed from this queue, if present
418 * @return <tt>true</tt> if this queue changed as a result of the call
420 public boolean remove(Object o) {
421 if (o == null) return false;
422 final E[] items = this.items;
423 final ReentrantLock lock = this.lock;
431 if (o.equals(items[i])) {
444 * Returns <tt>true</tt> if this queue contains the specified element.
445 * More formally, returns <tt>true</tt> if and only if this queue contains
446 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
448 * @param o object to be checked for containment in this queue
449 * @return <tt>true</tt> if this queue contains the specified element
451 public boolean contains(Object o) {
452 if (o == null) return false;
453 final E[] items = this.items;
454 final ReentrantLock lock = this.lock;
459 while (k++ < count) {
460 if (o.equals(items[i]))
471 * Returns an array containing all of the elements in this queue, in proper sequence.
474 * <p>The returned array will be "safe" in that no references to it are
475 * maintained by this queue. (In other words, this method must allocate
476 * a new array). The caller is thus free to modify the returned array.
478 * <p>This method acts as a bridge between array-based and collection-based APIs.
481 * @return an array containing all of the elements in this queue
483 public Object[] toArray() {
484 final E[] items = this.items;
485 final ReentrantLock lock = this.lock;
488 Object[] a = new Object[count];
502 * Returns an array containing all of the elements in this queue, in
503 * proper sequence; the runtime type of the returned array is that of
504 * the specified array. If the queue fits in the specified array, it
505 * is returned therein. Otherwise, a new array is allocated with the
506 * runtime type of the specified array and the size of this queue.
508 * <p>If this queue fits in the specified array with room to spare
509 * (i.e., the array has more elements than this queue), the element in
510 * the array immediately following the end of the queue is set to <tt>null</tt>.
513 * <p>Like the {@link #toArray()} method, this method acts as bridge between
514 * array-based and collection-based APIs. Further, this method allows
515 * precise control over the runtime type of the output array, and may,
516 * under certain circumstances, be used to save allocation costs.
518 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
519 * The following code can be used to dump the queue into a newly
520 * allocated array of <tt>String</tt>:
523 * String[] y = x.toArray(new String[0]);</pre>
525 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
526 * <tt>toArray()</tt>.
528 * @param a the array into which the elements of the queue are to
529 * be stored, if it is big enough; otherwise, a new array of the
530 * same runtime type is allocated for this purpose
531 * @return an array containing all of the elements in this queue
532 * @throws ArrayStoreException if the runtime type of the specified array
533 * is not a supertype of the runtime type of every element in this queue
535 * @throws NullPointerException if the specified array is null
537 public <T> T[] toArray(T[] a) {
538 final E[] items = this.items;
539 final ReentrantLock lock = this.lock;
542 if (a.length < count)
543 a = (T[])java.lang.reflect.Array.newInstance(
544 a.getClass().getComponentType(),
551 a[k++] = (T)items[i];
554 if (a.length > count)
562 public String toString() {
563 final ReentrantLock lock = this.lock;
566 return super.toString();
573 * Atomically removes all of the elements from this queue.
574 * The queue will be empty after this call returns.
576 public void clear() {
577 final E[] items = this.items;
578 final ReentrantLock lock = this.lock;
597 * @throws UnsupportedOperationException {@inheritDoc}
598 * @throws ClassCastException {@inheritDoc}
599 * @throws NullPointerException {@inheritDoc}
600 * @throws IllegalArgumentException {@inheritDoc}
602 public int drainTo(Collection<? super E> c) {
604 throw new NullPointerException();
606 throw new IllegalArgumentException();
607 final E[] items = this.items;
608 final ReentrantLock lock = this.lock;
633 * @throws UnsupportedOperationException {@inheritDoc}
634 * @throws ClassCastException {@inheritDoc}
635 * @throws NullPointerException {@inheritDoc}
636 * @throws IllegalArgumentException {@inheritDoc}
638 public int drainTo(Collection<? super E> c, int maxElements) {
640 throw new NullPointerException();
642 throw new IllegalArgumentException();
643 if (maxElements <= 0)
645 final E[] items = this.items;
646 final ReentrantLock lock = this.lock;
652 int max = (maxElements < count)? maxElements : count;
672 * Returns an iterator over the elements in this queue in proper sequence.
673 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
674 * will never throw {@link ConcurrentModificationException},
675 * and guarantees to traverse elements as they existed upon
676 * construction of the iterator, and may (but is not guaranteed to)
677 * reflect any modifications subsequent to construction.
679 * @return an iterator over the elements in this queue in proper sequence
681 public Iterator<E> iterator() {
682 final ReentrantLock lock = this.lock;
692 * Iterator for ArrayBlockingQueue
694 private class Itr implements Iterator<E> {
696 * Index of element to be returned by next,
697 * or a negative number if no such.
699 private int nextIndex;
702 * nextItem holds on to item fields because once we claim
703 * that an element exists in hasNext(), we must return it in
704 * the following next() call even if it was in the process of
705 * being removed when hasNext() was called.
710 * Index of element returned by most recent call to next.
711 * Reset to -1 if this element is deleted by a call to remove.
720 nextIndex = takeIndex;
721 nextItem = items[takeIndex];
725 public boolean hasNext() {
727 * No sync. We can return true by mistake here
728 * only if this iterator is passed across threads,
729 * which we don't support anyway.
731 return nextIndex >= 0;
735 * Checks whether nextIndex is valid; if so, sets nextItem.
736 * Stops iterator when either hits putIndex or sees null item.
738 private void checkNext() {
739 if (nextIndex == putIndex) {
743 nextItem = items[nextIndex];
744 if (nextItem == null)
750 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
754 throw new NoSuchElementException();
757 nextIndex = inc(nextIndex);
765 public void remove() {
766 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
771 throw new IllegalStateException();
776 // back up cursor (reset to front if was first element)
777 nextIndex = (i == ti) ? takeIndex : i;