From 19084c6242d8ddb366a52eba5084c974280cce0f Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Mon, 23 Jun 2014 16:30:46 -0700 Subject: [PATCH] Implement the reactor pattern in C. This code will form the basis of most select-based event loops in bluedroid. It provides a thread-safe abort routine and a separation between the dispatcher and event handler code. Change-Id: I6f1c033d18f045ba273187dab607c209dfe32d30 --- osi/Android.mk | 4 +- osi/include/osi.h | 2 + osi/include/reactor.h | 89 ++++++++++++++++++++++++ osi/src/reactor.c | 174 ++++++++++++++++++++++++++++++++++++++++++++++ osi/test/reactor_test.cpp | 101 +++++++++++++++++++++++++++ 5 files changed, 369 insertions(+), 1 deletion(-) create mode 100644 osi/include/reactor.h create mode 100644 osi/src/reactor.c create mode 100644 osi/test/reactor_test.cpp diff --git a/osi/Android.mk b/osi/Android.mk index 23c43f79b..646209c6c 100644 --- a/osi/Android.mk +++ b/osi/Android.mk @@ -9,6 +9,7 @@ LOCAL_SRC_FILES := \ ./src/config.c \ ./src/fixed_queue.c \ ./src/list.c \ + ./src/reactor.c \ ./src/semaphore.c LOCAL_CFLAGS := -std=c99 -Wall -Werror @@ -28,7 +29,8 @@ LOCAL_C_INCLUDES := \ LOCAL_SRC_FILES := \ ./test/config_test.cpp \ - ./test/list_test.cpp + ./test/list_test.cpp \ + ./test/reactor_test.cpp LOCAL_CFLAGS := -Wall -Werror LOCAL_MODULE := ositests diff --git a/osi/include/osi.h b/osi/include/osi.h index 2b79527ea..6b38e9b75 100644 --- a/osi/include/osi.h +++ b/osi/include/osi.h @@ -5,3 +5,5 @@ #define UNUSED_ATTR __attribute__((unused)) #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0])) + +typedef uint32_t timeout_t; diff --git a/osi/include/reactor.h b/osi/include/reactor.h new file mode 100644 index 000000000..eeb538dc6 --- /dev/null +++ b/osi/include/reactor.h @@ -0,0 +1,89 @@ +/****************************************************************************** + * + * Copyright (C) 2014 Google, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +#pragma once + +#include +#include + +#include "osi.h" + +// This module implements the Reactor pattern. +// See http://en.wikipedia.org/wiki/Reactor_pattern for details. + +struct reactor_t; +typedef struct reactor_t reactor_t; + +struct reactor_object_t; +typedef struct reactor_object_t reactor_object_t; + +// Enumerates the types of events a reactor object is interested +// in responding to. +typedef enum { + REACTOR_INTEREST_READ = 1, + REACTOR_INTEREST_WRITE = 2, + REACTOR_INTEREST_READ_WRITE = 3, +} reactor_interest_t; + +// Enumerates the reasons a reactor has stopped. +typedef enum { + REACTOR_STATUS_STOP, // |reactor_stop| was called. + REACTOR_STATUS_TIMEOUT, // a timeout was specified and the reactor timed out. + REACTOR_STATUS_ERROR, // there was an error during the operation. + REACTOR_STATUS_DONE, // the reactor completed its work (for the _run_once* variants). +} reactor_status_t; + +struct reactor_object_t { + void *context; // a context that's passed back to the *_ready functions. + int fd; // the file descriptor to monitor for events. + reactor_interest_t interest; // the event types to monitor the file descriptor for. + + void (*read_ready)(void *context); // function to call when the file descriptor becomes readable. + void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable. +}; + +// Creates a new reactor object. Returns NULL on failure. The returned object +// must be freed by calling |reactor_free|. +reactor_t *reactor_new(void); + +// Frees a reactor object created with |reactor_new|. |reactor| may be NULL. +void reactor_free(reactor_t *reactor); + +// Starts the reactor. This function blocks the caller until |reactor_stop| is called +// from another thread or in a callback. |reactor| may not be NULL. +reactor_status_t reactor_start(reactor_t *reactor); + +// Runs one iteration of the reactor. This function blocks until at least one registered object +// becomes ready. |reactor| may not be NULL. +reactor_status_t reactor_run_once(reactor_t *reactor); + +// Same as |reactor_run_once| with a bounded wait time in case no object becomes ready. +reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms); + +// Immediately unblocks the reactor. This function is safe to call from any thread. +// |reactor| may not be NULL. +void reactor_stop(reactor_t *reactor); + +// Registers an object with the reactor. |obj| is neither copied nor is its ownership transferred +// so the pointer must remain valid until it is unregistered with |reactor_unregister|. Neither +// |reactor| nor |obj| may be NULL. +void reactor_register(reactor_t *reactor, reactor_object_t *obj); + +// Unregisters a previously registered object with the |reactor|. Neither |reactor| nor |obj| +// may be NULL. +void reactor_unregister(reactor_t *reactor, reactor_object_t *obj); diff --git a/osi/src/reactor.c b/osi/src/reactor.c new file mode 100644 index 000000000..d3d7767e4 --- /dev/null +++ b/osi/src/reactor.c @@ -0,0 +1,174 @@ +/****************************************************************************** + * + * Copyright (C) 2014 Google, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +#define LOG_TAG "bt_osi_reactor" + +#include +#include +#include +#include +#include +#include + +#include "list.h" +#include "reactor.h" + +#if !defined(EFD_SEMAPHORE) +# define EFD_SEMAPHORE (1 << 0) +#endif + +struct reactor_t { + int event_fd; + list_t *objects; +}; + +static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv); + +reactor_t *reactor_new(void) { + reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t)); + if (!ret) + return NULL; + + ret->event_fd = eventfd(0, EFD_SEMAPHORE); + if (ret->event_fd == -1) { + ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno)); + goto error; + } + + ret->objects = list_new(NULL); + if (!ret->objects) + goto error; + + return ret; + +error:; + list_free(ret->objects); + close(ret->event_fd); + free(ret); + return NULL; +} + +void reactor_free(reactor_t *reactor) { + if (!reactor) + return; + + list_free(reactor->objects); + close(reactor->event_fd); + free(reactor); +} + +reactor_status_t reactor_start(reactor_t *reactor) { + assert(reactor != NULL); + return run_reactor(reactor, 0, NULL); +} + +reactor_status_t reactor_run_once(reactor_t *reactor) { + assert(reactor != NULL); + return run_reactor(reactor, 1, NULL); +} + +reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) { + assert(reactor != NULL); + + struct timeval tv; + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + return run_reactor(reactor, 1, &tv); +} + +void reactor_stop(reactor_t *reactor) { + assert(reactor != NULL); + + eventfd_write(reactor->event_fd, 1); +} + +void reactor_register(reactor_t *reactor, reactor_object_t *obj) { + assert(reactor != NULL); + assert(obj != NULL); + + list_append(reactor->objects, obj); +} + +void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) { + assert(reactor != NULL); + assert(obj != NULL); + + list_remove(reactor->objects, obj); +} + +// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|. +// 0 |iterations| means loop forever. +// NULL |tv| means no timeout (block until an event occurs). +// |reactor| may not be NULL. +static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) { + assert(reactor != NULL); + + for (int i = 0; iterations == 0 || i < iterations; ++i) { + fd_set read_set; + fd_set write_set; + FD_ZERO(&read_set); + FD_ZERO(&write_set); + FD_SET(reactor->event_fd, &read_set); + + int max_fd = reactor->event_fd; + for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) { + reactor_object_t *object = (reactor_object_t *)list_node(iter); + int fd = object->fd; + reactor_interest_t interest = object->interest; + if (interest & REACTOR_INTEREST_READ) + FD_SET(fd, &read_set); + if (interest & REACTOR_INTEREST_WRITE) + FD_SET(fd, &write_set); + if (fd > max_fd) + max_fd = fd; + } + + int ret; + do { + ret = select(max_fd + 1, &read_set, &write_set, NULL, tv); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + ALOGE("%s error in select: %s", __func__, strerror(errno)); + return REACTOR_STATUS_ERROR; + } + + if (ret == 0) + return REACTOR_STATUS_TIMEOUT; + + if (FD_ISSET(reactor->event_fd, &read_set)) { + eventfd_t value; + eventfd_read(reactor->event_fd, &value); + return REACTOR_STATUS_STOP; + } + + for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) { + reactor_object_t *object = (reactor_object_t *)list_node(iter); + int fd = object->fd; + if (FD_ISSET(fd, &read_set)) { + object->read_ready(object->context); + --ret; + } + if (FD_ISSET(fd, &write_set)) { + object->write_ready(object->context); + --ret; + } + } + } + return REACTOR_STATUS_DONE; +} diff --git a/osi/test/reactor_test.cpp b/osi/test/reactor_test.cpp new file mode 100644 index 000000000..359ed2894 --- /dev/null +++ b/osi/test/reactor_test.cpp @@ -0,0 +1,101 @@ +#include +#include +#include +#include + +extern "C" { +#include "reactor.h" +} + +static pthread_t thread; +static volatile bool thread_running; + +static void *reactor_thread(void *ptr) { + reactor_t *reactor = (reactor_t *)ptr; + + thread_running = true; + reactor_start(reactor); + thread_running = false; + + return NULL; +} + +static void spawn_reactor_thread(reactor_t *reactor) { + int ret = pthread_create(&thread, NULL, reactor_thread, reactor); + EXPECT_EQ(ret, 0); +} + +static void join_reactor_thread() { + pthread_join(thread, NULL); +} + +static uint64_t get_timestamp(void) { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec * 1000 + tv.tv_usec / 1000; +} + +TEST(ReactorTest, reactor_new) { + reactor_t *reactor = reactor_new(); + EXPECT_TRUE(reactor != NULL); + reactor_free(reactor); +} + +TEST(ReactorTest, reactor_free_null) { + reactor_free(NULL); +} + +TEST(ReactorTest, reactor_stop_start) { + reactor_t *reactor = reactor_new(); + reactor_stop(reactor); + reactor_start(reactor); + reactor_free(reactor); +} + +TEST(ReactorTest, reactor_repeated_stop_start) { + reactor_t *reactor = reactor_new(); + for (int i = 0; i < 10; ++i) { + reactor_stop(reactor); + reactor_start(reactor); + } + reactor_free(reactor); +} + +TEST(ReactorTest, reactor_multi_stop_start) { + reactor_t *reactor = reactor_new(); + + reactor_stop(reactor); + reactor_stop(reactor); + reactor_stop(reactor); + + reactor_start(reactor); + reactor_start(reactor); + reactor_start(reactor); + + reactor_free(reactor); +} + +TEST(ReactorTest, reactor_start_wait_stop) { + reactor_t *reactor = reactor_new(); + + spawn_reactor_thread(reactor); + usleep(50 * 1000); + EXPECT_TRUE(thread_running); + + reactor_stop(reactor); + join_reactor_thread(); + EXPECT_FALSE(thread_running); + + reactor_free(reactor); +} + +TEST(ReactorTest, reactor_run_once_timeout) { + reactor_t *reactor = reactor_new(); + + uint64_t start = get_timestamp(); + reactor_status_t status = reactor_run_once_timeout(reactor, 50); + EXPECT_GE(get_timestamp() - start, 50); + EXPECT_EQ(status, REACTOR_STATUS_TIMEOUT); + + reactor_free(reactor); +} -- 2.11.0