2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
7 package java.util.concurrent;
10 // removed link to collections framework docs
13 import java.util.AbstractQueue;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.NoSuchElementException;
17 import java.util.concurrent.atomic.AtomicInteger;
18 import java.util.concurrent.locks.Condition;
19 import java.util.concurrent.locks.ReentrantLock;
22 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on linked nodes.
24 * This queue orders elements FIFO (first-in-first-out).
25 * The <em>head</em> of the queue is that element that has been on the
26 * queue the longest time.
27 * The <em>tail</em> of the queue is that element that has been on the
28 * queue the shortest time. New elements
29 * are inserted at the tail of the queue, and the queue retrieval
30 * operations obtain elements at the head of the queue.
31 * Linked queues typically have higher throughput than array-based queues but
32 * less predictable performance in most concurrent applications.
34 * <p> The optional capacity bound constructor argument serves as a
35 * way to prevent excessive queue expansion. The capacity, if unspecified,
36 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
37 * dynamically created upon each insertion unless this would bring the
38 * queue above capacity.
40 * <p>This class and its iterator implement all of the
41 * <em>optional</em> methods of the {@link Collection} and {@link
42 * Iterator} interfaces.
46 * @param <E> the type of elements held in this collection
49 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
50 implements BlockingQueue<E>, java.io.Serializable {
51 private static final long serialVersionUID = -6903933977591709194L;
54 * A variant of the "two lock queue" algorithm. The putLock gates
55 * entry to put (and offer), and has an associated condition for
56 * waiting puts. Similarly for the takeLock. The "count" field
57 * that they both rely on is maintained as an atomic to avoid
58 * needing to get both locks in most cases. Also, to minimize need
59 * for puts to get takeLock and vice-versa, cascading notifies are
60 * used. When a put notices that it has enabled at least one take,
61 * it signals taker. That taker in turn signals others if more
62 * items have been entered since the signal. And symmetrically for
63 * takes signalling puts. Operations such as remove(Object) and
64 * iterators acquire both locks.
66 * Visibility between writers and readers is provided as follows:
68 * Whenever an element is enqueued, the putLock is acquired and
69 * count updated. A subsequent reader guarantees visibility to the
70 * enqueued Node by either acquiring the putLock (via fullyLock)
71 * or by acquiring the takeLock, and then reading n = count.get();
72 * this gives visibility to the first n items.
74 * To implement weakly consistent iterators, it appears we need to
75 * keep all Nodes GC-reachable from a predecessor dequeued Node.
76 * That would cause two problems:
77 * - allow a rogue Iterator to cause unbounded memory retention
78 * - cause cross-generational linking of old Nodes to new Nodes if
79 * a Node was tenured while live, which generational GCs have a
80 * hard time dealing with, causing repeated major collections.
81 * However, only non-deleted Nodes need to be reachable from
82 * dequeued Nodes, and reachability does not necessarily have to
83 * be of the kind understood by the GC. We use the trick of
84 * linking a Node that has just been dequeued to itself. Such a
85 * self-link implicitly means to advance to head.next.
89 * Linked list node class
91 static class Node<E> {
96 * - the real successor Node
97 * - this Node, meaning the successor is head.next
98 * - null, meaning there is no successor (this is the last node)
102 Node(E x) { item = x; }
105 /** The capacity bound, or Integer.MAX_VALUE if none */
106 private final int capacity;
108 /** Current number of elements */
109 private final AtomicInteger count = new AtomicInteger(0);
112 * Head of linked list.
113 * Invariant: head.item == null
115 private transient Node<E> head;
118 * Tail of linked list.
119 * Invariant: last.next == null
121 private transient Node<E> last;
123 /** Lock held by take, poll, etc */
124 private final ReentrantLock takeLock = new ReentrantLock();
126 /** Wait queue for waiting takes */
127 private final Condition notEmpty = takeLock.newCondition();
129 /** Lock held by put, offer, etc */
130 private final ReentrantLock putLock = new ReentrantLock();
132 /** Wait queue for waiting puts */
133 private final Condition notFull = putLock.newCondition();
136 * Signals a waiting take. Called only from put/offer (which do not
137 * otherwise ordinarily lock takeLock.)
139 private void signalNotEmpty() {
140 final ReentrantLock takeLock = this.takeLock;
150 * Signals a waiting put. Called only from take/poll.
152 private void signalNotFull() {
153 final ReentrantLock putLock = this.putLock;
163 * Creates a node and links it at end of queue.
167 private void enqueue(E x) {
168 // assert putLock.isHeldByCurrentThread();
169 // assert last.next == null;
170 last = last.next = new Node<E>(x);
174 * Removes a node from head of queue.
178 private E dequeue() {
179 // assert takeLock.isHeldByCurrentThread();
180 // assert head.item == null;
182 Node<E> first = h.next;
183 h.next = h; // help GC
191 * Lock to prevent both puts and takes.
199 * Unlock to allow both puts and takes.
207 // * Tells whether both locks are held by current thread.
209 // boolean isFullyLocked() {
210 // return (putLock.isHeldByCurrentThread() &&
211 // takeLock.isHeldByCurrentThread());
/**
 * Creates a {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}, by delegating to the capacity-taking
 * constructor.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
223 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
225 * @param capacity the capacity of this queue
226 * @throws IllegalArgumentException if {@code capacity} is not greater
229 public LinkedBlockingQueue(int capacity) {
230 if (capacity <= 0) throw new IllegalArgumentException();
231 this.capacity = capacity;
232 last = head = new Node<E>(null);
236 * Creates a {@code LinkedBlockingQueue} with a capacity of
237 * {@link Integer#MAX_VALUE}, initially containing the elements of the given collection,
239 * added in traversal order of the collection's iterator.
241 * @param c the collection of elements to initially contain
242 * @throws NullPointerException if the specified collection or any
243 * of its elements are null
245 public LinkedBlockingQueue(Collection<? extends E> c) {
246 this(Integer.MAX_VALUE);
247 final ReentrantLock putLock = this.putLock;
248 putLock.lock(); // Never contended, but necessary for visibility
253 throw new NullPointerException();
255 throw new IllegalStateException("Queue full");
266 // this doc comment is overridden to remove the reference to collections
267 // greater in size than Integer.MAX_VALUE
269 * Returns the number of elements in this queue.
271 * @return the number of elements in this queue
277 // this doc comment is a modified copy of the inherited doc comment,
278 // without the reference to unlimited queues.
280 * Returns the number of additional elements that this queue can ideally
281 * (in the absence of memory or resource constraints) accept without
282 * blocking. This is always equal to the initial capacity of this queue
283 * less the current {@code size} of this queue.
285 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
286 * an element will succeed by inspecting {@code remainingCapacity}
287 * because it may be the case that another thread is about to
288 * insert or remove an element.
290 public int remainingCapacity() {
291 return capacity - count.get();
295 * Inserts the specified element at the tail of this queue, waiting if
296 * necessary for space to become available.
298 * @throws InterruptedException {@inheritDoc}
299 * @throws NullPointerException {@inheritDoc}
301 public void put(E e) throws InterruptedException {
302 if (e == null) throw new NullPointerException();
303 // Note: convention in all put/take/etc is to preset local var
304 // holding count negative to indicate failure unless set.
306 final ReentrantLock putLock = this.putLock;
307 final AtomicInteger count = this.count;
308 putLock.lockInterruptibly();
311 * Note that count is used in wait guard even though it is
312 * not protected by lock. This works because count can
313 * only decrease at this point (all other puts are shut
314 * out by lock), and we (or some other waiting put) are
315 * signalled if it ever changes from capacity. Similarly
316 * for all other uses of count in other wait guards.
318 while (count.get() == capacity) {
322 c = count.getAndIncrement();
323 if (c + 1 < capacity)
333 * Inserts the specified element at the tail of this queue, waiting if
334 * necessary up to the specified wait time for space to become available.
336 * @return {@code true} if successful, or {@code false} if
337 * the specified waiting time elapses before space is available.
338 * @throws InterruptedException {@inheritDoc}
339 * @throws NullPointerException {@inheritDoc}
341 public boolean offer(E e, long timeout, TimeUnit unit)
342 throws InterruptedException {
344 if (e == null) throw new NullPointerException();
345 long nanos = unit.toNanos(timeout);
347 final ReentrantLock putLock = this.putLock;
348 final AtomicInteger count = this.count;
349 putLock.lockInterruptibly();
351 while (count.get() == capacity) {
354 nanos = notFull.awaitNanos(nanos);
357 c = count.getAndIncrement();
358 if (c + 1 < capacity)
369 * Inserts the specified element at the tail of this queue if it is
370 * possible to do so immediately without exceeding the queue's capacity,
371 * returning {@code true} upon success and {@code false} if this queue is full.
373 * When using a capacity-restricted queue, this method is generally
374 * preferable to method {@link BlockingQueue#add add}, which can fail to
375 * insert an element only by throwing an exception.
377 * @throws NullPointerException if the specified element is null
379 public boolean offer(E e) {
380 if (e == null) throw new NullPointerException();
381 final AtomicInteger count = this.count;
382 if (count.get() == capacity)
385 final ReentrantLock putLock = this.putLock;
388 if (count.get() < capacity) {
390 c = count.getAndIncrement();
391 if (c + 1 < capacity)
403 public E take() throws InterruptedException {
406 final AtomicInteger count = this.count;
407 final ReentrantLock takeLock = this.takeLock;
408 takeLock.lockInterruptibly();
410 while (count.get() == 0) {
414 c = count.getAndDecrement();
425 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
428 long nanos = unit.toNanos(timeout);
429 final AtomicInteger count = this.count;
430 final ReentrantLock takeLock = this.takeLock;
431 takeLock.lockInterruptibly();
433 while (count.get() == 0) {
436 nanos = notEmpty.awaitNanos(nanos);
439 c = count.getAndDecrement();
451 final AtomicInteger count = this.count;
452 if (count.get() == 0)
456 final ReentrantLock takeLock = this.takeLock;
459 if (count.get() > 0) {
461 c = count.getAndDecrement();
474 if (count.get() == 0)
476 final ReentrantLock takeLock = this.takeLock;
479 Node<E> first = head.next;
490 * Unlinks interior Node p with predecessor trail.
492 void unlink(Node<E> p, Node<E> trail) {
493 // assert isFullyLocked();
494 // p.next is not changed, to allow iterators that are
495 // traversing p to maintain their weak-consistency guarantee.
500 if (count.getAndDecrement() == capacity)
505 * Removes a single instance of the specified element from this queue,
506 * if it is present. More formally, removes an element {@code e} such
507 * that {@code o.equals(e)}, if this queue contains one or more such elements.
509 * Returns {@code true} if this queue contained the specified element
510 * (or equivalently, if this queue changed as a result of the call).
512 * @param o element to be removed from this queue, if present
513 * @return {@code true} if this queue changed as a result of the call
515 public boolean remove(Object o) {
516 if (o == null) return false;
519 for (Node<E> trail = head, p = trail.next;
521 trail = p, p = p.next) {
522 if (o.equals(p.item)) {
534 * Returns an array containing all of the elements in this queue, in proper sequence.
537 * <p>The returned array will be "safe" in that no references to it are
538 * maintained by this queue. (In other words, this method must allocate
539 * a new array). The caller is thus free to modify the returned array.
541 * <p>This method acts as a bridge between array-based and collection-based APIs.
544 * @return an array containing all of the elements in this queue
546 public Object[] toArray() {
549 int size = count.get();
550 Object[] a = new Object[size];
552 for (Node<E> p = head.next; p != null; p = p.next)
561 * Returns an array containing all of the elements in this queue, in
562 * proper sequence; the runtime type of the returned array is that of
563 * the specified array. If the queue fits in the specified array, it
564 * is returned therein. Otherwise, a new array is allocated with the
565 * runtime type of the specified array and the size of this queue.
567 * <p>If this queue fits in the specified array with room to spare
568 * (i.e., the array has more elements than this queue), the element in
569 * the array immediately following the end of the queue is set to {@code null}.
572 * <p>Like the {@link #toArray()} method, this method acts as a bridge between
573 * array-based and collection-based APIs. Further, this method allows
574 * precise control over the runtime type of the output array, and may,
575 * under certain circumstances, be used to save allocation costs.
577 * <p>Suppose {@code x} is a queue known to contain only strings.
578 * The following code can be used to dump the queue into a newly
579 * allocated array of {@code String}:
582 * String[] y = x.toArray(new String[0]);</pre>
584 * Note that {@code toArray(new Object[0])} is identical in function to {@code toArray()}.
587 * @param a the array into which the elements of the queue are to
588 * be stored, if it is big enough; otherwise, a new array of the
589 * same runtime type is allocated for this purpose
590 * @return an array containing all of the elements in this queue
591 * @throws ArrayStoreException if the runtime type of the specified array
592 * is not a supertype of the runtime type of every element in this queue
594 * @throws NullPointerException if the specified array is null
596 @SuppressWarnings("unchecked")
597 public <T> T[] toArray(T[] a) {
600 int size = count.get();
602 a = (T[])java.lang.reflect.Array.newInstance
603 (a.getClass().getComponentType(), size);
606 for (Node<E> p = head.next; p != null; p = p.next)
616 public String toString() {
619 return super.toString();
626 * Atomically removes all of the elements from this queue.
627 * The queue will be empty after this call returns.
629 public void clear() {
632 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
637 // assert head.item == null && head.next == null;
638 if (count.getAndSet(0) == capacity)
646 * @throws UnsupportedOperationException {@inheritDoc}
647 * @throws ClassCastException {@inheritDoc}
648 * @throws NullPointerException {@inheritDoc}
649 * @throws IllegalArgumentException {@inheritDoc}
651 public int drainTo(Collection<? super E> c) {
652 return drainTo(c, Integer.MAX_VALUE);
656 * @throws UnsupportedOperationException {@inheritDoc}
657 * @throws ClassCastException {@inheritDoc}
658 * @throws NullPointerException {@inheritDoc}
659 * @throws IllegalArgumentException {@inheritDoc}
661 public int drainTo(Collection<? super E> c, int maxElements) {
663 throw new NullPointerException();
665 throw new IllegalArgumentException();
666 boolean signalNotFull = false;
667 final ReentrantLock takeLock = this.takeLock;
670 int n = Math.min(maxElements, count.get());
671 // count.get provides visibility to first n Nodes
685 // Restore invariants even if c.add() threw
687 // assert h.item == null;
689 signalNotFull = (count.getAndAdd(-i) == capacity);
700 * Returns an iterator over the elements in this queue in proper sequence.
701 * The returned {@code Iterator} is a "weakly consistent" iterator that
702 * will never throw {@link java.util.ConcurrentModificationException
703 * ConcurrentModificationException},
704 * and guarantees to traverse elements as they existed upon
705 * construction of the iterator, and may (but is not guaranteed to)
706 * reflect any modifications subsequent to construction.
708 * @return an iterator over the elements in this queue in proper sequence
710 public Iterator<E> iterator() {
714 private class Itr implements Iterator<E> {
716 * Basic weakly-consistent iterator. At all times hold the next
717 * item to hand out so that if hasNext() reports true, we will
718 * still have it to return even if lost race with a take etc.
720 private Node<E> current;
721 private Node<E> lastRet;
722 private E currentElement;
729 currentElement = current.item;
735 public boolean hasNext() {
736 return current != null;
740 * Returns the next live successor of p, or null if no such.
742 * Unlike other traversal methods, iterators need to handle both:
743 * - dequeued nodes (p.next == p)
744 * - (possibly multiple) interior removed nodes (p.item == null)
746 private Node<E> nextNode(Node<E> p) {
751 if (s == null || s.item != null)
761 throw new NoSuchElementException();
762 E x = currentElement;
764 current = nextNode(current);
765 currentElement = (current == null) ? null : current.item;
772 public void remove() {
774 throw new IllegalStateException();
777 Node<E> node = lastRet;
779 for (Node<E> trail = head, p = trail.next;
781 trail = p, p = p.next) {
794 * Save the state to a stream (that is, serialize it).
796 * @serialData The capacity is emitted (int), followed by all of
797 * its elements (each an {@code Object}) in the proper order, followed by a null
799 * @param s the stream
801 private void writeObject(java.io.ObjectOutputStream s)
802 throws java.io.IOException {
806 // Write out any hidden stuff, plus capacity
807 s.defaultWriteObject();
809 // Write out all elements in the proper order.
810 for (Node<E> p = head.next; p != null; p = p.next)
811 s.writeObject(p.item);
813 // Use trailing null as sentinel
821 * Reconstitute this queue instance from a stream (that is, deserialize it).
824 * @param s the stream
826 private void readObject(java.io.ObjectInputStream s)
827 throws java.io.IOException, ClassNotFoundException {
828 // Read in capacity, and any hidden stuff
829 s.defaultReadObject();
832 last = head = new Node<E>(null);
834 // Read in all elements and place in queue
836 @SuppressWarnings("unchecked")
837 E item = (E)s.readObject();