--- /dev/null
+/******************************************************************************
+ *
+ * Copyright (C) 2014 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+#pragma once
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "allocator.h"
+
+typedef struct eager_reader_t eager_reader_t;
+typedef struct reactor_t reactor_t;
+
+typedef void (*eager_reader_cb)(eager_reader_t *reader, void *context);
+
+// Creates a new eager reader object, which pulls data from |fd_to_read| into
+// buffers of size |buffer_size| allocated using |allocator|, and has an
+// internal read thread named |thread_name|. The returned object must be freed using
+// |eager_reader_free|. |fd_to_read| must be valid, |buffer_size| and |max_buffer_count|
+// must be greater than zero. |allocator| and |thread_name| may not be NULL.
+eager_reader_t *eager_reader_new(
+ int fd_to_read,
+ const allocator_t *allocator,
+ size_t buffer_size,
+ size_t max_buffer_count,
+ const char *thread_name
+);
+
+// Frees an eager reader object, and associated internal resources.
+// |reader| may be NULL.
+void eager_reader_free(eager_reader_t *reader);
+
+// Registers |reader| with the |reactor|. When the reader has data
+// |read_cb| will be called. The |context| parameter is passed, untouched, to |read_cb|.
+// Neither |reader|, nor |reactor|, nor |read_cb| may be NULL. |context| may be NULL.
+void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context);
+
+// Unregisters |reader| from whichever reactor it is registered with, if any. This
+// function is idempotent.
+void eager_reader_unregister(eager_reader_t *reader);
+
+// Returns the next byte in the stream, blocks if not available yet.
+// |reader| may not be NULL.
+// NOT SAFE FOR READING FROM MULTIPLE THREADS
+// but you should probably only be reading from one thread anyway,
+// otherwise the byte stream probably doesn't make sense.
+uint8_t eager_reader_read_byte(eager_reader_t *reader);
+
+// Returns true if there is at least one byte to read in the stream.
+// |reader| may not be NULL.
+bool eager_reader_has_byte(eager_reader_t *reader);
--- /dev/null
+/******************************************************************************
+ *
+ * Copyright (C) 2014 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+#define LOG_TAG "osi_eager_reader"
+
+#include <assert.h>
+#include <errno.h>
+#include <stddef.h>
+#include <sys/eventfd.h>
+#include <utils/Log.h>
+
+#include "eager_reader.h"
+#include "fixed_queue.h"
+#include "osi.h"
+#include "reactor.h"
+#include "thread.h"
+
+#if !defined(EFD_SEMAPHORE)
+# define EFD_SEMAPHORE (1 << 0)
+#endif
+
+typedef struct {
+ size_t length;
+ size_t offset;
+ uint8_t data[];
+} data_buffer_t;
+
+struct eager_reader_t {
+ int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes
+ int inbound_fd;
+
+ const allocator_t *allocator;
+ size_t buffer_size;
+ fixed_queue_t *buffers;
+ data_buffer_t *current_buffer;
+
+ thread_t *inbound_read_thread;
+ reactor_object_t *inbound_read_object;
+
+ reactor_object_t *outbound_registration;
+ eager_reader_cb outbound_read_ready;
+ void *outbound_context;
+};
+
+static void inbound_data_waiting(void *context);
+static void internal_outbound_read_ready(void *context);
+
+eager_reader_t *eager_reader_new(
+ int fd_to_read,
+ const allocator_t *allocator,
+ size_t buffer_size,
+ size_t max_buffer_count,
+ const char *thread_name) {
+
+ assert(fd_to_read != INVALID_FD);
+ assert(allocator != NULL);
+ assert(buffer_size > 0);
+ assert(max_buffer_count > 0);
+ assert(thread_name != NULL && *thread_name != '\0');
+
+ eager_reader_t *ret = calloc(1, sizeof(eager_reader_t));
+ if (!ret) {
+ ALOGE("%s unable to allocate memory for new eager_reader.", __func__);
+ goto error;
+ }
+
+ ret->allocator = allocator;
+ ret->inbound_fd = fd_to_read;
+
+ ret->bytes_available_fd = eventfd(0, EFD_SEMAPHORE);
+ if (ret->bytes_available_fd == INVALID_FD) {
+ ALOGE("%s unable to create output reading semaphore.", __func__);
+ goto error;
+ }
+
+ ret->buffer_size = buffer_size;
+
+ ret->buffers = fixed_queue_new(max_buffer_count);
+ if (!ret->buffers) {
+ ALOGE("%s unable to create buffers queue.", __func__);
+ goto error;
+ }
+
+ ret->inbound_read_thread = thread_new(thread_name);
+ if (!ret->inbound_read_thread) {
+ ALOGE("%s unable to make reading thread.", __func__);
+ goto error;
+ }
+
+ ret->inbound_read_object = reactor_register(
+ thread_get_reactor(ret->inbound_read_thread),
+ fd_to_read,
+ ret,
+ inbound_data_waiting,
+ NULL
+ );
+
+ return ret;
+
+error:;
+ eager_reader_free(ret);
+ return NULL;
+}
+
+void eager_reader_free(eager_reader_t *reader) {
+ if (!reader)
+ return;
+
+ eager_reader_unregister(reader);
+
+ // Only unregister from the input if we actually did register
+ if (reader->inbound_read_object)
+ reactor_unregister(reader->inbound_read_object);
+
+ if (reader->bytes_available_fd != INVALID_FD)
+ close(reader->bytes_available_fd);
+
+ // Free the current buffer, because it's not in the queue
+ // and won't be freed below
+ if (reader->current_buffer)
+ reader->allocator->free(reader->current_buffer);
+
+ fixed_queue_free(reader->buffers, reader->allocator->free);
+ thread_free(reader->inbound_read_thread);
+ free(reader);
+}
+
+void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
+ assert(reader != NULL);
+ assert(reactor != NULL);
+ assert(read_cb != NULL);
+
+ // Make sure the reader isn't currently registered.
+ eager_reader_unregister(reader);
+
+ reader->outbound_read_ready = read_cb;
+ reader->outbound_context = context;
+ reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
+}
+
+void eager_reader_unregister(eager_reader_t *reader) {
+ assert(reader != NULL);
+
+ if (reader->outbound_registration) {
+ reactor_unregister(reader->outbound_registration);
+ reader->outbound_registration = NULL;
+ }
+}
+
+// SEE HEADER FOR THREAD SAFETY NOTE
+uint8_t eager_reader_read_byte(eager_reader_t *reader) {
+ assert(reader != NULL);
+
+ eventfd_t value;
+ if (eventfd_read(reader->bytes_available_fd, &value) == -1)
+ ALOGE("%s unable to read semaphore for output data.", __func__);
+
+ if (!reader->current_buffer)
+ reader->current_buffer = fixed_queue_dequeue(reader->buffers);
+
+ uint8_t byte = reader->current_buffer->data[reader->current_buffer->offset];
+ reader->current_buffer->offset++;
+
+ // Prep for next byte request
+ if (reader->current_buffer->offset >= reader->current_buffer->length) {
+ reader->allocator->free(reader->current_buffer);
+ reader->current_buffer = NULL;
+ }
+
+ return byte;
+}
+
+bool eager_reader_has_byte(eager_reader_t *reader) {
+ assert(reader != NULL);
+
+ fd_set read_fds;
+ FD_ZERO(&read_fds);
+ FD_SET(reader->bytes_available_fd, &read_fds);
+
+ // Immediate timeout
+ struct timeval timeout;
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 0;
+
+ select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout);
+ return FD_ISSET(reader->bytes_available_fd, &read_fds);
+}
+
+static void inbound_data_waiting(void *context) {
+ eager_reader_t *reader = (eager_reader_t *)context;
+
+ data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
+ if (!buffer) {
+ ALOGE("%s couldn't aquire memory for inbound data buffer.", __func__);
+ return;
+ }
+
+ buffer->length = 0;
+ buffer->offset = 0;
+
+ int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size);
+ if (bytes_read > 0) {
+ // Save the data for later
+ buffer->length = bytes_read;
+ fixed_queue_enqueue(reader->buffers, buffer);
+
+ // Tell consumers data is available by incrementing
+ // the semaphore by the number of bytes we just read
+ eventfd_write(reader->bytes_available_fd, bytes_read);
+ } else {
+ if (bytes_read == 0)
+ ALOGW("%s fd said bytes existed, but none were found.", __func__);
+ else
+ ALOGW("%s unable to read from file descriptor: %s", __func__, strerror(errno));
+
+ reader->allocator->free(buffer);
+ }
+}
+
+static void internal_outbound_read_ready(void *context) {
+ assert(context != NULL);
+
+ eager_reader_t *reader = (eager_reader_t *)context;
+ reader->outbound_read_ready(reader, reader->outbound_context);
+}
--- /dev/null
+/******************************************************************************
+ *
+ * Copyright (C) 2014 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+#include <gtest/gtest.h>
+
+extern "C" {
+#include <stdint.h>
+#include <unistd.h>
+#include <utils/Log.h>
+
+#include "allocator.h"
+#include "eager_reader.h"
+#include "osi.h"
+#include "semaphore.h"
+#include "thread.h"
+}
+
+#define BUFFER_SIZE 32
+
+static char *small_data = (char *)"white chocolate lindor truffles";
+
+static semaphore_t *done;
+
+class EagerReaderTest : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ pipe(pipefd);
+ done = semaphore_new(0);
+ }
+
+ virtual void TearDown() {
+ semaphore_free(done);
+ }
+
+ int pipefd[2];
+};
+
+static void expect_data(eager_reader_t *reader, void *context) {
+ EXPECT_TRUE(eager_reader_has_byte(reader)) << "if we got a callback we expect there to be data";
+
+ char *data = (char *)context;
+ int length = strlen(data);
+ int i;
+
+ for (i = 0; i < length; i++) {
+ EXPECT_EQ(data[i], eager_reader_read_byte(reader));
+ }
+
+ semaphore_post(done);
+}
+
+TEST_F(EagerReaderTest, test_new_simple) {
+ eager_reader_t *reader = eager_reader_new(pipefd[0], &allocator_malloc, BUFFER_SIZE, SIZE_MAX, (char *)"test_thread");
+ ASSERT_TRUE(reader != NULL);
+}
+
+TEST_F(EagerReaderTest, test_free_simple) {
+ eager_reader_t *reader = eager_reader_new(pipefd[0], &allocator_malloc, BUFFER_SIZE, SIZE_MAX, (char *)"test_thread");
+ eager_reader_free(reader);
+}
+
+TEST_F(EagerReaderTest, test_small_data) {
+ eager_reader_t *reader = eager_reader_new(pipefd[0], &allocator_malloc, BUFFER_SIZE, SIZE_MAX, (char *)"test_thread");
+
+ thread_t *read_thread = thread_new("read_thread");
+ eager_reader_register(reader, thread_get_reactor(read_thread), expect_data, small_data);
+
+ write(pipefd[1], small_data, strlen(small_data));
+
+ semaphore_wait(done);
+ eager_reader_free(reader);
+ thread_free(read_thread);
+}