From 3ddacbbcd6c119d3c47a50a9291fdff91fb98386 Mon Sep 17 00:00:00 2001 From: Glenn Kasten Date: Mon, 20 Jun 2016 14:41:03 -0700 Subject: [PATCH] Add blocking with optional timeout, and effective buffer size Change-Id: I15d133bcbe257e1ee767e9dd155e2b646019d59e --- audio_utils/fifo.cpp | 155 +++++++++++++++++++++++++++------ audio_utils/include/audio_utils/fifo.h | 38 ++++++-- audio_utils/tests/Android.mk | 9 ++ audio_utils/tests/fifo_threads.cpp | 141 ++++++++++++++++++++++++++++++ audio_utils/tests/getch.c | 1 + audio_utils/tests/getch.h | 1 + 6 files changed, 314 insertions(+), 31 deletions(-) create mode 100644 audio_utils/tests/fifo_threads.cpp create mode 120000 audio_utils/tests/getch.c create mode 120000 audio_utils/tests/getch.h diff --git a/audio_utils/fifo.cpp b/audio_utils/fifo.cpp index 31c614d7..8273309b 100644 --- a/audio_utils/fifo.cpp +++ b/audio_utils/fifo.cpp @@ -18,17 +18,26 @@ #define LOG_TAG "audio_utils_fifo" #include +#include #include #include +#include + #include #include #include #include +static int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) +{ + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +} + audio_utils_fifo_base::audio_utils_fifo_base(uint32_t frameCount) __attribute__((no_sanitize("integer"))) : mFrameCount(frameCount), mFrameCountP2(roundup(frameCount)), mFudgeFactor(mFrameCountP2 - mFrameCount), + mIsPrivate(true), mSharedRear(0), mThrottleFront(NULL) { // actual upper bound on frameCount will depend on the frame size @@ -120,7 +129,9 @@ audio_utils_fifo_provider::~audio_utils_fifo_provider() //////////////////////////////////////////////////////////////////////////////// audio_utils_fifo_writer::audio_utils_fifo_writer(audio_utils_fifo& fifo) : - audio_utils_fifo_provider(), mFifo(fifo), mLocalRear(0) + audio_utils_fifo_provider(), mFifo(fifo), mLocalRear(0), + mLowLevelArm(fifo.mFrameCount), mHighLevelTrigger(0), mArmed(false), + mEffectiveFrames(fifo.mFrameCount) { } @@ -128,11 +139,11 @@ audio_utils_fifo_writer::~audio_utils_fifo_writer() { } -ssize_t audio_utils_fifo_writer::write(const void *buffer, size_t count) +ssize_t audio_utils_fifo_writer::write(const void *buffer, size_t count, struct timespec *timeout) __attribute__((no_sanitize("integer"))) { audio_utils_iovec iovec[2]; - ssize_t availToWrite = obtain(iovec, count); + ssize_t availToWrite = obtain(iovec, count, timeout); if (availToWrite > 0) { memcpy((char *) mFifo.mBuffer + iovec[0].mOffset * mFifo.mFrameSize, buffer, iovec[0].mLength * mFifo.mFrameSize); @@ -146,21 +157,45 @@ ssize_t audio_utils_fifo_writer::write(const void *buffer, size_t count) return availToWrite; } -ssize_t audio_utils_fifo_writer::obtain(audio_utils_iovec iovec[2], size_t count) +ssize_t audio_utils_fifo_writer::obtain(audio_utils_iovec iovec[2], size_t count, + struct timespec *timeout) __attribute__((no_sanitize("integer"))) { size_t availToWrite; if (mFifo.mThrottleFront != NULL) { - uint32_t front = (uint32_t) atomic_load_explicit(mFifo.mThrottleFront, - std::memory_order_acquire); - int32_t filled = mFifo.diff(mLocalRear, front, NULL /*lost*/); - if (filled < 0) { - mObtained = 0; - return (ssize_t) filled; + uint32_t front; + for (;;) { + front = (uint32_t) atomic_load_explicit(mFifo.mThrottleFront, + std::memory_order_acquire); + int32_t filled = mFifo.diff(mLocalRear, front, NULL /*lost*/); + if (filled < 0) { + mObtained = 0; + return (ssize_t) filled; + } + availToWrite = mEffectiveFrames > (uint32_t) filled ? + mEffectiveFrames - (uint32_t) filled : 0; + // TODO pull out "count == 0" + if (count == 0 || availToWrite > 0 || timeout == NULL || + (timeout->tv_sec == 0 && timeout->tv_nsec == 0)) { + break; + } + int err = sys_futex(mFifo.mThrottleFront, + mFifo.mIsPrivate ? FUTEX_WAIT_PRIVATE : FUTEX_WAIT, front, timeout, NULL, 0); + if (err < 0) { + switch (errno) { + case EWOULDBLOCK: + case EINTR: + case ETIMEDOUT: + break; + default: + LOG_ALWAYS_FATAL("unexpected err=%d errno=%d", err, errno); + break; + } + } + timeout = NULL; } - availToWrite = (size_t) mFifo.mFrameCount - (size_t) filled; } else { - availToWrite = mFifo.mFrameCount; + availToWrite = mEffectiveFrames; } if (availToWrite > count) { availToWrite = count; @@ -184,9 +219,33 @@ void audio_utils_fifo_writer::release(size_t count) { if (count > 0) { LOG_ALWAYS_FATAL_IF(count > mObtained); - mLocalRear = mFifo.sum(mLocalRear, count); - atomic_store_explicit(&mFifo.mSharedRear, (uint_fast32_t) mLocalRear, - std::memory_order_release); + if (mFifo.mThrottleFront != NULL) { + uint32_t front = (uint32_t) atomic_load_explicit(mFifo.mThrottleFront, + std::memory_order_acquire); + int32_t filled = mFifo.diff(mLocalRear, front, NULL /*lost*/); + mLocalRear = mFifo.sum(mLocalRear, count); + atomic_store_explicit(&mFifo.mSharedRear, (uint_fast32_t) mLocalRear, + std::memory_order_release); + if (filled >= 0) { + if (filled + count <= mLowLevelArm) { + mArmed = true; + } + if (mArmed && filled + count >= mHighLevelTrigger) { + int err = sys_futex(&mFifo.mSharedRear, + mFifo.mIsPrivate ? FUTEX_WAKE_PRIVATE : FUTEX_WAKE, + INT_MAX /*waiters*/, NULL, NULL, 0); + // err is number of processes woken up + if (err < 0) { + LOG_ALWAYS_FATAL("%s: unexpected err=%d errno=%d", __func__, err, errno); + } + mArmed = false; + } + } + } else { + mLocalRear = mFifo.sum(mLocalRear, count); + atomic_store_explicit(&mFifo.mSharedRear, (uint_fast32_t) mLocalRear, + std::memory_order_release); + } mObtained -= count; } } @@ -194,7 +253,8 @@ void audio_utils_fifo_writer::release(size_t count) //////////////////////////////////////////////////////////////////////////////// audio_utils_fifo_reader::audio_utils_fifo_reader(audio_utils_fifo& fifo, bool throttlesWriter) : - audio_utils_fifo_provider(), mFifo(fifo), mLocalFront(0), mSharedFront(0) + audio_utils_fifo_provider(), mFifo(fifo), mLocalFront(0), mSharedFront(0), + mHighLevelArm(0), mLowLevelTrigger(mFifo.mFrameCount), mArmed(false) { if (throttlesWriter) { LOG_ALWAYS_FATAL_IF(fifo.mThrottleFront != NULL); @@ -210,11 +270,12 @@ audio_utils_fifo_reader::~audio_utils_fifo_reader() } } -ssize_t audio_utils_fifo_reader::read(void *buffer, size_t count, size_t *lost) +ssize_t audio_utils_fifo_reader::read(void *buffer, size_t count, struct timespec *timeout, + size_t *lost) __attribute__((no_sanitize("integer"))) { audio_utils_iovec iovec[2]; - ssize_t availToRead = obtain(iovec, count, lost); + ssize_t availToRead = obtain(iovec, count, timeout, lost); if (availToRead > 0) { memcpy(buffer, (char *) mFifo.mBuffer + iovec[0].mOffset * mFifo.mFrameSize, iovec[0].mLength * mFifo.mFrameSize); @@ -228,10 +289,11 @@ ssize_t audio_utils_fifo_reader::read(void *buffer, size_t count, size_t *lost) return availToRead; } -ssize_t audio_utils_fifo_reader::obtain(audio_utils_iovec iovec[2], size_t count) +ssize_t audio_utils_fifo_reader::obtain(audio_utils_iovec iovec[2], size_t count, + struct timespec *timeout) __attribute__((no_sanitize("integer"))) { - return obtain(iovec, count, NULL); + return obtain(iovec, count, timeout, NULL); } void audio_utils_fifo_reader::release(size_t count) @@ -239,23 +301,66 @@ void audio_utils_fifo_reader::release(size_t count) { if (count > 0) { LOG_ALWAYS_FATAL_IF(count > mObtained); - mLocalFront = mFifo.sum(mLocalFront, count); if (mFifo.mThrottleFront == &mSharedFront) { + uint32_t rear = (uint32_t) atomic_load_explicit(&mFifo.mSharedRear, + std::memory_order_acquire); + int32_t filled = mFifo.diff(rear, mLocalFront, NULL /*lost*/); + mLocalFront = mFifo.sum(mLocalFront, count); atomic_store_explicit(&mSharedFront, (uint_fast32_t) mLocalFront, std::memory_order_release); + if (filled >= 0) { + if (filled - count >= mHighLevelArm) { + mArmed = true; + } + if (mArmed && filled - count <= mLowLevelTrigger) { + int err = sys_futex(&mFifo.mSharedRear, + mFifo.mIsPrivate ? FUTEX_WAKE_PRIVATE : FUTEX_WAKE, + 1 /*waiters*/, NULL, NULL, 0); + // err is number of processes woken up + if (err < 0 || err > 1) { + LOG_ALWAYS_FATAL("%s: unexpected err=%d errno=%d", __func__, err, errno); + } + mArmed = false; + } + } + } else { + mLocalFront = mFifo.sum(mLocalFront, count); } mObtained -= count; } } -ssize_t audio_utils_fifo_reader::obtain(audio_utils_iovec iovec[2], size_t count, size_t *lost) +ssize_t audio_utils_fifo_reader::obtain(audio_utils_iovec iovec[2], size_t count, + struct timespec *timeout, size_t *lost) __attribute__((no_sanitize("integer"))) { - uint32_t rear = (uint32_t) atomic_load_explicit(&mFifo.mSharedRear, - std::memory_order_acquire); + uint32_t rear; + for (;;) { + rear = (uint32_t) atomic_load_explicit(&mFifo.mSharedRear, + std::memory_order_acquire); + // TODO pull out "count == 0" + if (count == 0 || rear != mLocalFront || timeout == NULL || + (timeout->tv_sec == 0 && timeout->tv_nsec == 0)) { + break; + } + int err = sys_futex(&mFifo.mSharedRear, mFifo.mIsPrivate ? FUTEX_WAIT_PRIVATE : FUTEX_WAIT, + rear, timeout, NULL, 0); + if (err < 0) { + switch (errno) { + case EWOULDBLOCK: + case EINTR: + case ETIMEDOUT: + break; + default: + LOG_ALWAYS_FATAL("unexpected err=%d errno=%d", err, errno); + break; + } + } + timeout = NULL; + } int32_t filled = mFifo.diff(rear, mLocalFront, lost); if (filled < 0) { - if (filled == android::BAD_INDEX) { + if (filled == -EOVERFLOW) { mLocalFront = rear; } mObtained = 0; diff --git a/audio_utils/include/audio_utils/fifo.h b/audio_utils/include/audio_utils/fifo.h index 1688dea7..c896b626 100644 --- a/audio_utils/include/audio_utils/fifo.h +++ b/audio_utils/include/audio_utils/fifo.h @@ -59,6 +59,9 @@ protected: // after the end of mBuffer. Only the indices are wasted, not any // memory. + // TODO always true for now, will be extended later to support false + const bool mIsPrivate; // whether reader and writer virtual address spaces are the same + std::atomic_uint_fast32_t mSharedRear; // accessed by both sides using atomic operations // Pointer to the mSharedFront of at most one reader that throttles the writer, @@ -113,10 +116,16 @@ public: audio_utils_fifo_provider(); virtual ~audio_utils_fifo_provider(); +// The count is the maximum number of desired frames, not the minimum number of desired frames. +// See the high/low setpoints for something which is close to, but not the same as, a true minimum. + +// The timeout indicates the maximum time to wait for at least one frame, not for all frames. +// NULL is equivalent to non-blocking. + // Error codes for ssize_t return value: // -EIO corrupted indices (reader or writer) // -EOVERFLOW reader is not keeping up with writer (reader only) - virtual ssize_t obtain(audio_utils_iovec iovec[2], size_t count) = 0; + virtual ssize_t obtain(audio_utils_iovec iovec[2], size_t count, struct timespec *timeout) = 0; virtual void release(size_t count) = 0; @@ -138,6 +147,7 @@ public: * * \param buffer Pointer to source buffer containing 'count' frames of data. * \param count Desired number of frames to write. + * \param timeout NULL and zero fields are both non-blocking. * * \return actual number of frames written <= count. * @@ -145,17 +155,27 @@ public: * or partial if the FIFO was almost full. * A negative return value indicates an error. */ - ssize_t write(const void *buffer, size_t count); + ssize_t write(const void *buffer, size_t count, struct timespec *timeout = NULL); // Implement audio_utils_fifo_provider - virtual ssize_t obtain(audio_utils_iovec iovec[2], size_t count); + virtual ssize_t obtain(audio_utils_iovec iovec[2], size_t count, struct timespec *timeout); virtual void release(size_t count); + // TODO add error checks and getters + void setHighLevelTrigger(uint32_t level) { mHighLevelTrigger = level; } + void setEffectiveFrames(uint32_t effectiveFrames) { mEffectiveFrames = effectiveFrames; } + private: audio_utils_fifo& mFifo; // Accessed by writer only using ordinary operations uint32_t mLocalRear; // frame index of next frame slot available to write, or write index + + uint32_t mLowLevelArm; // arm if filled <= threshold + uint32_t mHighLevelTrigger; // trigger reader if armed and filled >= threshold + bool mArmed; + + uint32_t mEffectiveFrames; // current effective buffer size, <= mFifo.mFrameCount }; //////////////////////////////////////////////////////////////////////////////// @@ -170,6 +190,7 @@ public: * * \param buffer Pointer to destination buffer to be filled with up to 'count' frames of data. * \param count Desired number of frames to read. + * \param timeout NULL and zero fields are both non-blocking. * \param lost If non-NULL, set to the approximate number of lost frames before re-sync. * * \return actual number of frames read <= count. @@ -178,14 +199,15 @@ public: * or partial if the FIFO was almost empty. * A negative return value indicates an error. */ - ssize_t read(void *buffer, size_t count, size_t *lost = NULL); + ssize_t read(void *buffer, size_t count, struct timespec *timeout = NULL, size_t *lost = NULL); // Implement audio_utils_fifo_provider - virtual ssize_t obtain(audio_utils_iovec iovec[2], size_t count); + virtual ssize_t obtain(audio_utils_iovec iovec[2], size_t count, struct timespec *timeout); virtual void release(size_t count); // Extended parameter list for reader only - ssize_t obtain(audio_utils_iovec iovec[2], size_t count, size_t *lost); + ssize_t obtain(audio_utils_iovec iovec[2], size_t count, struct timespec *timeout, + size_t *lost); private: audio_utils_fifo& mFifo; @@ -195,6 +217,10 @@ private: // Accessed by a throttling reader and writer using atomic operations std::atomic_uint_fast32_t mSharedFront; + + uint32_t mHighLevelArm; // arm if filled >= threshold + uint32_t mLowLevelTrigger; // trigger writer if armed and filled <= threshold + bool mArmed; }; #endif // !ANDROID_AUDIO_FIFO_H diff --git a/audio_utils/tests/Android.mk b/audio_utils/tests/Android.mk index c0859976..bbad5777 100644 --- a/audio_utils/tests/Android.mk +++ b/audio_utils/tests/Android.mk @@ -40,6 +40,15 @@ LOCAL_CFLAGS := -Werror -Wall include $(BUILD_HOST_EXECUTABLE) include $(CLEAR_VARS) +# TODO move getch.c and .h to a utility library +LOCAL_SRC_FILES := fifo_threads.cpp getch.c +LOCAL_MODULE := fifo_threads +LOCAL_C_INCLUDES := $(call include-path-for, audio-utils) +LOCAL_STATIC_LIBRARIES := libaudioutils liblog +LOCAL_CFLAGS := -Werror -Wall +include $(BUILD_HOST_EXECUTABLE) + +include $(CLEAR_VARS) LOCAL_SRC_FILES := limiter_tests.c LOCAL_MODULE := limiter_tests LOCAL_C_INCLUDES := $(call include-path-for, audio-utils) diff --git a/audio_utils/tests/fifo_threads.cpp b/audio_utils/tests/fifo_threads.cpp new file mode 100644 index 00000000..92a2eae7 --- /dev/null +++ b/audio_utils/tests/fifo_threads.cpp @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2016 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +extern "C" { +#include "getch.h" +} + +struct Context { + audio_utils_fifo_writer *mInputWriter; + audio_utils_fifo_reader *mInputReader; + audio_utils_fifo_writer *mTransferWriter; + audio_utils_fifo_reader *mTransferReader; + audio_utils_fifo_writer *mOutputWriter; + audio_utils_fifo_reader *mOutputReader; +}; + +void *input_routine(void *arg) +{ + Context *context = (Context *) arg; + for (;;) { + struct timespec timeout; + timeout.tv_sec = 3; + timeout.tv_nsec = 0; + char buffer[4]; + ssize_t actual = context->mInputReader->read(buffer, sizeof(buffer), &timeout); + if (actual == 0) { + (void) write(1, "t", 1); + } else if (actual > 0) { + actual = context->mTransferWriter->write(buffer, actual, &timeout); + //printf("transfer.write actual = %d\n", (int) actual); + } else { + printf("input.read actual = %d\n", (int) actual); + } + } + return NULL; +} + +void *output_routine(void *arg) +{ + Context *context = (Context *) arg; + for (;;) { + struct timespec timeout; + timeout.tv_sec = 5; + timeout.tv_nsec = 0; + char buffer[4]; + ssize_t actual = context->mTransferReader->read(buffer, sizeof(buffer), &timeout); + if (actual == 0) { + (void) write(1, "T", 1); + } else if (actual > 0) { + actual = context->mOutputWriter->write(buffer, actual, &timeout); + //printf("output.write actual = %d\n", (int) actual); + } else { + printf("transfer.read actual = %d\n", (int) actual); + } + } + return NULL; +} + +int main(int argc, char **argv) +{ + set_conio_terminal_mode(); + argc = argc + 0; + argv = &argv[0]; + + char inputBuffer[64]; + audio_utils_fifo inputFifo(sizeof(inputBuffer) /*frameCount*/, 1 /*frameSize*/, inputBuffer); + audio_utils_fifo_writer inputWriter(inputFifo); + audio_utils_fifo_reader inputReader(inputFifo, true /*readerThrottlesWriter*/); + inputWriter.setHighLevelTrigger(3); + + char transferBuffer[64]; + audio_utils_fifo transferFifo(sizeof(transferBuffer) /*frameCount*/, 1 /*frameSize*/, + transferBuffer); + audio_utils_fifo_writer transferWriter(transferFifo); + audio_utils_fifo_reader transferReader(transferFifo, true /*readerThrottlesWriter*/); + transferWriter.setEffectiveFrames(2); + + char outputBuffer[64]; + audio_utils_fifo outputFifo(sizeof(outputBuffer) /*frameCount*/, 1 /*frameSize*/, outputBuffer); + audio_utils_fifo_writer outputWriter(outputFifo); + audio_utils_fifo_reader outputReader(outputFifo, true /*readerThrottlesWriter*/); + + Context context; + context.mInputWriter = &inputWriter; + context.mInputReader = &inputReader; + context.mTransferWriter = &transferWriter; + context.mTransferReader = &transferReader; + context.mOutputWriter = &outputWriter; + context.mOutputReader = &outputReader; + + pthread_t input_thread; + int ok = pthread_create(&input_thread, (const pthread_attr_t *) NULL, input_routine, + (void *) &context); + pthread_t output_thread; + ok = pthread_create(&output_thread, (const pthread_attr_t *) NULL, output_routine, + (void *) &context); + ok = ok + 0; + + for (;;) { + char buffer[1]; + struct timespec timeout; + timeout.tv_sec = 0; + timeout.tv_nsec = 0; + ssize_t actual = outputReader.read(buffer, sizeof(buffer), &timeout); + if (actual == 1) { + printf("%c", buffer[0]); + fflush(stdout); + } else if (actual != 0) { + printf("outputReader.read actual = %d\n", (int) actual); + } + if (kbhit()) { + int ch = getch(); + if (ch <= 0 || ch == 3) { + break; + } + buffer[0] = ch; + actual = inputWriter.write(buffer, sizeof(buffer), &timeout); + if (actual != 1) { + printf("inputWriter.write actual = %d\n", (int) actual); + } + } + } + reset_terminal_mode(); +} diff --git a/audio_utils/tests/getch.c b/audio_utils/tests/getch.c new file mode 120000 index 00000000..472d85b2 --- /dev/null +++ b/audio_utils/tests/getch.c @@ -0,0 +1 @@ +../../../../frameworks/wilhelm/tests/sandbox/getch.c \ No newline at end of file diff --git a/audio_utils/tests/getch.h b/audio_utils/tests/getch.h new file mode 120000 index 00000000..094c1b04 --- /dev/null +++ b/audio_utils/tests/getch.h @@ -0,0 +1 @@ +../../../../frameworks/wilhelm/tests/sandbox/getch.h \ No newline at end of file -- 2.11.0