2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
31 * <p>This class and its iterator implement all of the
32 * <em>optional</em> methods of the {@link Collection} and {@link
33 * Iterator} interfaces.
35 * <p>This class is a member of the
36 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
37 * Java Collections Framework</a>.
41 * @param <E> the type of elements held in this collection
44 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
45 implements BlockingQueue<E>, java.io.Serializable {
46 private static final long serialVersionUID = -6903933977591709194L;
49 * A variant of the "two lock queue" algorithm. The putLock gates
50 * entry to put (and offer), and has an associated condition for
51 * waiting puts. Similarly for the takeLock. The "count" field
52 * that they both rely on is maintained as an atomic to avoid
53 * needing to get both locks in most cases. Also, to minimize need
54 * for puts to get takeLock and vice-versa, cascading notifies are
55 * used. When a put notices that it has enabled at least one take,
56 * it signals taker. That taker in turn signals others if more
57 * items have been entered since the signal. And symmetrically for
58 * takes signalling puts. Operations such as remove(Object) and
59 * iterators acquire both locks.
63 * Linked list node class
65 static class Node<E> {
66 /** The item, volatile to ensure barrier separating write and read */
69 Node(E x) { item = x; }
72 /** The capacity bound, or Integer.MAX_VALUE if none */
73 private final int capacity;
75 /** Current number of elements */
76 private final AtomicInteger count = new AtomicInteger(0);
78 /** Head of linked list */
79 private transient Node<E> head;
81 /** Tail of linked list */
82 private transient Node<E> last;
84 /** Lock held by take, poll, etc */
85 private final ReentrantLock takeLock = new ReentrantLock();
87 /** Wait queue for waiting takes */
88 private final Condition notEmpty = takeLock.newCondition();
90 /** Lock held by put, offer, etc */
91 private final ReentrantLock putLock = new ReentrantLock();
93 /** Wait queue for waiting puts */
94 private final Condition notFull = putLock.newCondition();
97 * Signals a waiting take. Called only from put/offer (which do not
98 * otherwise ordinarily lock takeLock.)
100 private void signalNotEmpty() {
101 final ReentrantLock takeLock = this.takeLock;
111 * Signals a waiting put. Called only from take/poll.
113 private void signalNotFull() {
114 final ReentrantLock putLock = this.putLock;
124 * Creates a node and links it at end of queue.
127 private void insert(E x) {
128 last = last.next = new Node<E>(x);
132 * Removes a node from head of queue,
135 private E extract() {
136 Node<E> first = head.next;
144 * Lock to prevent both puts and takes.
146 private void fullyLock() {
152 * Unlock to allow both puts and takes.
154 private void fullyUnlock() {
161 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
162 * {@link Integer#MAX_VALUE}.
164 public LinkedBlockingQueue() {
165 this(Integer.MAX_VALUE);
169 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
171 * @param capacity the capacity of this queue
172 * @throws IllegalArgumentException if <tt>capacity</tt> is not greater than zero
175 public LinkedBlockingQueue(int capacity) {
176 if (capacity <= 0) throw new IllegalArgumentException();
177 this.capacity = capacity;
178 last = head = new Node<E>(null);
182 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
183 * {@link Integer#MAX_VALUE}, initially containing the elements of the given collection,
185 * added in traversal order of the collection's iterator.
187 * @param c the collection of elements to initially contain
188 * @throws NullPointerException if the specified collection or any
189 * of its elements are null
191 public LinkedBlockingQueue(Collection<? extends E> c) {
192 this(Integer.MAX_VALUE);
198 // this doc comment is overridden to remove the reference to collections
199 // greater in size than Integer.MAX_VALUE
201 * Returns the number of elements in this queue.
203 * @return the number of elements in this queue
209 // this doc comment is a modified copy of the inherited doc comment,
210 // without the reference to unlimited queues.
212 * Returns the number of additional elements that this queue can ideally
213 * (in the absence of memory or resource constraints) accept without
214 * blocking. This is always equal to the initial capacity of this queue
215 * less the current <tt>size</tt> of this queue.
217 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
218 * an element will succeed by inspecting <tt>remainingCapacity</tt>
219 * because it may be the case that another thread is about to
220 * insert or remove an element.
222 public int remainingCapacity() {
223 return capacity - count.get();
227 * Inserts the specified element at the tail of this queue, waiting if
228 * necessary for space to become available.
230 * @throws InterruptedException {@inheritDoc}
231 * @throws NullPointerException {@inheritDoc}
233 public void put(E e) throws InterruptedException {
234 if (e == null) throw new NullPointerException();
235 // Note: the convention in all put/take/etc. methods is to preset the local
236 // variable holding the count to a negative value, indicating failure unless it is set.
238 final ReentrantLock putLock = this.putLock;
239 final AtomicInteger count = this.count;
240 putLock.lockInterruptibly();
243 * Note that count is used in wait guard even though it is
244 * not protected by lock. This works because count can
245 * only decrease at this point (all other puts are shut
246 * out by lock), and we (or some other waiting put) are
247 * signalled if it ever changes from
248 * capacity. Similarly for all other uses of count in other wait guards.
252 while (count.get() == capacity)
254 } catch (InterruptedException ie) {
255 notFull.signal(); // propagate to a non-interrupted thread
259 c = count.getAndIncrement();
260 if (c + 1 < capacity)
270 * Inserts the specified element at the tail of this queue, waiting if
271 * necessary up to the specified wait time for space to become available.
273 * @return <tt>true</tt> if successful, or <tt>false</tt> if
274 * the specified waiting time elapses before space is available.
275 * @throws InterruptedException {@inheritDoc}
276 * @throws NullPointerException {@inheritDoc}
278 public boolean offer(E e, long timeout, TimeUnit unit)
279 throws InterruptedException {
281 if (e == null) throw new NullPointerException();
282 long nanos = unit.toNanos(timeout);
284 final ReentrantLock putLock = this.putLock;
285 final AtomicInteger count = this.count;
286 putLock.lockInterruptibly();
289 if (count.get() < capacity) {
291 c = count.getAndIncrement();
292 if (c + 1 < capacity)
299 nanos = notFull.awaitNanos(nanos);
300 } catch (InterruptedException ie) {
301 notFull.signal(); // propagate to a non-interrupted thread
314 * Inserts the specified element at the tail of this queue if it is
315 * possible to do so immediately without exceeding the queue's capacity,
316 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue is full.
318 * When using a capacity-restricted queue, this method is generally
319 * preferable to method {@link BlockingQueue#add add}, which can fail to
320 * insert an element only by throwing an exception.
322 * @throws NullPointerException if the specified element is null
324 public boolean offer(E e) {
325 if (e == null) throw new NullPointerException();
326 final AtomicInteger count = this.count;
327 if (count.get() == capacity)
330 final ReentrantLock putLock = this.putLock;
333 if (count.get() < capacity) {
335 c = count.getAndIncrement();
336 if (c + 1 < capacity)
348 public E take() throws InterruptedException {
351 final AtomicInteger count = this.count;
352 final ReentrantLock takeLock = this.takeLock;
353 takeLock.lockInterruptibly();
356 while (count.get() == 0)
358 } catch (InterruptedException ie) {
359 notEmpty.signal(); // propagate to a non-interrupted thread
364 c = count.getAndDecrement();
375 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
378 long nanos = unit.toNanos(timeout);
379 final AtomicInteger count = this.count;
380 final ReentrantLock takeLock = this.takeLock;
381 takeLock.lockInterruptibly();
384 if (count.get() > 0) {
386 c = count.getAndDecrement();
394 nanos = notEmpty.awaitNanos(nanos);
395 } catch (InterruptedException ie) {
396 notEmpty.signal(); // propagate to a non-interrupted thread
409 final AtomicInteger count = this.count;
410 if (count.get() == 0)
414 final ReentrantLock takeLock = this.takeLock;
417 if (count.get() > 0) {
419 c = count.getAndDecrement();
433 if (count.get() == 0)
435 final ReentrantLock takeLock = this.takeLock;
438 Node<E> first = head.next;
449 * Removes a single instance of the specified element from this queue,
450 * if it is present. More formally, removes an element <tt>e</tt> such
451 * that <tt>o.equals(e)</tt>, if this queue contains one or more such elements.
453 * Returns <tt>true</tt> if this queue contained the specified element
454 * (or equivalently, if this queue changed as a result of the call).
456 * @param o element to be removed from this queue, if present
457 * @return <tt>true</tt> if this queue changed as a result of the call
459 public boolean remove(Object o) {
460 if (o == null) return false;
461 boolean removed = false;
464 Node<E> trail = head;
465 Node<E> p = head.next;
467 if (o.equals(p.item)) {
479 if (count.getAndDecrement() == capacity)
489 * Returns an array containing all of the elements in this queue, in proper sequence.
492 * <p>The returned array will be "safe" in that no references to it are
493 * maintained by this queue. (In other words, this method must allocate
494 * a new array). The caller is thus free to modify the returned array.
496 * <p>This method acts as a bridge between array-based and collection-based APIs.
499 * @return an array containing all of the elements in this queue
501 public Object[] toArray() {
504 int size = count.get();
505 Object[] a = new Object[size];
507 for (Node<E> p = head.next; p != null; p = p.next)
516 * Returns an array containing all of the elements in this queue, in
517 * proper sequence; the runtime type of the returned array is that of
518 * the specified array. If the queue fits in the specified array, it
519 * is returned therein. Otherwise, a new array is allocated with the
520 * runtime type of the specified array and the size of this queue.
522 * <p>If this queue fits in the specified array with room to spare
523 * (i.e., the array has more elements than this queue), the element in
524 * the array immediately following the end of the queue is set to <tt>null</tt>.
527 * <p>Like the {@link #toArray()} method, this method acts as a bridge between
528 * array-based and collection-based APIs. Further, this method allows
529 * precise control over the runtime type of the output array, and may,
530 * under certain circumstances, be used to save allocation costs.
532 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
533 * The following code can be used to dump the queue into a newly
534 * allocated array of <tt>String</tt>:
537 * String[] y = x.toArray(new String[0]);</pre>
539 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
540 * <tt>toArray()</tt>.
542 * @param a the array into which the elements of the queue are to
543 * be stored, if it is big enough; otherwise, a new array of the
544 * same runtime type is allocated for this purpose
545 * @return an array containing all of the elements in this queue
546 * @throws ArrayStoreException if the runtime type of the specified array
547 * is not a supertype of the runtime type of every element in this queue
549 * @throws NullPointerException if the specified array is null
551 public <T> T[] toArray(T[] a) {
554 int size = count.get();
556 a = (T[])java.lang.reflect.Array.newInstance
557 (a.getClass().getComponentType(), size);
560 for (Node p = head.next; p != null; p = p.next)
570 public String toString() {
573 return super.toString();
580 * Atomically removes all of the elements from this queue.
581 * The queue will be empty after this call returns.
583 public void clear() {
587 assert head.item == null;
589 if (count.getAndSet(0) == capacity)
597 * @throws UnsupportedOperationException {@inheritDoc}
598 * @throws ClassCastException {@inheritDoc}
599 * @throws NullPointerException {@inheritDoc}
600 * @throws IllegalArgumentException {@inheritDoc}
602 public int drainTo(Collection<? super E> c) {
604 throw new NullPointerException();
606 throw new IllegalArgumentException();
612 assert head.item == null;
614 if (count.getAndSet(0) == capacity)
619 // Transfer the elements outside of locks
621 for (Node<E> p = first; p != null; p = p.next) {
630 * @throws UnsupportedOperationException {@inheritDoc}
631 * @throws ClassCastException {@inheritDoc}
632 * @throws NullPointerException {@inheritDoc}
633 * @throws IllegalArgumentException {@inheritDoc}
635 public int drainTo(Collection<? super E> c, int maxElements) {
637 throw new NullPointerException();
639 throw new IllegalArgumentException();
643 Node<E> p = head.next;
644 while (p != null && n < maxElements) {
652 assert head.item == null;
655 if (count.getAndAdd(-n) == capacity)
665 * Returns an iterator over the elements in this queue in proper sequence.
666 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
667 * will never throw {@link ConcurrentModificationException},
668 * and guarantees to traverse elements as they existed upon
669 * construction of the iterator, and may (but is not guaranteed to)
670 * reflect any modifications subsequent to construction.
672 * @return an iterator over the elements in this queue in proper sequence
674 public Iterator<E> iterator() {
678 private class Itr implements Iterator<E> {
680 * Basic weakly consistent iterator. At all times hold the next
681 * item to hand out so that if hasNext() reports true, we will
682 * still have it to return even if lost race with a take etc.
684 private Node<E> current;
685 private Node<E> lastRet;
686 private E currentElement;
689 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
690 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
696 currentElement = current.item;
703 public boolean hasNext() {
704 return current != null;
708 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
709 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
714 throw new NoSuchElementException();
715 E x = currentElement;
717 current = current.next;
719 currentElement = current.item;
727 public void remove() {
729 throw new IllegalStateException();
730 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
731 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
735 Node<E> node = lastRet;
737 Node<E> trail = head;
738 Node<E> p = head.next;
739 while (p != null && p != node) {
748 int c = count.getAndDecrement();
760 * Save the state to a stream (that is, serialize it).
762 * @serialData The capacity is emitted (int), followed by all of
763 * its elements (each an <tt>Object</tt>) in the proper order, followed by a null
765 * @param s the stream
767 private void writeObject(java.io.ObjectOutputStream s)
768 throws java.io.IOException {
772 // Write out any hidden stuff, plus capacity
773 s.defaultWriteObject();
775 // Write out all elements in the proper order.
776 for (Node<E> p = head.next; p != null; p = p.next)
777 s.writeObject(p.item);
779 // Use trailing null as sentinel
787 * Reconstitute this queue instance from a stream (that is, deserialize it).
789 * @param s the stream
791 private void readObject(java.io.ObjectInputStream s)
792 throws java.io.IOException, ClassNotFoundException {
793 // Read in capacity, and any hidden stuff
794 s.defaultReadObject();
797 last = head = new Node<E>(null);
799 // Read in all elements and place in queue
801 E item = (E)s.readObject();