OSDN Git Service

2013.10.24
[uclinux-h8/uClinux-dist.git] / lib / classpath / external / jsr166 / java / util / concurrent / LinkedBlockingQueue.java
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14  * linked nodes.
15  * This queue orders elements FIFO (first-in-first-out).
16  * The <em>head</em> of the queue is that element that has been on the
17  * queue the longest time.
18  * The <em>tail</em> of the queue is that element that has been on the
19  * queue the shortest time. New elements
20  * are inserted at the tail of the queue, and the queue retrieval
21  * operations obtain elements at the head of the queue.
22  * Linked queues typically have higher throughput than array-based queues but
23  * less predictable performance in most concurrent applications.
24  *
25  * <p> The optional capacity bound constructor argument serves as a
26  * way to prevent excessive queue expansion. The capacity, if unspecified,
27  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
28  * dynamically created upon each insertion unless this would bring the
29  * queue above capacity.
30  *
31  * <p>This class and its iterator implement all of the
32  * <em>optional</em> methods of the {@link Collection} and {@link
33  * Iterator} interfaces.
34  *
35  * <p>This class is a member of the
36  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
37  * Java Collections Framework</a>.
38  *
39  * @since 1.5
40  * @author Doug Lea
41  * @param <E> the type of elements held in this collection
42  *
43  */
44 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
45         implements BlockingQueue<E>, java.io.Serializable {
46     private static final long serialVersionUID = -6903933977591709194L;
47
48     /*
49      * A variant of the "two lock queue" algorithm.  The putLock gates
50      * entry to put (and offer), and has an associated condition for
51      * waiting puts.  Similarly for the takeLock.  The "count" field
52      * that they both rely on is maintained as an atomic to avoid
53      * needing to get both locks in most cases. Also, to minimize need
54      * for puts to get takeLock and vice-versa, cascading notifies are
55      * used. When a put notices that it has enabled at least one take,
56      * it signals taker. That taker in turn signals others if more
57      * items have been entered since the signal. And symmetrically for
58      * takes signalling puts. Operations such as remove(Object) and
59      * iterators acquire both locks.
60      */
61
62     /**
63      * Linked list node class
64      */
65     static class Node<E> {
66         /** The item, volatile to ensure barrier separating write and read */
67         volatile E item;
68         Node<E> next;
69         Node(E x) { item = x; }
70     }
71
72     /** The capacity bound, or Integer.MAX_VALUE if none */
73     private final int capacity;
74
75     /** Current number of elements */
76     private final AtomicInteger count = new AtomicInteger(0);
77
78     /** Head of linked list */
79     private transient Node<E> head;
80
81     /** Tail of linked list */
82     private transient Node<E> last;
83
84     /** Lock held by take, poll, etc */
85     private final ReentrantLock takeLock = new ReentrantLock();
86
87     /** Wait queue for waiting takes */
88     private final Condition notEmpty = takeLock.newCondition();
89
90     /** Lock held by put, offer, etc */
91     private final ReentrantLock putLock = new ReentrantLock();
92
93     /** Wait queue for waiting puts */
94     private final Condition notFull = putLock.newCondition();
95
96     /**
97      * Signals a waiting take. Called only from put/offer (which do not
98      * otherwise ordinarily lock takeLock.)
99      */
100     private void signalNotEmpty() {
101         final ReentrantLock takeLock = this.takeLock;
102         takeLock.lock();
103         try {
104             notEmpty.signal();
105         } finally {
106             takeLock.unlock();
107         }
108     }
109
110     /**
111      * Signals a waiting put. Called only from take/poll.
112      */
113     private void signalNotFull() {
114         final ReentrantLock putLock = this.putLock;
115         putLock.lock();
116         try {
117             notFull.signal();
118         } finally {
119             putLock.unlock();
120         }
121     }
122
123     /**
124      * Creates a node and links it at end of queue.
125      * @param x the item
126      */
127     private void insert(E x) {
128         last = last.next = new Node<E>(x);
129     }
130
131     /**
132      * Removes a node from head of queue,
133      * @return the node
134      */
135     private E extract() {
136         Node<E> first = head.next;
137         head = first;
138         E x = first.item;
139         first.item = null;
140         return x;
141     }
142
143     /**
144      * Lock to prevent both puts and takes.
145      */
146     private void fullyLock() {
147         putLock.lock();
148         takeLock.lock();
149     }
150
151     /**
152      * Unlock to allow both puts and takes.
153      */
154     private void fullyUnlock() {
155         takeLock.unlock();
156         putLock.unlock();
157     }
158
159
160     /**
161      * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
162      * {@link Integer#MAX_VALUE}.
163      */
164     public LinkedBlockingQueue() {
165         this(Integer.MAX_VALUE);
166     }
167
168     /**
169      * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
170      *
171      * @param capacity the capacity of this queue
172      * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
173      *         than zero
174      */
175     public LinkedBlockingQueue(int capacity) {
176         if (capacity <= 0) throw new IllegalArgumentException();
177         this.capacity = capacity;
178         last = head = new Node<E>(null);
179     }
180
181     /**
182      * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
183      * {@link Integer#MAX_VALUE}, initially containing the elements of the
184      * given collection,
185      * added in traversal order of the collection's iterator.
186      *
187      * @param c the collection of elements to initially contain
188      * @throws NullPointerException if the specified collection or any
189      *         of its elements are null
190      */
191     public LinkedBlockingQueue(Collection<? extends E> c) {
192         this(Integer.MAX_VALUE);
193         for (E e : c)
194             add(e);
195     }
196
197
198     // this doc comment is overridden to remove the reference to collections
199     // greater in size than Integer.MAX_VALUE
200     /**
201      * Returns the number of elements in this queue.
202      *
203      * @return the number of elements in this queue
204      */
205     public int size() {
206         return count.get();
207     }
208
209     // this doc comment is a modified copy of the inherited doc comment,
210     // without the reference to unlimited queues.
211     /**
212      * Returns the number of additional elements that this queue can ideally
213      * (in the absence of memory or resource constraints) accept without
214      * blocking. This is always equal to the initial capacity of this queue
215      * less the current <tt>size</tt> of this queue.
216      *
217      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
218      * an element will succeed by inspecting <tt>remainingCapacity</tt>
219      * because it may be the case that another thread is about to
220      * insert or remove an element.
221      */
222     public int remainingCapacity() {
223         return capacity - count.get();
224     }
225
226     /**
227      * Inserts the specified element at the tail of this queue, waiting if
228      * necessary for space to become available.
229      *
230      * @throws InterruptedException {@inheritDoc}
231      * @throws NullPointerException {@inheritDoc}
232      */
233     public void put(E e) throws InterruptedException {
234         if (e == null) throw new NullPointerException();
235         // Note: convention in all put/take/etc is to preset
236         // local var holding count  negative to indicate failure unless set.
237         int c = -1;
238         final ReentrantLock putLock = this.putLock;
239         final AtomicInteger count = this.count;
240         putLock.lockInterruptibly();
241         try {
242             /*
243              * Note that count is used in wait guard even though it is
244              * not protected by lock. This works because count can
245              * only decrease at this point (all other puts are shut
246              * out by lock), and we (or some other waiting put) are
247              * signalled if it ever changes from
248              * capacity. Similarly for all other uses of count in
249              * other wait guards.
250              */
251             try {
252                 while (count.get() == capacity)
253                     notFull.await();
254             } catch (InterruptedException ie) {
255                 notFull.signal(); // propagate to a non-interrupted thread
256                 throw ie;
257             }
258             insert(e);
259             c = count.getAndIncrement();
260             if (c + 1 < capacity)
261                 notFull.signal();
262         } finally {
263             putLock.unlock();
264         }
265         if (c == 0)
266             signalNotEmpty();
267     }
268
269     /**
270      * Inserts the specified element at the tail of this queue, waiting if
271      * necessary up to the specified wait time for space to become available.
272      *
273      * @return <tt>true</tt> if successful, or <tt>false</tt> if
274      *         the specified waiting time elapses before space is available.
275      * @throws InterruptedException {@inheritDoc}
276      * @throws NullPointerException {@inheritDoc}
277      */
278     public boolean offer(E e, long timeout, TimeUnit unit)
279         throws InterruptedException {
280
281         if (e == null) throw new NullPointerException();
282         long nanos = unit.toNanos(timeout);
283         int c = -1;
284         final ReentrantLock putLock = this.putLock;
285         final AtomicInteger count = this.count;
286         putLock.lockInterruptibly();
287         try {
288             for (;;) {
289                 if (count.get() < capacity) {
290                     insert(e);
291                     c = count.getAndIncrement();
292                     if (c + 1 < capacity)
293                         notFull.signal();
294                     break;
295                 }
296                 if (nanos <= 0)
297                     return false;
298                 try {
299                     nanos = notFull.awaitNanos(nanos);
300                 } catch (InterruptedException ie) {
301                     notFull.signal(); // propagate to a non-interrupted thread
302                     throw ie;
303                 }
304             }
305         } finally {
306             putLock.unlock();
307         }
308         if (c == 0)
309             signalNotEmpty();
310         return true;
311     }
312
313     /**
314      * Inserts the specified element at the tail of this queue if it is
315      * possible to do so immediately without exceeding the queue's capacity,
316      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
317      * is full.
318      * When using a capacity-restricted queue, this method is generally
319      * preferable to method {@link BlockingQueue#add add}, which can fail to
320      * insert an element only by throwing an exception.
321      *
322      * @throws NullPointerException if the specified element is null
323      */
324     public boolean offer(E e) {
325         if (e == null) throw new NullPointerException();
326         final AtomicInteger count = this.count;
327         if (count.get() == capacity)
328             return false;
329         int c = -1;
330         final ReentrantLock putLock = this.putLock;
331         putLock.lock();
332         try {
333             if (count.get() < capacity) {
334                 insert(e);
335                 c = count.getAndIncrement();
336                 if (c + 1 < capacity)
337                     notFull.signal();
338             }
339         } finally {
340             putLock.unlock();
341         }
342         if (c == 0)
343             signalNotEmpty();
344         return c >= 0;
345     }
346
347
348     public E take() throws InterruptedException {
349         E x;
350         int c = -1;
351         final AtomicInteger count = this.count;
352         final ReentrantLock takeLock = this.takeLock;
353         takeLock.lockInterruptibly();
354         try {
355             try {
356                 while (count.get() == 0)
357                     notEmpty.await();
358             } catch (InterruptedException ie) {
359                 notEmpty.signal(); // propagate to a non-interrupted thread
360                 throw ie;
361             }
362
363             x = extract();
364             c = count.getAndDecrement();
365             if (c > 1)
366                 notEmpty.signal();
367         } finally {
368             takeLock.unlock();
369         }
370         if (c == capacity)
371             signalNotFull();
372         return x;
373     }
374
375     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
376         E x = null;
377         int c = -1;
378         long nanos = unit.toNanos(timeout);
379         final AtomicInteger count = this.count;
380         final ReentrantLock takeLock = this.takeLock;
381         takeLock.lockInterruptibly();
382         try {
383             for (;;) {
384                 if (count.get() > 0) {
385                     x = extract();
386                     c = count.getAndDecrement();
387                     if (c > 1)
388                         notEmpty.signal();
389                     break;
390                 }
391                 if (nanos <= 0)
392                     return null;
393                 try {
394                     nanos = notEmpty.awaitNanos(nanos);
395                 } catch (InterruptedException ie) {
396                     notEmpty.signal(); // propagate to a non-interrupted thread
397                     throw ie;
398                 }
399             }
400         } finally {
401             takeLock.unlock();
402         }
403         if (c == capacity)
404             signalNotFull();
405         return x;
406     }
407
408     public E poll() {
409         final AtomicInteger count = this.count;
410         if (count.get() == 0)
411             return null;
412         E x = null;
413         int c = -1;
414         final ReentrantLock takeLock = this.takeLock;
415         takeLock.lock();
416         try {
417             if (count.get() > 0) {
418                 x = extract();
419                 c = count.getAndDecrement();
420                 if (c > 1)
421                     notEmpty.signal();
422             }
423         } finally {
424             takeLock.unlock();
425         }
426         if (c == capacity)
427             signalNotFull();
428         return x;
429     }
430
431
432     public E peek() {
433         if (count.get() == 0)
434             return null;
435         final ReentrantLock takeLock = this.takeLock;
436         takeLock.lock();
437         try {
438             Node<E> first = head.next;
439             if (first == null)
440                 return null;
441             else
442                 return first.item;
443         } finally {
444             takeLock.unlock();
445         }
446     }
447
448     /**
449      * Removes a single instance of the specified element from this queue,
450      * if it is present.  More formally, removes an element <tt>e</tt> such
451      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
452      * elements.
453      * Returns <tt>true</tt> if this queue contained the specified element
454      * (or equivalently, if this queue changed as a result of the call).
455      *
456      * @param o element to be removed from this queue, if present
457      * @return <tt>true</tt> if this queue changed as a result of the call
458      */
459     public boolean remove(Object o) {
460         if (o == null) return false;
461         boolean removed = false;
462         fullyLock();
463         try {
464             Node<E> trail = head;
465             Node<E> p = head.next;
466             while (p != null) {
467                 if (o.equals(p.item)) {
468                     removed = true;
469                     break;
470                 }
471                 trail = p;
472                 p = p.next;
473             }
474             if (removed) {
475                 p.item = null;
476                 trail.next = p.next;
477                 if (last == p)
478                     last = trail;
479                 if (count.getAndDecrement() == capacity)
480                     notFull.signalAll();
481             }
482         } finally {
483             fullyUnlock();
484         }
485         return removed;
486     }
487
488     /**
489      * Returns an array containing all of the elements in this queue, in
490      * proper sequence.
491      *
492      * <p>The returned array will be "safe" in that no references to it are
493      * maintained by this queue.  (In other words, this method must allocate
494      * a new array).  The caller is thus free to modify the returned array.
495      *
496      * <p>This method acts as bridge between array-based and collection-based
497      * APIs.
498      *
499      * @return an array containing all of the elements in this queue
500      */
501     public Object[] toArray() {
502         fullyLock();
503         try {
504             int size = count.get();
505             Object[] a = new Object[size];
506             int k = 0;
507             for (Node<E> p = head.next; p != null; p = p.next)
508                 a[k++] = p.item;
509             return a;
510         } finally {
511             fullyUnlock();
512         }
513     }
514
515     /**
516      * Returns an array containing all of the elements in this queue, in
517      * proper sequence; the runtime type of the returned array is that of
518      * the specified array.  If the queue fits in the specified array, it
519      * is returned therein.  Otherwise, a new array is allocated with the
520      * runtime type of the specified array and the size of this queue.
521      *
522      * <p>If this queue fits in the specified array with room to spare
523      * (i.e., the array has more elements than this queue), the element in
524      * the array immediately following the end of the queue is set to
525      * <tt>null</tt>.
526      *
527      * <p>Like the {@link #toArray()} method, this method acts as bridge between
528      * array-based and collection-based APIs.  Further, this method allows
529      * precise control over the runtime type of the output array, and may,
530      * under certain circumstances, be used to save allocation costs.
531      *
532      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
533      * The following code can be used to dump the queue into a newly
534      * allocated array of <tt>String</tt>:
535      *
536      * <pre>
537      *     String[] y = x.toArray(new String[0]);</pre>
538      *
539      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
540      * <tt>toArray()</tt>.
541      *
542      * @param a the array into which the elements of the queue are to
543      *          be stored, if it is big enough; otherwise, a new array of the
544      *          same runtime type is allocated for this purpose
545      * @return an array containing all of the elements in this queue
546      * @throws ArrayStoreException if the runtime type of the specified array
547      *         is not a supertype of the runtime type of every element in
548      *         this queue
549      * @throws NullPointerException if the specified array is null
550      */
551     public <T> T[] toArray(T[] a) {
552         fullyLock();
553         try {
554             int size = count.get();
555             if (a.length < size)
556                 a = (T[])java.lang.reflect.Array.newInstance
557                     (a.getClass().getComponentType(), size);
558
559             int k = 0;
560             for (Node p = head.next; p != null; p = p.next)
561                 a[k++] = (T)p.item;
562             if (a.length > k)
563                 a[k] = null;
564             return a;
565         } finally {
566             fullyUnlock();
567         }
568     }
569
570     public String toString() {
571         fullyLock();
572         try {
573             return super.toString();
574         } finally {
575             fullyUnlock();
576         }
577     }
578
579     /**
580      * Atomically removes all of the elements from this queue.
581      * The queue will be empty after this call returns.
582      */
583     public void clear() {
584         fullyLock();
585         try {
586             head.next = null;
587             assert head.item == null;
588             last = head;
589             if (count.getAndSet(0) == capacity)
590                 notFull.signalAll();
591         } finally {
592             fullyUnlock();
593         }
594     }
595
596     /**
597      * @throws UnsupportedOperationException {@inheritDoc}
598      * @throws ClassCastException            {@inheritDoc}
599      * @throws NullPointerException          {@inheritDoc}
600      * @throws IllegalArgumentException      {@inheritDoc}
601      */
602     public int drainTo(Collection<? super E> c) {
603         if (c == null)
604             throw new NullPointerException();
605         if (c == this)
606             throw new IllegalArgumentException();
607         Node<E> first;
608         fullyLock();
609         try {
610             first = head.next;
611             head.next = null;
612             assert head.item == null;
613             last = head;
614             if (count.getAndSet(0) == capacity)
615                 notFull.signalAll();
616         } finally {
617             fullyUnlock();
618         }
619         // Transfer the elements outside of locks
620         int n = 0;
621         for (Node<E> p = first; p != null; p = p.next) {
622             c.add(p.item);
623             p.item = null;
624             ++n;
625         }
626         return n;
627     }
628
629     /**
630      * @throws UnsupportedOperationException {@inheritDoc}
631      * @throws ClassCastException            {@inheritDoc}
632      * @throws NullPointerException          {@inheritDoc}
633      * @throws IllegalArgumentException      {@inheritDoc}
634      */
635     public int drainTo(Collection<? super E> c, int maxElements) {
636         if (c == null)
637             throw new NullPointerException();
638         if (c == this)
639             throw new IllegalArgumentException();
640         fullyLock();
641         try {
642             int n = 0;
643             Node<E> p = head.next;
644             while (p != null && n < maxElements) {
645                 c.add(p.item);
646                 p.item = null;
647                 p = p.next;
648                 ++n;
649             }
650             if (n != 0) {
651                 head.next = p;
652                 assert head.item == null;
653                 if (p == null)
654                     last = head;
655                 if (count.getAndAdd(-n) == capacity)
656                     notFull.signalAll();
657             }
658             return n;
659         } finally {
660             fullyUnlock();
661         }
662     }
663
664     /**
665      * Returns an iterator over the elements in this queue in proper sequence.
666      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
667      * will never throw {@link ConcurrentModificationException},
668      * and guarantees to traverse elements as they existed upon
669      * construction of the iterator, and may (but is not guaranteed to)
670      * reflect any modifications subsequent to construction.
671      *
672      * @return an iterator over the elements in this queue in proper sequence
673      */
674     public Iterator<E> iterator() {
675       return new Itr();
676     }
677
678     private class Itr implements Iterator<E> {
679         /*
680          * Basic weak-consistent iterator.  At all times hold the next
681          * item to hand out so that if hasNext() reports true, we will
682          * still have it to return even if lost race with a take etc.
683          */
684         private Node<E> current;
685         private Node<E> lastRet;
686         private E currentElement;
687
688         Itr() {
689             final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
690             final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
691             putLock.lock();
692             takeLock.lock();
693             try {
694                 current = head.next;
695                 if (current != null)
696                     currentElement = current.item;
697             } finally {
698                 takeLock.unlock();
699                 putLock.unlock();
700             }
701         }
702
703         public boolean hasNext() {
704             return current != null;
705         }
706
707         public E next() {
708             final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
709             final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
710             putLock.lock();
711             takeLock.lock();
712             try {
713                 if (current == null)
714                     throw new NoSuchElementException();
715                 E x = currentElement;
716                 lastRet = current;
717                 current = current.next;
718                 if (current != null)
719                     currentElement = current.item;
720                 return x;
721             } finally {
722                 takeLock.unlock();
723                 putLock.unlock();
724             }
725         }
726
727         public void remove() {
728             if (lastRet == null)
729                 throw new IllegalStateException();
730             final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
731             final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
732             putLock.lock();
733             takeLock.lock();
734             try {
735                 Node<E> node = lastRet;
736                 lastRet = null;
737                 Node<E> trail = head;
738                 Node<E> p = head.next;
739                 while (p != null && p != node) {
740                     trail = p;
741                     p = p.next;
742                 }
743                 if (p == node) {
744                     p.item = null;
745                     trail.next = p.next;
746                     if (last == p)
747                         last = trail;
748                     int c = count.getAndDecrement();
749                     if (c == capacity)
750                         notFull.signalAll();
751                 }
752             } finally {
753                 takeLock.unlock();
754                 putLock.unlock();
755             }
756         }
757     }
758
759     /**
760      * Save the state to a stream (that is, serialize it).
761      *
762      * @serialData The capacity is emitted (int), followed by all of
763      * its elements (each an <tt>Object</tt>) in the proper order,
764      * followed by a null
765      * @param s the stream
766      */
767     private void writeObject(java.io.ObjectOutputStream s)
768         throws java.io.IOException {
769
770         fullyLock();
771         try {
772             // Write out any hidden stuff, plus capacity
773             s.defaultWriteObject();
774
775             // Write out all elements in the proper order.
776             for (Node<E> p = head.next; p != null; p = p.next)
777                 s.writeObject(p.item);
778
779             // Use trailing null as sentinel
780             s.writeObject(null);
781         } finally {
782             fullyUnlock();
783         }
784     }
785
786     /**
787      * Reconstitute this queue instance from a stream (that is,
788      * deserialize it).
789      * @param s the stream
790      */
791     private void readObject(java.io.ObjectInputStream s)
792         throws java.io.IOException, ClassNotFoundException {
793         // Read in capacity, and any hidden stuff
794         s.defaultReadObject();
795
796         count.set(0);
797         last = head = new Node<E>(null);
798
799         // Read in all elements and place in queue
800         for (;;) {
801             E item = (E)s.readObject();
802             if (item == null)
803                 break;
804             add(item);
805         }
806     }
807 }