OSDN Git Service

Implement the reactor pattern in C.
authorSharvil Nanavati <sharvil@google.com>
Mon, 23 Jun 2014 23:30:46 +0000 (16:30 -0700)
committerSharvil Nanavati <sharvil@google.com>
Thu, 3 Jul 2014 06:30:22 +0000 (06:30 +0000)
This code will form the basis of most select-based event loops in
bluedroid. It provides a thread-safe abort routine and a separation
between the dispatcher and event handler code.

Change-Id: I6f1c033d18f045ba273187dab607c209dfe32d30

osi/Android.mk
osi/include/osi.h
osi/include/reactor.h [new file with mode: 0644]
osi/src/reactor.c [new file with mode: 0644]
osi/test/reactor_test.cpp [new file with mode: 0644]

index 23c43f7..646209c 100644 (file)
@@ -9,6 +9,7 @@ LOCAL_SRC_FILES := \
     ./src/config.c \
     ./src/fixed_queue.c \
     ./src/list.c \
+    ./src/reactor.c \
     ./src/semaphore.c
 
 LOCAL_CFLAGS := -std=c99 -Wall -Werror
@@ -28,7 +29,8 @@ LOCAL_C_INCLUDES := \
 
 LOCAL_SRC_FILES := \
     ./test/config_test.cpp \
-    ./test/list_test.cpp
+    ./test/list_test.cpp \
+    ./test/reactor_test.cpp
 
 LOCAL_CFLAGS := -Wall -Werror
 LOCAL_MODULE := ositests
index 2b79527..6b38e9b 100644 (file)
@@ -5,3 +5,5 @@
 
 #define UNUSED_ATTR __attribute__((unused))
 #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
+
+typedef uint32_t timeout_t;
diff --git a/osi/include/reactor.h b/osi/include/reactor.h
new file mode 100644 (file)
index 0000000..eeb538d
--- /dev/null
@@ -0,0 +1,89 @@
+/******************************************************************************
+ *
+ *  Copyright (C) 2014 Google, Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at:
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ ******************************************************************************/
+
+#pragma once
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "osi.h"
+
+// This module implements the Reactor pattern.
+// See http://en.wikipedia.org/wiki/Reactor_pattern for details.
+
+struct reactor_t;
+typedef struct reactor_t reactor_t;
+
+struct reactor_object_t;
+typedef struct reactor_object_t reactor_object_t;
+
+// Enumerates the types of events a reactor object is interested
+// in responding to.
+typedef enum {
+  REACTOR_INTEREST_READ  = 1,
+  REACTOR_INTEREST_WRITE = 2,
+  REACTOR_INTEREST_READ_WRITE = 3,
+} reactor_interest_t;
+
+// Enumerates the reasons a reactor has stopped.
+typedef enum {
+  REACTOR_STATUS_STOP,     // |reactor_stop| was called.
+  REACTOR_STATUS_TIMEOUT,  // a timeout was specified and the reactor timed out.
+  REACTOR_STATUS_ERROR,    // there was an error during the operation.
+  REACTOR_STATUS_DONE,     // the reactor completed its work (for the _run_once* variants).
+} reactor_status_t;
+
+struct reactor_object_t {
+  void *context;                       // a context that's passed back to the *_ready functions.
+  int fd;                              // the file descriptor to monitor for events.
+  reactor_interest_t interest;         // the event types to monitor the file descriptor for.
+
+  void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.
+  void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
+};
+
+// Creates a new reactor object. Returns NULL on failure. The returned object
+// must be freed by calling |reactor_free|.
+reactor_t *reactor_new(void);
+
+// Frees a reactor object created with |reactor_new|. |reactor| may be NULL.
+void reactor_free(reactor_t *reactor);
+
+// Starts the reactor. This function blocks the caller until |reactor_stop| is called
+// from another thread or in a callback. |reactor| may not be NULL.
+reactor_status_t reactor_start(reactor_t *reactor);
+
+// Runs one iteration of the reactor. This function blocks until at least one registered object
+// becomes ready. |reactor| may not be NULL.
+reactor_status_t reactor_run_once(reactor_t *reactor);
+
+// Same as |reactor_run_once| with a bounded wait time in case no object becomes ready.
+reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms);
+
+// Immediately unblocks the reactor. This function is safe to call from any thread.
+// |reactor| may not be NULL.
+void reactor_stop(reactor_t *reactor);
+
+// Registers an object with the reactor. |obj| is neither copied nor is its ownership transferred
+// so the pointer must remain valid until it is unregistered with |reactor_unregister|. Neither
+// |reactor| nor |obj| may be NULL.
+void reactor_register(reactor_t *reactor, reactor_object_t *obj);
+
+// Unregisters a previously registered object with the |reactor|. Neither |reactor| nor |obj|
+// may be NULL.
+void reactor_unregister(reactor_t *reactor, reactor_object_t *obj);
diff --git a/osi/src/reactor.c b/osi/src/reactor.c
new file mode 100644 (file)
index 0000000..d3d7767
--- /dev/null
@@ -0,0 +1,174 @@
+/******************************************************************************
+ *
+ *  Copyright (C) 2014 Google, Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at:
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ ******************************************************************************/
+
+#define LOG_TAG "bt_osi_reactor"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/eventfd.h>
+#include <sys/select.h>
+#include <utils/Log.h>
+
+#include "list.h"
+#include "reactor.h"
+
+#if !defined(EFD_SEMAPHORE)
+#  define EFD_SEMAPHORE (1 << 0)
+#endif
+
+struct reactor_t {
+  int event_fd;
+  list_t *objects;
+};
+
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv);
+
+reactor_t *reactor_new(void) {
+  reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t));
+  if (!ret)
+    return NULL;
+
+  ret->event_fd = eventfd(0, EFD_SEMAPHORE);
+  if (ret->event_fd == -1) {
+    ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno));
+    goto error;
+  }
+
+  ret->objects = list_new(NULL);
+  if (!ret->objects)
+    goto error;
+
+  return ret;
+
+error:;
+  list_free(ret->objects);
+  close(ret->event_fd);
+  free(ret);
+  return NULL;
+}
+
+void reactor_free(reactor_t *reactor) {
+  if (!reactor)
+    return;
+
+  list_free(reactor->objects);
+  close(reactor->event_fd);
+  free(reactor);
+}
+
+reactor_status_t reactor_start(reactor_t *reactor) {
+  assert(reactor != NULL);
+  return run_reactor(reactor, 0, NULL);
+}
+
+reactor_status_t reactor_run_once(reactor_t *reactor) {
+  assert(reactor != NULL);
+  return run_reactor(reactor, 1, NULL);
+}
+
+reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) {
+  assert(reactor != NULL);
+
+  struct timeval tv;
+  tv.tv_sec = timeout_ms / 1000;
+  tv.tv_usec = (timeout_ms % 1000) * 1000;
+  return run_reactor(reactor, 1, &tv);
+}
+
+void reactor_stop(reactor_t *reactor) {
+  assert(reactor != NULL);
+
+  eventfd_write(reactor->event_fd, 1);
+}
+
+void reactor_register(reactor_t *reactor, reactor_object_t *obj) {
+  assert(reactor != NULL);
+  assert(obj != NULL);
+
+  list_append(reactor->objects, obj);
+}
+
+void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) {
+  assert(reactor != NULL);
+  assert(obj != NULL);
+
+  list_remove(reactor->objects, obj);
+}
+
+// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|.
+// 0 |iterations| means loop forever.
+// NULL |tv| means no timeout (block until an event occurs).
+// |reactor| may not be NULL.
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) {
+  assert(reactor != NULL);
+
+  for (int i = 0; iterations == 0 || i < iterations; ++i) {
+    fd_set read_set;
+    fd_set write_set;
+    FD_ZERO(&read_set);
+    FD_ZERO(&write_set);
+    FD_SET(reactor->event_fd, &read_set);
+
+    int max_fd = reactor->event_fd;
+    for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) {
+      reactor_object_t *object = (reactor_object_t *)list_node(iter);
+      int fd = object->fd;
+      reactor_interest_t interest = object->interest;
+      if (interest & REACTOR_INTEREST_READ)
+        FD_SET(fd, &read_set);
+      if (interest & REACTOR_INTEREST_WRITE)
+        FD_SET(fd, &write_set);
+      if (fd > max_fd)
+        max_fd = fd;
+    }
+
+    int ret;
+    do {
+      ret = select(max_fd + 1, &read_set, &write_set, NULL, tv);
+    } while (ret == -1 && errno == EINTR);
+
+    if (ret == -1) {
+      ALOGE("%s error in select: %s", __func__, strerror(errno));
+      return REACTOR_STATUS_ERROR;
+    }
+
+    if (ret == 0)
+      return REACTOR_STATUS_TIMEOUT;
+
+    if (FD_ISSET(reactor->event_fd, &read_set)) {
+      eventfd_t value;
+      eventfd_read(reactor->event_fd, &value);
+      return REACTOR_STATUS_STOP;
+    }
+
+    for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) {
+      reactor_object_t *object = (reactor_object_t *)list_node(iter);
+      int fd = object->fd;
+      if (FD_ISSET(fd, &read_set)) {
+        object->read_ready(object->context);
+        --ret;
+      }
+      if (FD_ISSET(fd, &write_set)) {
+        object->write_ready(object->context);
+        --ret;
+      }
+    }
+  }
+  return REACTOR_STATUS_DONE;
+}
diff --git a/osi/test/reactor_test.cpp b/osi/test/reactor_test.cpp
new file mode 100644 (file)
index 0000000..359ed28
--- /dev/null
@@ -0,0 +1,101 @@
+#include <gtest/gtest.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+extern "C" {
+#include "reactor.h"
+}
+
+static pthread_t thread;
+static volatile bool thread_running;
+
+static void *reactor_thread(void *ptr) {
+  reactor_t *reactor = (reactor_t *)ptr;
+
+  thread_running = true;
+  reactor_start(reactor);
+  thread_running = false;
+
+  return NULL;
+}
+
+static void spawn_reactor_thread(reactor_t *reactor) {
+  int ret = pthread_create(&thread, NULL, reactor_thread, reactor);
+  EXPECT_EQ(ret, 0);
+}
+
+static void join_reactor_thread() {
+  pthread_join(thread, NULL);
+}
+
+static uint64_t get_timestamp(void) {
+  struct timeval tv;
+  gettimeofday(&tv, NULL);
+  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
+}
+
+TEST(ReactorTest, reactor_new) {
+  reactor_t *reactor = reactor_new();
+  EXPECT_TRUE(reactor != NULL);
+  reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_free_null) {
+  reactor_free(NULL);
+}
+
+TEST(ReactorTest, reactor_stop_start) {
+  reactor_t *reactor = reactor_new();
+  reactor_stop(reactor);
+  reactor_start(reactor);
+  reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_repeated_stop_start) {
+  reactor_t *reactor = reactor_new();
+  for (int i = 0; i < 10; ++i) {
+    reactor_stop(reactor);
+    reactor_start(reactor);
+  }
+  reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_multi_stop_start) {
+  reactor_t *reactor = reactor_new();
+
+  reactor_stop(reactor);
+  reactor_stop(reactor);
+  reactor_stop(reactor);
+
+  reactor_start(reactor);
+  reactor_start(reactor);
+  reactor_start(reactor);
+
+  reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_start_wait_stop) {
+  reactor_t *reactor = reactor_new();
+
+  spawn_reactor_thread(reactor);
+  usleep(50 * 1000);
+  EXPECT_TRUE(thread_running);
+
+  reactor_stop(reactor);
+  join_reactor_thread();
+  EXPECT_FALSE(thread_running);
+
+  reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_run_once_timeout) {
+  reactor_t *reactor = reactor_new();
+
+  uint64_t start = get_timestamp();
+  reactor_status_t status = reactor_run_once_timeout(reactor, 50);
+  EXPECT_GE(get_timestamp() - start, 50);
+  EXPECT_EQ(status, REACTOR_STATUS_TIMEOUT);
+
+  reactor_free(reactor);
+}