OSDN Git Service

Cleaning up PipedInputStream.
authorJesse Wilson <jessewilson@google.com>
Mon, 21 Sep 2009 23:54:54 +0000 (16:54 -0700)
committerJesse Wilson <jessewilson@google.com>
Tue, 22 Sep 2009 01:37:42 +0000 (18:37 -0700)
In particular, fixing a problem where the reader thread fails
as soon as the writer thread exists, even if there is data left
for the reader thread to read. Similarly, the writer fails fast
even when the buffer has space remaining.

Also fixing some concurrency issues by making readers and writers
release each other more aggressively.

libcore/luni/src/main/java/java/io/PipedInputStream.java

index 83987ec..a6b0336 100644 (file)
@@ -33,7 +33,22 @@ public class PipedInputStream extends InputStream {
     private boolean isClosed = false;
 
     /**
-     * The circular buffer through which data is passed.
+     * The circular buffer through which data is passed. Data is read from the
+     * range {@code [out, in)} and written to the range {@code [in, out)}.
+     * Data in the buffer is either sequential: <pre>
+     *     { - - - X X X X X X X - - - - - }
+     *             ^             ^
+     *             |             |
+     *            out           in</pre>
+     * ...or wrapped around the buffer's end: <pre>
+     *     { X X X X - - - - - - - - X X X }
+     *               ^               ^
+     *               |               |
+     *              in              out</pre>
+     * When the buffer is empty, {@code in == -1}. Reading when the buffer is
+     * empty will block until data is available. When the buffer is full,
+     * {@code in == out}. Writing when the buffer is full will block until free
+     * space is available.
      */
     protected byte buffer[];
 
@@ -158,10 +173,14 @@ public class PipedInputStream extends InputStream {
             throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$
         }
 
-        if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) {
-            // KA030=Write end dead
-            throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$
-        }
+        // BEGIN android-removed
+        // eagerly throwing prevents checking isClosed and returning normally
+        // if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) {
+        //     // KA030=Write end dead
+        //     throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$
+        // }
+        // END android-removed
+
         /**
          * Set the last thread to be reading on this PipedInputStream. If
          * lastReader dies while someone is waiting to write an IOException of
@@ -187,7 +206,8 @@ public class PipedInputStream extends InputStream {
             throw new InterruptedIOException();
         }
 
-        byte result = buffer[out++];
+        // BEGIN android-changed
+        int result = buffer[out++] & 0xff;
         if (out == buffer.length) {
             out = 0;
         }
@@ -196,7 +216,12 @@ public class PipedInputStream extends InputStream {
             in = -1;
             out = 0;
         }
-        return result & 0xff;
+
+        // let blocked writers write to the newly available buffer space
+        notifyAll();
+
+        return result;
+        // END android-changed
     }
 
     /**
@@ -261,10 +286,13 @@ public class PipedInputStream extends InputStream {
             throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$
         }
 
-        if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) {
-            // KA030=Write end dead
-            throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$
-        }
+        // BEGIN android-removed
+        // eagerly throwing prevents checking isClosed and returning normally
+        // if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) {
+        //     // KA030=Write end dead
+        //     throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$
+        // }
+        // END android-removed
 
         /**
          * Set the last thread to be reading on this PipedInputStream. If
@@ -291,13 +319,15 @@ public class PipedInputStream extends InputStream {
             throw new InterruptedIOException();
         }
 
-        int copyLength = 0;
-        /* Copy bytes from out to end of buffer first */
+        // BEGIN android-changed
+        int totalCopied = 0;
+
+        // copy bytes from out thru the end of buffer
         if (out >= in) {
-            copyLength = count > (buffer.length - out) ? buffer.length - out
-                    : count;
-            System.arraycopy(buffer, out, bytes, offset, copyLength);
-            out += copyLength;
+            int leftInBuffer = buffer.length - out;
+            int length = leftInBuffer < count ? leftInBuffer : count;
+            System.arraycopy(buffer, out, bytes, offset, length);
+            out += length;
             if (out == buffer.length) {
                 out = 0;
             }
@@ -306,28 +336,29 @@ public class PipedInputStream extends InputStream {
                 in = -1;
                 out = 0;
             }
+            totalCopied += length;
         }
 
-        /*
-         * Did the read fully succeed in the previous copy or is the buffer
-         * empty?
-         */
-        if (copyLength == count || in == -1) {
-            return copyLength;
+        // copy bytes from out thru in
+        if (totalCopied < count && in != -1) {
+            int leftInBuffer = in - out;
+            int leftToCopy = count - totalCopied;
+            int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer;
+            System.arraycopy(buffer, out, bytes, offset + totalCopied, length);
+            out += length;
+            if (out == in) {
+                // empty buffer
+                in = -1;
+                out = 0;
+            }
+            totalCopied += length;
         }
 
-        int bytesCopied = copyLength;
-        /* Copy bytes from 0 to the number of available bytes */
-        copyLength = in - out > (count - bytesCopied) ? count - bytesCopied
-                : in - out;
-        System.arraycopy(buffer, out, bytes, offset + bytesCopied, copyLength);
-        out += copyLength;
-        if (out == in) {
-            // empty buffer
-            in = -1;
-            out = 0;
-        }
-        return bytesCopied + copyLength;
+        // let blocked writers write to the newly available buffer space
+        notifyAll();
+
+        return totalCopied;
+        // END android-changed
     }
 
     /**
@@ -351,9 +382,12 @@ public class PipedInputStream extends InputStream {
         if (buffer == null || isClosed) {
             throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
         }
-        if (lastReader != null && !lastReader.isAlive()) {
-            throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
-        }
+        // BEGIN android-removed
+        // eagerly throwing causes us to fail even if the buffer's not full
+        // if (lastReader != null && !lastReader.isAlive()) {
+        //     throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
+        // }
+        // END android-removed
         /**
          * Set the last thread to be writing on this PipedInputStream. If
          * lastWriter dies while someone is waiting to read an IOException of
@@ -362,11 +396,14 @@ public class PipedInputStream extends InputStream {
         lastWriter = Thread.currentThread();
         try {
             while (buffer != null && out == in) {
-                notifyAll();
-                wait(1000);
+                // BEGIN android-changed
+                // moved has-last-reader-died check to be before wait()
                 if (lastReader != null && !lastReader.isAlive()) {
                     throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
                 }
+                notifyAll();
+                wait(1000);
+                // END android-changed
             }
         } catch (InterruptedException e) {
             throw new InterruptedIOException();
@@ -379,7 +416,11 @@ public class PipedInputStream extends InputStream {
             if (in == buffer.length) {
                 in = 0;
             }
-            return;
+
+            // BEGIN android-added
+            // let blocked readers read the newly available data
+            notifyAll();
+            // END android-added
         }
     }