OSDN Git Service

Add eager reader
authorZach Johnson <zachoverflow@google.com>
Tue, 5 Aug 2014 03:51:06 +0000 (20:51 -0700)
committerAndre Eisenbach <eisenbach@google.com>
Mon, 16 Mar 2015 23:51:29 +0000 (16:51 -0700)
osi/Android.mk
osi/include/eager_reader.h [new file with mode: 0644]
osi/src/eager_reader.c [new file with mode: 0644]
osi/test/eager_reader_test.cpp [new file with mode: 0644]

index 55da672..3a36593 100644 (file)
@@ -28,6 +28,7 @@ LOCAL_SRC_FILES := \
     ./src/allocator.c \
     ./src/config.c \
     ./src/data_dispatcher.c \
+    ./src/eager_reader.c \
     ./src/fixed_queue.c \
     ./src/hash_map.c \
     ./src/list.c \
@@ -57,6 +58,7 @@ LOCAL_SRC_FILES := \
     ./test/atomic_test.cpp \
     ./test/config_test.cpp \
     ./test/data_dispatcher_test.cpp \
+    ./test/eager_reader_test.cpp \
     ./test/list_test.cpp \
     ./test/reactor_test.cpp \
     ./test/thread_test.cpp
diff --git a/osi/include/eager_reader.h b/osi/include/eager_reader.h
new file mode 100644 (file)
index 0000000..a99e120
--- /dev/null
@@ -0,0 +1,67 @@
+/******************************************************************************
+ *
+ *  Copyright (C) 2014 Google, Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at:
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ ******************************************************************************/
+
+#pragma once
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "allocator.h"
+
+typedef struct eager_reader_t eager_reader_t;
+typedef struct reactor_t reactor_t;
+
+typedef void (*eager_reader_cb)(eager_reader_t *reader, void *context);
+
+// Creates a new eager reader object, which pulls data from |fd_to_read| into
+// buffers of size |buffer_size| allocated using |allocator|, and has an
+// internal read thread named |thread_name|. The returned object must be freed using
+// |eager_reader_free|. |fd_to_read| must be valid, |buffer_size| and |max_buffer_count|
+// must be greater than zero. |allocator| and |thread_name| may not be NULL.
+eager_reader_t *eager_reader_new(
+  int fd_to_read,
+  const allocator_t *allocator,
+  size_t buffer_size,
+  size_t max_buffer_count,
+  const char *thread_name
+);
+
+// Frees an eager reader object, and associated internal resources.
+// |reader| may be NULL.
+void eager_reader_free(eager_reader_t *reader);
+
+// Registers |reader| with the |reactor|. When the reader has data
+// |read_cb| will be called. The |context| parameter is passed, untouched, to |read_cb|.
+// Neither |reader|, nor |reactor|, nor |read_cb| may be NULL. |context| may be NULL.
+void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context);
+
+// Unregisters |reader| from whichever reactor it is registered with, if any. This
+// function is idempotent.
+void eager_reader_unregister(eager_reader_t *reader);
+
+// Returns the next byte in the stream, blocks if not available yet.
+// |reader| may not be NULL.
+// NOT SAFE FOR READING FROM MULTIPLE THREADS
+// but you should probably only be reading from one thread anyway,
+// otherwise the byte stream probably doesn't make sense.
+uint8_t eager_reader_read_byte(eager_reader_t *reader);
+
+// Returns true if there is at least one byte to read in the stream.
+// |reader| may not be NULL.
+bool eager_reader_has_byte(eager_reader_t *reader);
diff --git a/osi/src/eager_reader.c b/osi/src/eager_reader.c
new file mode 100644 (file)
index 0000000..8eec872
--- /dev/null
@@ -0,0 +1,240 @@
+/******************************************************************************
+ *
+ *  Copyright (C) 2014 Google, Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at:
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ ******************************************************************************/
+
+#define LOG_TAG "osi_eager_reader"
+
+#include <assert.h>
+#include <errno.h>
+#include <stddef.h>
+#include <sys/eventfd.h>
+#include <utils/Log.h>
+
+#include "eager_reader.h"
+#include "fixed_queue.h"
+#include "osi.h"
+#include "reactor.h"
+#include "thread.h"
+
+#if !defined(EFD_SEMAPHORE)
+#  define EFD_SEMAPHORE (1 << 0)
+#endif
+
+typedef struct {
+  size_t length;
+  size_t offset;
+  uint8_t data[];
+} data_buffer_t;
+
+struct eager_reader_t {
+  int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes
+  int inbound_fd;
+
+  const allocator_t *allocator;
+  size_t buffer_size;
+  fixed_queue_t *buffers;
+  data_buffer_t *current_buffer;
+
+  thread_t *inbound_read_thread;
+  reactor_object_t *inbound_read_object;
+
+  reactor_object_t *outbound_registration;
+  eager_reader_cb outbound_read_ready;
+  void *outbound_context;
+};
+
+static void inbound_data_waiting(void *context);
+static void internal_outbound_read_ready(void *context);
+
+eager_reader_t *eager_reader_new(
+    int fd_to_read,
+    const allocator_t *allocator,
+    size_t buffer_size,
+    size_t max_buffer_count,
+    const char *thread_name) {
+
+  assert(fd_to_read != INVALID_FD);
+  assert(allocator != NULL);
+  assert(buffer_size > 0);
+  assert(max_buffer_count > 0);
+  assert(thread_name != NULL && *thread_name != '\0');
+
+  eager_reader_t *ret = calloc(1, sizeof(eager_reader_t));
+  if (!ret) {
+    ALOGE("%s unable to allocate memory for new eager_reader.", __func__);
+    goto error;
+  }
+
+  ret->allocator = allocator;
+  ret->inbound_fd = fd_to_read;
+
+  ret->bytes_available_fd = eventfd(0, EFD_SEMAPHORE);
+  if (ret->bytes_available_fd == INVALID_FD) {
+    ALOGE("%s unable to create output reading semaphore.", __func__);
+    goto error;
+  }
+
+  ret->buffer_size = buffer_size;
+
+  ret->buffers = fixed_queue_new(max_buffer_count);
+  if (!ret->buffers) {
+    ALOGE("%s unable to create buffers queue.", __func__);
+    goto error;
+  }
+
+  ret->inbound_read_thread = thread_new(thread_name);
+  if (!ret->inbound_read_thread) {
+    ALOGE("%s unable to make reading thread.", __func__);
+    goto error;
+  }
+
+  ret->inbound_read_object = reactor_register(
+    thread_get_reactor(ret->inbound_read_thread),
+    fd_to_read,
+    ret,
+    inbound_data_waiting,
+    NULL
+  );
+
+  return ret;
+
+error:;
+  eager_reader_free(ret);
+  return NULL;
+}
+
+void eager_reader_free(eager_reader_t *reader) {
+  if (!reader)
+    return;
+
+  eager_reader_unregister(reader);
+
+  // Only unregister from the input if we actually did register
+  if (reader->inbound_read_object)
+    reactor_unregister(reader->inbound_read_object);
+
+  if (reader->bytes_available_fd != INVALID_FD)
+    close(reader->bytes_available_fd);
+
+  // Free the current buffer, because it's not in the queue
+  // and won't be freed below
+  if (reader->current_buffer)
+    reader->allocator->free(reader->current_buffer);
+
+  fixed_queue_free(reader->buffers, reader->allocator->free);
+  thread_free(reader->inbound_read_thread);
+  free(reader);
+}
+
+void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
+  assert(reader != NULL);
+  assert(reactor != NULL);
+  assert(read_cb != NULL);
+
+  // Make sure the reader isn't currently registered.
+  eager_reader_unregister(reader);
+
+  reader->outbound_read_ready = read_cb;
+  reader->outbound_context = context;
+  reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
+}
+
+void eager_reader_unregister(eager_reader_t *reader) {
+  assert(reader != NULL);
+
+  if (reader->outbound_registration) {
+    reactor_unregister(reader->outbound_registration);
+    reader->outbound_registration = NULL;
+  }
+}
+
+// SEE HEADER FOR THREAD SAFETY NOTE
+uint8_t eager_reader_read_byte(eager_reader_t *reader) {
+  assert(reader != NULL);
+
+  eventfd_t value;
+  if (eventfd_read(reader->bytes_available_fd, &value) == -1)
+    ALOGE("%s unable to read semaphore for output data.", __func__);
+
+  if (!reader->current_buffer)
+    reader->current_buffer = fixed_queue_dequeue(reader->buffers);
+
+  uint8_t byte = reader->current_buffer->data[reader->current_buffer->offset];
+  reader->current_buffer->offset++;
+
+  // Prep for next byte request
+  if (reader->current_buffer->offset >= reader->current_buffer->length) {
+    reader->allocator->free(reader->current_buffer);
+    reader->current_buffer = NULL;
+  }
+
+  return byte;
+}
+
+bool eager_reader_has_byte(eager_reader_t *reader) {
+  assert(reader != NULL);
+
+  fd_set read_fds;
+  FD_ZERO(&read_fds);
+  FD_SET(reader->bytes_available_fd, &read_fds);
+
+  // Immediate timeout
+  struct timeval timeout;
+  timeout.tv_sec = 0;
+  timeout.tv_usec = 0;
+
+  select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout);
+  return FD_ISSET(reader->bytes_available_fd, &read_fds);
+}
+
+static void inbound_data_waiting(void *context) {
+  eager_reader_t *reader = (eager_reader_t *)context;
+
+  data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
+  if (!buffer) {
+    ALOGE("%s couldn't aquire memory for inbound data buffer.", __func__);
+    return;
+  }
+
+  buffer->length = 0;
+  buffer->offset = 0;
+
+  int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size);
+  if (bytes_read > 0) {
+    // Save the data for later
+    buffer->length = bytes_read;
+    fixed_queue_enqueue(reader->buffers, buffer);
+
+    // Tell consumers data is available by incrementing
+    // the semaphore by the number of bytes we just read
+    eventfd_write(reader->bytes_available_fd, bytes_read);
+  } else {
+    if (bytes_read == 0)
+      ALOGW("%s fd said bytes existed, but none were found.", __func__);
+    else
+      ALOGW("%s unable to read from file descriptor: %s", __func__, strerror(errno));
+
+    reader->allocator->free(buffer);
+  }
+}
+
+static void internal_outbound_read_ready(void *context) {
+  assert(context != NULL);
+
+  eager_reader_t *reader = (eager_reader_t *)context;
+  reader->outbound_read_ready(reader, reader->outbound_context);
+}
diff --git a/osi/test/eager_reader_test.cpp b/osi/test/eager_reader_test.cpp
new file mode 100644 (file)
index 0000000..6306115
--- /dev/null
@@ -0,0 +1,88 @@
+/******************************************************************************
+ *
+ *  Copyright (C) 2014 Google, Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at:
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ ******************************************************************************/
+
+#include <gtest/gtest.h>
+
+extern "C" {
+#include <stdint.h>
+#include <unistd.h>
+#include <utils/Log.h>
+
+#include "allocator.h"
+#include "eager_reader.h"
+#include "osi.h"
+#include "semaphore.h"
+#include "thread.h"
+}
+
+#define BUFFER_SIZE 32
+
+static char *small_data = (char *)"white chocolate lindor truffles";
+
+static semaphore_t *done;
+
+class EagerReaderTest : public ::testing::Test {
+  protected:
+    virtual void SetUp() {
+      pipe(pipefd);
+      done = semaphore_new(0);
+    }
+
+    virtual void TearDown() {
+      semaphore_free(done);
+    }
+
+    int pipefd[2];
+};
+
+static void expect_data(eager_reader_t *reader, void *context) {
+  EXPECT_TRUE(eager_reader_has_byte(reader)) << "if we got a callback we expect there to be data";
+
+  char *data = (char *)context;
+  int length = strlen(data);
+  int i;
+
+  for (i = 0; i < length; i++) {
+    EXPECT_EQ(data[i], eager_reader_read_byte(reader));
+  }
+
+  semaphore_post(done);
+}
+
+TEST_F(EagerReaderTest, test_new_simple) {
+  eager_reader_t *reader = eager_reader_new(pipefd[0], &allocator_malloc, BUFFER_SIZE, SIZE_MAX, (char *)"test_thread");
+  ASSERT_TRUE(reader != NULL);
+}
+
+TEST_F(EagerReaderTest, test_free_simple) {
+  eager_reader_t *reader = eager_reader_new(pipefd[0], &allocator_malloc, BUFFER_SIZE, SIZE_MAX, (char *)"test_thread");
+  eager_reader_free(reader);
+}
+
+TEST_F(EagerReaderTest, test_small_data) {
+  eager_reader_t *reader = eager_reader_new(pipefd[0], &allocator_malloc, BUFFER_SIZE, SIZE_MAX, (char *)"test_thread");
+
+  thread_t *read_thread = thread_new("read_thread");
+  eager_reader_register(reader, thread_get_reactor(read_thread), expect_data, small_data);
+
+  write(pipefd[1], small_data, strlen(small_data));
+
+  semaphore_wait(done);
+  eager_reader_free(reader);
+  thread_free(read_thread);
+}