./src/config.c \
./src/fixed_queue.c \
./src/list.c \
+ ./src/reactor.c \
./src/semaphore.c
LOCAL_CFLAGS := -std=c99 -Wall -Werror
LOCAL_SRC_FILES := \
./test/config_test.cpp \
- ./test/list_test.cpp
+ ./test/list_test.cpp \
+ ./test/reactor_test.cpp
LOCAL_CFLAGS := -Wall -Werror
LOCAL_MODULE := ositests
#define UNUSED_ATTR __attribute__((unused))
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
+
+typedef uint32_t timeout_t;
--- /dev/null
+/******************************************************************************
+ *
+ * Copyright (C) 2014 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+#pragma once
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "osi.h"
+
+// This module implements the Reactor pattern.
+// See http://en.wikipedia.org/wiki/Reactor_pattern for details.
+
+struct reactor_t;
+typedef struct reactor_t reactor_t;
+
+struct reactor_object_t;
+typedef struct reactor_object_t reactor_object_t;
+
+// Enumerates the types of events a reactor object is interested
+// in responding to.
+typedef enum {
+ REACTOR_INTEREST_READ = 1,
+ REACTOR_INTEREST_WRITE = 2,
+ REACTOR_INTEREST_READ_WRITE = 3,
+} reactor_interest_t;
+
+// Enumerates the reasons a reactor has stopped.
+typedef enum {
+ REACTOR_STATUS_STOP, // |reactor_stop| was called.
+ REACTOR_STATUS_TIMEOUT, // a timeout was specified and the reactor timed out.
+ REACTOR_STATUS_ERROR, // there was an error during the operation.
+ REACTOR_STATUS_DONE, // the reactor completed its work (for the _run_once* variants).
+} reactor_status_t;
+
+struct reactor_object_t {
+ void *context; // a context that's passed back to the *_ready functions.
+ int fd; // the file descriptor to monitor for events.
+ reactor_interest_t interest; // the event types to monitor the file descriptor for.
+
+ void (*read_ready)(void *context); // function to call when the file descriptor becomes readable.
+ void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable.
+};
+
+// Creates a new reactor object. Returns NULL on failure. The returned object
+// must be freed by calling |reactor_free|.
+reactor_t *reactor_new(void);
+
+// Frees a reactor object created with |reactor_new|. |reactor| may be NULL.
+void reactor_free(reactor_t *reactor);
+
+// Starts the reactor. This function blocks the caller until |reactor_stop| is called
+// from another thread or in a callback. |reactor| may not be NULL.
+reactor_status_t reactor_start(reactor_t *reactor);
+
+// Runs one iteration of the reactor. This function blocks until at least one registered object
+// becomes ready. |reactor| may not be NULL.
+reactor_status_t reactor_run_once(reactor_t *reactor);
+
+// Same as |reactor_run_once| with a bounded wait time in case no object becomes ready.
+reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms);
+
+// Immediately unblocks the reactor. This function is safe to call from any thread.
+// |reactor| may not be NULL.
+void reactor_stop(reactor_t *reactor);
+
+// Registers an object with the reactor. |obj| is neither copied nor is its ownership transferred
+// so the pointer must remain valid until it is unregistered with |reactor_unregister|. Neither
+// |reactor| nor |obj| may be NULL.
+void reactor_register(reactor_t *reactor, reactor_object_t *obj);
+
+// Unregisters a previously registered object with the |reactor|. Neither |reactor| nor |obj|
+// may be NULL.
+void reactor_unregister(reactor_t *reactor, reactor_object_t *obj);
--- /dev/null
+/******************************************************************************
+ *
+ * Copyright (C) 2014 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+#define LOG_TAG "bt_osi_reactor"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/eventfd.h>
+#include <sys/select.h>
+#include <utils/Log.h>
+
+#include "list.h"
+#include "reactor.h"
+
+#if !defined(EFD_SEMAPHORE)
+# define EFD_SEMAPHORE (1 << 0)
+#endif
+
+struct reactor_t {
+ int event_fd;
+ list_t *objects;
+};
+
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv);
+
+reactor_t *reactor_new(void) {
+ reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t));
+ if (!ret)
+ return NULL;
+
+ ret->event_fd = eventfd(0, EFD_SEMAPHORE);
+ if (ret->event_fd == -1) {
+ ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno));
+ goto error;
+ }
+
+ ret->objects = list_new(NULL);
+ if (!ret->objects)
+ goto error;
+
+ return ret;
+
+error:;
+ list_free(ret->objects);
+ close(ret->event_fd);
+ free(ret);
+ return NULL;
+}
+
+void reactor_free(reactor_t *reactor) {
+ if (!reactor)
+ return;
+
+ list_free(reactor->objects);
+ close(reactor->event_fd);
+ free(reactor);
+}
+
+reactor_status_t reactor_start(reactor_t *reactor) {
+ assert(reactor != NULL);
+ return run_reactor(reactor, 0, NULL);
+}
+
+reactor_status_t reactor_run_once(reactor_t *reactor) {
+ assert(reactor != NULL);
+ return run_reactor(reactor, 1, NULL);
+}
+
+reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) {
+ assert(reactor != NULL);
+
+ struct timeval tv;
+ tv.tv_sec = timeout_ms / 1000;
+ tv.tv_usec = (timeout_ms % 1000) * 1000;
+ return run_reactor(reactor, 1, &tv);
+}
+
+void reactor_stop(reactor_t *reactor) {
+ assert(reactor != NULL);
+
+ eventfd_write(reactor->event_fd, 1);
+}
+
+void reactor_register(reactor_t *reactor, reactor_object_t *obj) {
+ assert(reactor != NULL);
+ assert(obj != NULL);
+
+ list_append(reactor->objects, obj);
+}
+
+void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) {
+ assert(reactor != NULL);
+ assert(obj != NULL);
+
+ list_remove(reactor->objects, obj);
+}
+
+// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|.
+// 0 |iterations| means loop forever.
+// NULL |tv| means no timeout (block until an event occurs).
+// |reactor| may not be NULL.
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) {
+ assert(reactor != NULL);
+
+ for (int i = 0; iterations == 0 || i < iterations; ++i) {
+ fd_set read_set;
+ fd_set write_set;
+ FD_ZERO(&read_set);
+ FD_ZERO(&write_set);
+ FD_SET(reactor->event_fd, &read_set);
+
+ int max_fd = reactor->event_fd;
+ for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) {
+ reactor_object_t *object = (reactor_object_t *)list_node(iter);
+ int fd = object->fd;
+ reactor_interest_t interest = object->interest;
+ if (interest & REACTOR_INTEREST_READ)
+ FD_SET(fd, &read_set);
+ if (interest & REACTOR_INTEREST_WRITE)
+ FD_SET(fd, &write_set);
+ if (fd > max_fd)
+ max_fd = fd;
+ }
+
+ int ret;
+ do {
+ ret = select(max_fd + 1, &read_set, &write_set, NULL, tv);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ ALOGE("%s error in select: %s", __func__, strerror(errno));
+ return REACTOR_STATUS_ERROR;
+ }
+
+ if (ret == 0)
+ return REACTOR_STATUS_TIMEOUT;
+
+ if (FD_ISSET(reactor->event_fd, &read_set)) {
+ eventfd_t value;
+ eventfd_read(reactor->event_fd, &value);
+ return REACTOR_STATUS_STOP;
+ }
+
+ for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) {
+ reactor_object_t *object = (reactor_object_t *)list_node(iter);
+ int fd = object->fd;
+ if (FD_ISSET(fd, &read_set)) {
+ object->read_ready(object->context);
+ --ret;
+ }
+ if (FD_ISSET(fd, &write_set)) {
+ object->write_ready(object->context);
+ --ret;
+ }
+ }
+ }
+ return REACTOR_STATUS_DONE;
+}
--- /dev/null
+#include <gtest/gtest.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+extern "C" {
+#include "reactor.h"
+}
+
+static pthread_t thread;
+static volatile bool thread_running;
+
+static void *reactor_thread(void *ptr) {
+ reactor_t *reactor = (reactor_t *)ptr;
+
+ thread_running = true;
+ reactor_start(reactor);
+ thread_running = false;
+
+ return NULL;
+}
+
+static void spawn_reactor_thread(reactor_t *reactor) {
+ int ret = pthread_create(&thread, NULL, reactor_thread, reactor);
+ EXPECT_EQ(ret, 0);
+}
+
+static void join_reactor_thread() {
+ pthread_join(thread, NULL);
+}
+
+static uint64_t get_timestamp(void) {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ return tv.tv_sec * 1000 + tv.tv_usec / 1000;
+}
+
+TEST(ReactorTest, reactor_new) {
+ reactor_t *reactor = reactor_new();
+ EXPECT_TRUE(reactor != NULL);
+ reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_free_null) {
+ reactor_free(NULL);
+}
+
+TEST(ReactorTest, reactor_stop_start) {
+ reactor_t *reactor = reactor_new();
+ reactor_stop(reactor);
+ reactor_start(reactor);
+ reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_repeated_stop_start) {
+ reactor_t *reactor = reactor_new();
+ for (int i = 0; i < 10; ++i) {
+ reactor_stop(reactor);
+ reactor_start(reactor);
+ }
+ reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_multi_stop_start) {
+ reactor_t *reactor = reactor_new();
+
+ reactor_stop(reactor);
+ reactor_stop(reactor);
+ reactor_stop(reactor);
+
+ reactor_start(reactor);
+ reactor_start(reactor);
+ reactor_start(reactor);
+
+ reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_start_wait_stop) {
+ reactor_t *reactor = reactor_new();
+
+ spawn_reactor_thread(reactor);
+ usleep(50 * 1000);
+ EXPECT_TRUE(thread_running);
+
+ reactor_stop(reactor);
+ join_reactor_thread();
+ EXPECT_FALSE(thread_running);
+
+ reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_run_once_timeout) {
+ reactor_t *reactor = reactor_new();
+
+ uint64_t start = get_timestamp();
+ reactor_status_t status = reactor_run_once_timeout(reactor, 50);
+ EXPECT_GE(get_timestamp() - start, 50);
+ EXPECT_EQ(status, REACTOR_STATUS_TIMEOUT);
+
+ reactor_free(reactor);
+}