OSDN Git Service

original
[gb-231r1-is01/Gingerbread_2.3.3_r1_IS01.git] / libcore / luni / src / main / java / java / util / concurrent / ArrayBlockingQueue.java
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6
7 package java.util.concurrent;
8
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.ConcurrentModificationException;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.ReentrantLock;
16
17 // BEGIN android-note
18 // removed link to collections framework docs
19 // END android-note
20
21 /**
22  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
23  * array.  This queue orders elements FIFO (first-in-first-out).  The
24  * <em>head</em> of the queue is that element that has been on the
25  * queue the longest time.  The <em>tail</em> of the queue is that
26  * element that has been on the queue the shortest time. New elements
27  * are inserted at the tail of the queue, and the queue retrieval
28  * operations obtain elements at the head of the queue.
29  *
30  * <p>This is a classic &quot;bounded buffer&quot;, in which a
31  * fixed-sized array holds elements inserted by producers and
32  * extracted by consumers.  Once created, the capacity cannot be
33  * increased.  Attempts to <tt>put</tt> an element into a full queue
34  * will result in the operation blocking; attempts to <tt>take</tt> an
35  * element from an empty queue will similarly block.
36  *
37  * <p> This class supports an optional fairness policy for ordering
38  * waiting producer and consumer threads.  By default, this ordering
39  * is not guaranteed. However, a queue constructed with fairness set
40  * to <tt>true</tt> grants threads access in FIFO order. Fairness
41  * generally decreases throughput but reduces variability and avoids
42  * starvation.
43  *
44  * <p>This class and its iterator implement all of the
45  * <em>optional</em> methods of the {@link Collection} and {@link
46  * Iterator} interfaces.
47  *
48  * @since 1.5
49  * @author Doug Lea
50  * @param <E> the type of elements held in this collection
51  */
52 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
53         implements BlockingQueue<E>, java.io.Serializable {
54
55     /**
56      * Serialization ID. This class relies on default serialization
57      * even for the items array, which is default-serialized, even if
58      * it is empty. Otherwise it could not be declared final, which is
59      * necessary here.
60      */
61     private static final long serialVersionUID = -817911632652898426L;
62
63     /** The queued items  */
64     private final E[] items;
65     /** items index for next take, poll or remove */
66     private int takeIndex;
67     /** items index for next put, offer, or add. */
68     private int putIndex;
69     /** Number of items in the queue */
70     private int count;
71
72     /*
73      * Concurrency control uses the classic two-condition algorithm
74      * found in any textbook.
75      */
76
77     /** Main lock guarding all access */
78     private final ReentrantLock lock;
79     /** Condition for waiting takes */
80     private final Condition notEmpty;
81     /** Condition for waiting puts */
82     private final Condition notFull;
83
84     // Internal helper methods
85
86     /**
87      * Circularly increment i.
88      */
89     final int inc(int i) {
90         return (++i == items.length)? 0 : i;
91     }
92
93     /**
94      * Inserts element at current put position, advances, and signals.
95      * Call only when holding lock.
96      */
97     private void insert(E x) {
98         items[putIndex] = x;
99         putIndex = inc(putIndex);
100         ++count;
101         notEmpty.signal();
102     }
103
104     /**
105      * Extracts element at current take position, advances, and signals.
106      * Call only when holding lock.
107      */
108     private E extract() {
109         final E[] items = this.items;
110         E x = items[takeIndex];
111         items[takeIndex] = null;
112         takeIndex = inc(takeIndex);
113         --count;
114         notFull.signal();
115         return x;
116     }
117
118     /**
119      * Utility for remove and iterator.remove: Delete item at position i.
120      * Call only when holding lock.
121      */
122     void removeAt(int i) {
123         final E[] items = this.items;
124         // if removing front item, just advance
125         if (i == takeIndex) {
126             items[takeIndex] = null;
127             takeIndex = inc(takeIndex);
128         } else {
129             // slide over all others up through putIndex.
130             for (;;) {
131                 int nexti = inc(i);
132                 if (nexti != putIndex) {
133                     items[i] = items[nexti];
134                     i = nexti;
135                 } else {
136                     items[i] = null;
137                     putIndex = i;
138                     break;
139                 }
140             }
141         }
142         --count;
143         notFull.signal();
144     }
145
146     /**
147      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
148      * capacity and default access policy.
149      *
150      * @param capacity the capacity of this queue
151      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
152      */
153     public ArrayBlockingQueue(int capacity) {
154         this(capacity, false);
155     }
156
157     /**
158      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
159      * capacity and the specified access policy.
160      *
161      * @param capacity the capacity of this queue
162      * @param fair if <tt>true</tt> then queue accesses for threads blocked
163      *        on insertion or removal, are processed in FIFO order;
164      *        if <tt>false</tt> the access order is unspecified.
165      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
166      */
167     public ArrayBlockingQueue(int capacity, boolean fair) {
168         if (capacity <= 0)
169             throw new IllegalArgumentException();
170         this.items = (E[]) new Object[capacity];
171         lock = new ReentrantLock(fair);
172         notEmpty = lock.newCondition();
173         notFull =  lock.newCondition();
174     }
175
176     /**
177      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
178      * capacity, the specified access policy and initially containing the
179      * elements of the given collection,
180      * added in traversal order of the collection's iterator.
181      *
182      * @param capacity the capacity of this queue
183      * @param fair if <tt>true</tt> then queue accesses for threads blocked
184      *        on insertion or removal, are processed in FIFO order;
185      *        if <tt>false</tt> the access order is unspecified.
186      * @param c the collection of elements to initially contain
187      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
188      *         <tt>c.size()</tt>, or less than 1.
189      * @throws NullPointerException if the specified collection or any
190      *         of its elements are null
191      */
192     public ArrayBlockingQueue(int capacity, boolean fair,
193                               Collection<? extends E> c) {
194         this(capacity, fair);
195         if (capacity < c.size())
196             throw new IllegalArgumentException();
197
198         for (E e : c)
199             add(e);
200     }
201
202     /**
203      * Inserts the specified element at the tail of this queue if it is
204      * possible to do so immediately without exceeding the queue's capacity,
205      * returning <tt>true</tt> upon success and throwing an
206      * <tt>IllegalStateException</tt> if this queue is full.
207      *
208      * @param e the element to add
209      * @return <tt>true</tt> (as specified by {@link Collection#add})
210      * @throws IllegalStateException if this queue is full
211      * @throws NullPointerException if the specified element is null
212      */
213     public boolean add(E e) {
214         return super.add(e);
215     }
216
217     /**
218      * Inserts the specified element at the tail of this queue if it is
219      * possible to do so immediately without exceeding the queue's capacity,
220      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
221      * is full.  This method is generally preferable to method {@link #add},
222      * which can fail to insert an element only by throwing an exception.
223      *
224      * @throws NullPointerException if the specified element is null
225      */
226     public boolean offer(E e) {
227         if (e == null) throw new NullPointerException();
228         final ReentrantLock lock = this.lock;
229         lock.lock();
230         try {
231             if (count == items.length)
232                 return false;
233             else {
234                 insert(e);
235                 return true;
236             }
237         } finally {
238             lock.unlock();
239         }
240     }
241
242     /**
243      * Inserts the specified element at the tail of this queue, waiting
244      * for space to become available if the queue is full.
245      *
246      * @throws InterruptedException {@inheritDoc}
247      * @throws NullPointerException {@inheritDoc}
248      */
249     public void put(E e) throws InterruptedException {
250         if (e == null) throw new NullPointerException();
251         final E[] items = this.items;
252         final ReentrantLock lock = this.lock;
253         lock.lockInterruptibly();
254         try {
255             try {
256                 while (count == items.length)
257                     notFull.await();
258             } catch (InterruptedException ie) {
259                 notFull.signal(); // propagate to non-interrupted thread
260                 throw ie;
261             }
262             insert(e);
263         } finally {
264             lock.unlock();
265         }
266     }
267
268     /**
269      * Inserts the specified element at the tail of this queue, waiting
270      * up to the specified wait time for space to become available if
271      * the queue is full.
272      *
273      * @throws InterruptedException {@inheritDoc}
274      * @throws NullPointerException {@inheritDoc}
275      */
276     public boolean offer(E e, long timeout, TimeUnit unit)
277         throws InterruptedException {
278
279         if (e == null) throw new NullPointerException();
280         long nanos = unit.toNanos(timeout);
281         final ReentrantLock lock = this.lock;
282         lock.lockInterruptibly();
283         try {
284             for (;;) {
285                 if (count != items.length) {
286                     insert(e);
287                     return true;
288                 }
289                 if (nanos <= 0)
290                     return false;
291                 try {
292                     nanos = notFull.awaitNanos(nanos);
293                 } catch (InterruptedException ie) {
294                     notFull.signal(); // propagate to non-interrupted thread
295                     throw ie;
296                 }
297             }
298         } finally {
299             lock.unlock();
300         }
301     }
302
303     public E poll() {
304         final ReentrantLock lock = this.lock;
305         lock.lock();
306         try {
307             if (count == 0)
308                 return null;
309             E x = extract();
310             return x;
311         } finally {
312             lock.unlock();
313         }
314     }
315
316     public E take() throws InterruptedException {
317         final ReentrantLock lock = this.lock;
318         lock.lockInterruptibly();
319         try {
320             try {
321                 while (count == 0)
322                     notEmpty.await();
323             } catch (InterruptedException ie) {
324                 notEmpty.signal(); // propagate to non-interrupted thread
325                 throw ie;
326             }
327             E x = extract();
328             return x;
329         } finally {
330             lock.unlock();
331         }
332     }
333
334     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
335         long nanos = unit.toNanos(timeout);
336         final ReentrantLock lock = this.lock;
337         lock.lockInterruptibly();
338         try {
339             for (;;) {
340                 if (count != 0) {
341                     E x = extract();
342                     return x;
343                 }
344                 if (nanos <= 0)
345                     return null;
346                 try {
347                     nanos = notEmpty.awaitNanos(nanos);
348                 } catch (InterruptedException ie) {
349                     notEmpty.signal(); // propagate to non-interrupted thread
350                     throw ie;
351                 }
352
353             }
354         } finally {
355             lock.unlock();
356         }
357     }
358
359     public E peek() {
360         final ReentrantLock lock = this.lock;
361         lock.lock();
362         try {
363             return (count == 0) ? null : items[takeIndex];
364         } finally {
365             lock.unlock();
366         }
367     }
368
369     // this doc comment is overridden to remove the reference to collections
370     // greater in size than Integer.MAX_VALUE
371     /**
372      * Returns the number of elements in this queue.
373      *
374      * @return the number of elements in this queue
375      */
376     public int size() {
377         final ReentrantLock lock = this.lock;
378         lock.lock();
379         try {
380             return count;
381         } finally {
382             lock.unlock();
383         }
384     }
385
386     // this doc comment is a modified copy of the inherited doc comment,
387     // without the reference to unlimited queues.
388     /**
389      * Returns the number of additional elements that this queue can ideally
390      * (in the absence of memory or resource constraints) accept without
391      * blocking. This is always equal to the initial capacity of this queue
392      * less the current <tt>size</tt> of this queue.
393      *
394      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
395      * an element will succeed by inspecting <tt>remainingCapacity</tt>
396      * because it may be the case that another thread is about to
397      * insert or remove an element.
398      */
399     public int remainingCapacity() {
400         final ReentrantLock lock = this.lock;
401         lock.lock();
402         try {
403             return items.length - count;
404         } finally {
405             lock.unlock();
406         }
407     }
408
409     /**
410      * Removes a single instance of the specified element from this queue,
411      * if it is present.  More formally, removes an element <tt>e</tt> such
412      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
413      * elements.
414      * Returns <tt>true</tt> if this queue contained the specified element
415      * (or equivalently, if this queue changed as a result of the call).
416      *
417      * @param o element to be removed from this queue, if present
418      * @return <tt>true</tt> if this queue changed as a result of the call
419      */
420     public boolean remove(Object o) {
421         if (o == null) return false;
422         final E[] items = this.items;
423         final ReentrantLock lock = this.lock;
424         lock.lock();
425         try {
426             int i = takeIndex;
427             int k = 0;
428             for (;;) {
429                 if (k++ >= count)
430                     return false;
431                 if (o.equals(items[i])) {
432                     removeAt(i);
433                     return true;
434                 }
435                 i = inc(i);
436             }
437
438         } finally {
439             lock.unlock();
440         }
441     }
442
443     /**
444      * Returns <tt>true</tt> if this queue contains the specified element.
445      * More formally, returns <tt>true</tt> if and only if this queue contains
446      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
447      *
448      * @param o object to be checked for containment in this queue
449      * @return <tt>true</tt> if this queue contains the specified element
450      */
451     public boolean contains(Object o) {
452         if (o == null) return false;
453         final E[] items = this.items;
454         final ReentrantLock lock = this.lock;
455         lock.lock();
456         try {
457             int i = takeIndex;
458             int k = 0;
459             while (k++ < count) {
460                 if (o.equals(items[i]))
461                     return true;
462                 i = inc(i);
463             }
464             return false;
465         } finally {
466             lock.unlock();
467         }
468     }
469
470     /**
471      * Returns an array containing all of the elements in this queue, in
472      * proper sequence.
473      *
474      * <p>The returned array will be "safe" in that no references to it are
475      * maintained by this queue.  (In other words, this method must allocate
476      * a new array).  The caller is thus free to modify the returned array.
477      *
478      * <p>This method acts as bridge between array-based and collection-based
479      * APIs.
480      *
481      * @return an array containing all of the elements in this queue
482      */
483     public Object[] toArray() {
484         final E[] items = this.items;
485         final ReentrantLock lock = this.lock;
486         lock.lock();
487         try {
488             Object[] a = new Object[count];
489             int k = 0;
490             int i = takeIndex;
491             while (k < count) {
492                 a[k++] = items[i];
493                 i = inc(i);
494             }
495             return a;
496         } finally {
497             lock.unlock();
498         }
499     }
500
501     /**
502      * Returns an array containing all of the elements in this queue, in
503      * proper sequence; the runtime type of the returned array is that of
504      * the specified array.  If the queue fits in the specified array, it
505      * is returned therein.  Otherwise, a new array is allocated with the
506      * runtime type of the specified array and the size of this queue.
507      *
508      * <p>If this queue fits in the specified array with room to spare
509      * (i.e., the array has more elements than this queue), the element in
510      * the array immediately following the end of the queue is set to
511      * <tt>null</tt>.
512      *
513      * <p>Like the {@link #toArray()} method, this method acts as bridge between
514      * array-based and collection-based APIs.  Further, this method allows
515      * precise control over the runtime type of the output array, and may,
516      * under certain circumstances, be used to save allocation costs.
517      *
518      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
519      * The following code can be used to dump the queue into a newly
520      * allocated array of <tt>String</tt>:
521      *
522      * <pre>
523      *     String[] y = x.toArray(new String[0]);</pre>
524      *
525      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
526      * <tt>toArray()</tt>.
527      *
528      * @param a the array into which the elements of the queue are to
529      *          be stored, if it is big enough; otherwise, a new array of the
530      *          same runtime type is allocated for this purpose
531      * @return an array containing all of the elements in this queue
532      * @throws ArrayStoreException if the runtime type of the specified array
533      *         is not a supertype of the runtime type of every element in
534      *         this queue
535      * @throws NullPointerException if the specified array is null
536      */
537     public <T> T[] toArray(T[] a) {
538         final E[] items = this.items;
539         final ReentrantLock lock = this.lock;
540         lock.lock();
541         try {
542             if (a.length < count)
543                 a = (T[])java.lang.reflect.Array.newInstance(
544                     a.getClass().getComponentType(),
545                     count
546                     );
547
548             int k = 0;
549             int i = takeIndex;
550             while (k < count) {
551                 a[k++] = (T)items[i];
552                 i = inc(i);
553             }
554             if (a.length > count)
555                 a[count] = null;
556             return a;
557         } finally {
558             lock.unlock();
559         }
560     }
561
562     public String toString() {
563         final ReentrantLock lock = this.lock;
564         lock.lock();
565         try {
566             return super.toString();
567         } finally {
568             lock.unlock();
569         }
570     }
571
572     /**
573      * Atomically removes all of the elements from this queue.
574      * The queue will be empty after this call returns.
575      */
576     public void clear() {
577         final E[] items = this.items;
578         final ReentrantLock lock = this.lock;
579         lock.lock();
580         try {
581             int i = takeIndex;
582             int k = count;
583             while (k-- > 0) {
584                 items[i] = null;
585                 i = inc(i);
586             }
587             count = 0;
588             putIndex = 0;
589             takeIndex = 0;
590             notFull.signalAll();
591         } finally {
592             lock.unlock();
593         }
594     }
595
596     /**
597      * @throws UnsupportedOperationException {@inheritDoc}
598      * @throws ClassCastException            {@inheritDoc}
599      * @throws NullPointerException          {@inheritDoc}
600      * @throws IllegalArgumentException      {@inheritDoc}
601      */
602     public int drainTo(Collection<? super E> c) {
603         if (c == null)
604             throw new NullPointerException();
605         if (c == this)
606             throw new IllegalArgumentException();
607         final E[] items = this.items;
608         final ReentrantLock lock = this.lock;
609         lock.lock();
610         try {
611             int i = takeIndex;
612             int n = 0;
613             int max = count;
614             while (n < max) {
615                 c.add(items[i]);
616                 items[i] = null;
617                 i = inc(i);
618                 ++n;
619             }
620             if (n > 0) {
621                 count = 0;
622                 putIndex = 0;
623                 takeIndex = 0;
624                 notFull.signalAll();
625             }
626             return n;
627         } finally {
628             lock.unlock();
629         }
630     }
631
632     /**
633      * @throws UnsupportedOperationException {@inheritDoc}
634      * @throws ClassCastException            {@inheritDoc}
635      * @throws NullPointerException          {@inheritDoc}
636      * @throws IllegalArgumentException      {@inheritDoc}
637      */
638     public int drainTo(Collection<? super E> c, int maxElements) {
639         if (c == null)
640             throw new NullPointerException();
641         if (c == this)
642             throw new IllegalArgumentException();
643         if (maxElements <= 0)
644             return 0;
645         final E[] items = this.items;
646         final ReentrantLock lock = this.lock;
647         lock.lock();
648         try {
649             int i = takeIndex;
650             int n = 0;
651             int sz = count;
652             int max = (maxElements < count)? maxElements : count;
653             while (n < max) {
654                 c.add(items[i]);
655                 items[i] = null;
656                 i = inc(i);
657                 ++n;
658             }
659             if (n > 0) {
660                 count -= n;
661                 takeIndex = i;
662                 notFull.signalAll();
663             }
664             return n;
665         } finally {
666             lock.unlock();
667         }
668     }
669
670
671     /**
672      * Returns an iterator over the elements in this queue in proper sequence.
673      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
674      * will never throw {@link ConcurrentModificationException},
675      * and guarantees to traverse elements as they existed upon
676      * construction of the iterator, and may (but is not guaranteed to)
677      * reflect any modifications subsequent to construction.
678      *
679      * @return an iterator over the elements in this queue in proper sequence
680      */
681     public Iterator<E> iterator() {
682         final ReentrantLock lock = this.lock;
683         lock.lock();
684         try {
685             return new Itr();
686         } finally {
687             lock.unlock();
688         }
689     }
690
691     /**
692      * Iterator for ArrayBlockingQueue
693      */
694     private class Itr implements Iterator<E> {
695         /**
696          * Index of element to be returned by next,
697          * or a negative number if no such.
698          */
699         private int nextIndex;
700
701         /**
702          * nextItem holds on to item fields because once we claim
703          * that an element exists in hasNext(), we must return it in
704          * the following next() call even if it was in the process of
705          * being removed when hasNext() was called.
706          */
707         private E nextItem;
708
709         /**
710          * Index of element returned by most recent call to next.
711          * Reset to -1 if this element is deleted by a call to remove.
712          */
713         private int lastRet;
714
715         Itr() {
716             lastRet = -1;
717             if (count == 0)
718                 nextIndex = -1;
719             else {
720                 nextIndex = takeIndex;
721                 nextItem = items[takeIndex];
722             }
723         }
724
725         public boolean hasNext() {
726             /*
727              * No sync. We can return true by mistake here
728              * only if this iterator passed across threads,
729              * which we don't support anyway.
730              */
731             return nextIndex >= 0;
732         }
733
734         /**
735          * Checks whether nextIndex is valid; if so setting nextItem.
736          * Stops iterator when either hits putIndex or sees null item.
737          */
738         private void checkNext() {
739             if (nextIndex == putIndex) {
740                 nextIndex = -1;
741                 nextItem = null;
742             } else {
743                 nextItem = items[nextIndex];
744                 if (nextItem == null)
745                     nextIndex = -1;
746             }
747         }
748
749         public E next() {
750             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
751             lock.lock();
752             try {
753                 if (nextIndex < 0)
754                     throw new NoSuchElementException();
755                 lastRet = nextIndex;
756                 E x = nextItem;
757                 nextIndex = inc(nextIndex);
758                 checkNext();
759                 return x;
760             } finally {
761                 lock.unlock();
762             }
763         }
764
765         public void remove() {
766             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
767             lock.lock();
768             try {
769                 int i = lastRet;
770                 if (i == -1)
771                     throw new IllegalStateException();
772                 lastRet = -1;
773
774                 int ti = takeIndex;
775                 removeAt(i);
776                 // back up cursor (reset to front if was first element)
777                 nextIndex = (i == ti) ? takeIndex : i;
778                 checkNext();
779             } finally {
780                 lock.unlock();
781             }
782         }
783     }
784 }