OSDN Git Service

fifo: Add support for multi-process
authorGlenn Kasten <gkasten@google.com>
Fri, 2 Sep 2016 20:54:59 +0000 (13:54 -0700)
committerGlenn Kasten <gkasten@google.com>
Mon, 3 Oct 2016 17:06:27 +0000 (10:06 -0700)
For multi-process, the indices are allocated in separate shared memory.
This permits the buffer, write index, and throttling read index to each
have their own protection.

By default, the indices are allocated within the object to retain
API compatibility for the single-process case.

Test: in next CL
Change-Id: I5fae59169dbf73e8e38bf0eba0dcf907da2f8679

audio_utils/fifo.cpp
audio_utils/include/audio_utils/fifo.h

index 3574dbf..f19f93b 100644 (file)
@@ -54,12 +54,13 @@ static int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, vo
 #endif
 }
 
-audio_utils_fifo_base::audio_utils_fifo_base(uint32_t frameCount)
+audio_utils_fifo_base::audio_utils_fifo_base(uint32_t frameCount,
+        audio_utils_fifo_index& sharedRear, audio_utils_fifo_index *throttleFront)
         __attribute__((no_sanitize("integer"))) :
     mFrameCount(frameCount), mFrameCountP2(roundup(frameCount)),
     mFudgeFactor(mFrameCountP2 - mFrameCount),
     mIsPrivate(true),
-    mSharedRear(0), mThrottleFront(NULL)
+    mSharedRear(sharedRear), mThrottleFront(throttleFront)
 {
     // actual upper bound on frameCount will depend on the frame size
     LOG_ALWAYS_FATAL_IF(frameCount == 0 || frameCount > ((uint32_t) INT_MAX));
@@ -122,9 +123,11 @@ int32_t audio_utils_fifo_base::diff(uint32_t rear, uint32_t front, size_t *lost)
 
 ////////////////////////////////////////////////////////////////////////////////
 
-audio_utils_fifo::audio_utils_fifo(uint32_t frameCount, uint32_t frameSize, void *buffer)
+audio_utils_fifo::audio_utils_fifo(uint32_t frameCount, uint32_t frameSize, void *buffer,
+        audio_utils_fifo_index& sharedRear, audio_utils_fifo_index *throttleFront)
         __attribute__((no_sanitize("integer"))) :
-    audio_utils_fifo_base(frameCount), mFrameSize(frameSize), mBuffer(buffer)
+    audio_utils_fifo_base(frameCount, sharedRear, throttleFront),
+    mFrameSize(frameSize), mBuffer(buffer)
 {
     // maximum value of frameCount * frameSize is INT_MAX (2^31 - 1), not 2^31, because we need to
     // be able to distinguish successful and error return values from read and write.
@@ -132,6 +135,13 @@ audio_utils_fifo::audio_utils_fifo(uint32_t frameCount, uint32_t frameSize, void
             frameCount > ((uint32_t) INT_MAX) / frameSize);
 }
 
+audio_utils_fifo::audio_utils_fifo(uint32_t frameCount, uint32_t frameSize, void *buffer,
+        bool throttlesWriter) :
+    audio_utils_fifo(frameCount, frameSize, buffer, mSingleProcessSharedRear,
+        throttlesWriter ?  &mSingleProcessSharedFront : NULL)
+{
+}
+
 audio_utils_fifo::~audio_utils_fifo()
 {
 }
@@ -186,7 +196,7 @@ ssize_t audio_utils_fifo_writer::obtain(audio_utils_iovec iovec[2], size_t count
     if (mFifo.mThrottleFront != NULL) {
         uint32_t front;
         for (;;) {
-            front = (uint32_t) atomic_load_explicit(mFifo.mThrottleFront,
+            front = atomic_load_explicit(&mFifo.mThrottleFront->mIndex,
                     std::memory_order_acquire);
             int32_t filled = mFifo.diff(mLocalRear, front, NULL /*lost*/);
             if (filled < 0) {
@@ -200,7 +210,7 @@ ssize_t audio_utils_fifo_writer::obtain(audio_utils_iovec iovec[2], size_t count
                     (timeout->tv_sec == 0 && timeout->tv_nsec == 0)) {
                 break;
             }
-            int err = sys_futex(mFifo.mThrottleFront,
+            int err = sys_futex(&mFifo.mThrottleFront->mIndex,
                     mFifo.mIsPrivate ? FUTEX_WAIT_PRIVATE : FUTEX_WAIT, front, timeout, NULL, 0);
             if (err < 0) {
                 switch (errno) {
@@ -241,18 +251,18 @@ void audio_utils_fifo_writer::release(size_t count)
     if (count > 0) {
         LOG_ALWAYS_FATAL_IF(count > mObtained);
         if (mFifo.mThrottleFront != NULL) {
-            uint32_t front = (uint32_t) atomic_load_explicit(mFifo.mThrottleFront,
+            uint32_t front = atomic_load_explicit(&mFifo.mThrottleFront->mIndex,
                     std::memory_order_acquire);
             int32_t filled = mFifo.diff(mLocalRear, front, NULL /*lost*/);
             mLocalRear = mFifo.sum(mLocalRear, count);
-            atomic_store_explicit(&mFifo.mSharedRear, (uint_fast32_t) mLocalRear,
+            atomic_store_explicit(&mFifo.mSharedRear.mIndex, mLocalRear,
                     std::memory_order_release);
             if (filled >= 0) {
                 if (filled + count <= mLowLevelArm) {
                     mArmed = true;
                 }
                 if (mArmed && filled + count >= mHighLevelTrigger) {
-                    int err = sys_futex(&mFifo.mSharedRear,
+                    int err = sys_futex(&mFifo.mSharedRear.mIndex,
                             mFifo.mIsPrivate ? FUTEX_WAKE_PRIVATE : FUTEX_WAKE,
                             INT_MAX /*waiters*/, NULL, NULL, 0);
                     // err is number of processes woken up
@@ -264,7 +274,7 @@ void audio_utils_fifo_writer::release(size_t count)
             }
         } else {
             mLocalRear = mFifo.sum(mLocalRear, count);
-            atomic_store_explicit(&mFifo.mSharedRear, (uint_fast32_t) mLocalRear,
+            atomic_store_explicit(&mFifo.mSharedRear.mIndex, mLocalRear,
                     std::memory_order_release);
         }
         mObtained -= count;
@@ -274,21 +284,15 @@ void audio_utils_fifo_writer::release(size_t count)
 ////////////////////////////////////////////////////////////////////////////////
 
 audio_utils_fifo_reader::audio_utils_fifo_reader(audio_utils_fifo& fifo, bool throttlesWriter) :
-    audio_utils_fifo_provider(), mFifo(fifo), mLocalFront(0), mSharedFront(0),
+    audio_utils_fifo_provider(), mFifo(fifo), mLocalFront(0),
+    mThrottleFront(throttlesWriter ? mFifo.mThrottleFront : NULL),
     mHighLevelArm(0), mLowLevelTrigger(mFifo.mFrameCount), mArmed(false)
 {
-    if (throttlesWriter) {
-        LOG_ALWAYS_FATAL_IF(fifo.mThrottleFront != NULL);
-        fifo.mThrottleFront = &mSharedFront;
-    }
 }
 
 audio_utils_fifo_reader::~audio_utils_fifo_reader()
 {
     // TODO Need a way to pass throttle capability to the another reader, should one reader exit.
-    if (mFifo.mThrottleFront == &mSharedFront) {
-        mFifo.mThrottleFront = NULL;
-    }
 }
 
 ssize_t audio_utils_fifo_reader::read(void *buffer, size_t count, struct timespec *timeout,
@@ -322,19 +326,19 @@ void audio_utils_fifo_reader::release(size_t count)
 {
     if (count > 0) {
         LOG_ALWAYS_FATAL_IF(count > mObtained);
-        if (mFifo.mThrottleFront == &mSharedFront) {
-            uint32_t rear = (uint32_t) atomic_load_explicit(&mFifo.mSharedRear,
+        if (mThrottleFront != NULL) {
+            uint32_t rear = atomic_load_explicit(&mFifo.mSharedRear.mIndex,
                     std::memory_order_acquire);
             int32_t filled = mFifo.diff(rear, mLocalFront, NULL /*lost*/);
             mLocalFront = mFifo.sum(mLocalFront, count);
-            atomic_store_explicit(&mSharedFront, (uint_fast32_t) mLocalFront,
+            atomic_store_explicit(&mThrottleFront->mIndex, mLocalFront,
                     std::memory_order_release);
             if (filled >= 0) {
                 if (filled - count >= mHighLevelArm) {
                     mArmed = true;
                 }
                 if (mArmed && filled - count <= mLowLevelTrigger) {
-                    int err = sys_futex(&mFifo.mSharedRear,
+                    int err = sys_futex(&mFifo.mSharedRear.mIndex,
                             mFifo.mIsPrivate ? FUTEX_WAKE_PRIVATE : FUTEX_WAKE,
                             1 /*waiters*/, NULL, NULL, 0);
                     // err is number of processes woken up
@@ -357,14 +361,15 @@ ssize_t audio_utils_fifo_reader::obtain(audio_utils_iovec iovec[2], size_t count
 {
     uint32_t rear;
     for (;;) {
-        rear = (uint32_t) atomic_load_explicit(&mFifo.mSharedRear,
+        rear = atomic_load_explicit(&mFifo.mSharedRear.mIndex,
                 std::memory_order_acquire);
         // TODO pull out "count == 0"
         if (count == 0 || rear != mLocalFront || timeout == NULL ||
                 (timeout->tv_sec == 0 && timeout->tv_nsec == 0)) {
             break;
         }
-        int err = sys_futex(&mFifo.mSharedRear, mFifo.mIsPrivate ? FUTEX_WAIT_PRIVATE : FUTEX_WAIT,
+        int err = sys_futex(&mFifo.mSharedRear.mIndex,
+                mFifo.mIsPrivate ? FUTEX_WAIT_PRIVATE : FUTEX_WAIT,
                 rear, timeout, NULL, 0);
         if (err < 0) {
             switch (errno) {
index c896b62..53099b6 100644 (file)
 #error C API is no longer supported
 #endif
 
+/** An index that may optionally be placed in shared memory.
+ *  Must be Plain Old Data (POD), so no virtual methods are allowed.
+ *  If in shared memory, exactly one process must explicitly call the constructor via placement new.
+ */
+struct audio_utils_fifo_index {
+    friend class audio_utils_fifo_reader;
+    friend class audio_utils_fifo_writer;
+
+public:
+    audio_utils_fifo_index() : mIndex(0) { }
+
+private:
+    // Linux futex is 32 bits regardless of platform.
+    // It would make more sense to declare this as atomic_uint32_t, but there is no such type name.
+    std::atomic_uint_least32_t  mIndex; // accessed by both sides using atomic operations
+    static_assert(sizeof(mIndex) == sizeof(uint32_t), "mIndex must be 32 bits");
+
+    // TODO Abstract out atomic operations to here
+    // TODO Replace friend by setter and getter, and abstract the futex
+};
+
 // Base class for single writer, single-reader or multi-reader non-blocking FIFO.
 // The base class manipulates frame indices only, and has no knowledge of frame sizes or the buffer.
 
 class audio_utils_fifo_base {
 
 protected:
-    audio_utils_fifo_base(uint32_t frameCount);
+
+/* Construct FIFO base class
+ *
+ *  \param sharedRear  Writer's rear index in shared memory.
+ *  \param throttleFront Pointer to the front index of at most one reader that throttles the
+ *                       writer, or NULL for no throttling.
+ */
+    audio_utils_fifo_base(uint32_t frameCount, audio_utils_fifo_index& sharedRear,
+            // TODO inconsistent & vs *
+            audio_utils_fifo_index *throttleFront = NULL);
     /*virtual*/ ~audio_utils_fifo_base();
 
 /** Return a new index as the sum of a validated index and a specified increment.
@@ -62,11 +92,11 @@ protected:
     // TODO always true for now, will be extended later to support false
     const bool mIsPrivate;        // whether reader and writer virtual address spaces are the same
 
-    std::atomic_uint_fast32_t mSharedRear;  // accessed by both sides using atomic operations
+    audio_utils_fifo_index&     mSharedRear;
 
-    // Pointer to the mSharedFront of at most one reader that throttles the writer,
+    // Pointer to the front index of at most one reader that throttles the writer,
     // or NULL for no throttling
-    std::atomic_uint_fast32_t *mThrottleFront;
+    audio_utils_fifo_index*     mThrottleFront;
 };
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -82,23 +112,41 @@ class audio_utils_fifo : audio_utils_fifo_base {
 public:
 
 /**
- * Construct a FIFO object.
+ * Construct a FIFO object: multi-process.
  *
  *  \param frameCount  Max number of significant frames to be stored in the FIFO > 0.
  *                     If writes and reads always use the same count, and that count is a divisor of
  *                     frameCount, then the writes and reads will never do a partial transfer.
  *  \param frameSize   Size of each frame in bytes > 0, and frameSize * frameCount <= INT_MAX.
  *  \param buffer      Pointer to a caller-allocated buffer of frameCount frames.
+ *  \param sharedRear  Writer's rear index in shared memory.
+ *  \param throttleFront Pointer to the front index of at most one reader that throttles the
+ *                       writer, or NULL for no throttling.
  */
-    audio_utils_fifo(uint32_t frameCount, uint32_t frameSize, void *buffer);
+    audio_utils_fifo(uint32_t frameCount, uint32_t frameSize, void *buffer,
+            // TODO inconsistent & vs *
+            audio_utils_fifo_index& sharedRear, audio_utils_fifo_index *throttleFront = NULL);
+
+/**
+ * Construct a FIFO object: single-process.
+ *  \param throttlesWriter Whether there is a reader that throttles the writer.
+ */
+    audio_utils_fifo(uint32_t frameCount, uint32_t frameSize, void *buffer,
+            bool throttlesWriter = false);
 
     /*virtual*/ ~audio_utils_fifo();
 
 private:
 
     // These fields are const after initialization
-    const uint32_t mFrameSize;    // size of each frame in bytes
-    void * const   mBuffer;       // pointer to caller-allocated buffer of size mFrameCount frames
+    const uint32_t mFrameSize;  // size of each frame in bytes
+    void * const   mBuffer;     // pointer to caller-allocated buffer of size mFrameCount frames
+
+    // only used for single-process constructor
+    audio_utils_fifo_index      mSingleProcessSharedRear;
+
+    // only used for single-process constructor when throttlesWriter == true
+    audio_utils_fifo_index      mSingleProcessSharedFront;
 };
 
 // Describes one virtually contiguous fragment of a logically contiguous slice.
@@ -121,6 +169,7 @@ public:
 
 // The timeout indicates the maximum time to wait for at least one frame, not for all frames.
 // NULL is equivalent to non-blocking.
+// FIXME specify timebase, relative/absolute etc
 
 // Error codes for ssize_t return value:
 //  -EIO        corrupted indices (reader or writer)
@@ -139,6 +188,7 @@ protected:
 class audio_utils_fifo_writer : public audio_utils_fifo_provider {
 
 public:
+    // Single-process and multi-process use same constructor here, but different 'fifo' constructors
     audio_utils_fifo_writer(audio_utils_fifo& fifo);
     virtual ~audio_utils_fifo_writer();
 
@@ -171,6 +221,7 @@ private:
     // Accessed by writer only using ordinary operations
     uint32_t    mLocalRear; // frame index of next frame slot available to write, or write index
 
+    // TODO needs a state transition diagram for threshold and arming process
     uint32_t    mLowLevelArm;       // arm if filled <= threshold
     uint32_t    mHighLevelTrigger;  // trigger reader if armed and filled >= threshold
     bool        mArmed;
@@ -183,7 +234,8 @@ private:
 class audio_utils_fifo_reader : public audio_utils_fifo_provider {
 
 public:
-    audio_utils_fifo_reader(audio_utils_fifo& fifo, bool throttlesWriter);
+    // At most one reader can specify throttlesWriter == true
+    audio_utils_fifo_reader(audio_utils_fifo& fifo, bool throttlesWriter = true);
     virtual ~audio_utils_fifo_reader();
 
 /** Read from FIFO.
@@ -215,12 +267,14 @@ private:
     // Accessed by reader only using ordinary operations
     uint32_t     mLocalFront;   // frame index of first frame slot available to read, or read index
 
-    // Accessed by a throttling reader and writer using atomic operations
-    std::atomic_uint_fast32_t mSharedFront;
+    // Points to shared front index if this reader throttles writer, or NULL if we don't throttle
+    audio_utils_fifo_index*     mThrottleFront;
 
+    // TODO not used yet
     uint32_t    mHighLevelArm;      // arm if filled >= threshold
     uint32_t    mLowLevelTrigger;   // trigger writer if armed and filled <= threshold
     bool        mArmed;
+
 };
 
 #endif  // !ANDROID_AUDIO_FIFO_H