OSDN Git Service

Extend the threading library to back thread with an event queue.
authorSharvil Nanavati <sharvil.nanavati@gmail.com>
Sat, 5 Jul 2014 00:51:29 +0000 (17:51 -0700)
committerSharvil Nanavati <sharvil@google.com>
Wed, 16 Jul 2014 20:25:42 +0000 (13:25 -0700)
This change allows arbitrary functions to be called on a given
thread which will clean up much of the dispatch code in bluedroid.
Looking forward, this code will be extended to allow additional objects
and queues to be attached to a thread for more customizable dispatch.

Change-Id: Id3a16256c264e3d35e6db5a562cb0e7762676457

osi/include/thread.h
osi/src/thread.c
osi/test/thread_test.cpp

index 8558e92..c78c5e3 100644 (file)
 
 #define THREAD_NAME_MAX 16
 
-struct thread_t;
 typedef struct thread_t thread_t;
+typedef void (*thread_fn)(void *context);
 
-typedef void *(*thread_start_cb) (void *);
+// Creates and starts a new thread with the given name. Only THREAD_NAME_MAX
+// bytes from |name| will be assigned to the newly-created thread. Returns a
+// thread object if the thread was successfully started, NULL otherwise. The
+// returned thread object must be freed with |thread_free|. |name| may not
+// be NULL.
+thread_t *thread_new(const char *name);
 
-// Lifecycle
-thread_t *thread_create(const char *name,
-                         thread_start_cb start_routine, void *arg);
-int thread_join(thread_t *thread, void **retval);
+// Frees the given |thread|. If the thread is still running, it is stopped
+// and the calling thread will block until |thread| terminates. |thread|
+// may be NULL.
+void thread_free(thread_t *thread);
 
-// Query
-pid_t thread_id(const thread_t *thread);
+// Call |func| with the argument |context| on |thread|. This function typically
+// does not block unless there are an excessive number of functions posted to
+// |thread| that have not been dispatched yet. Neither |thread| nor |func| may
+// be NULL. |context| may be NULL.
+bool thread_post(thread_t *thread, thread_fn func, void *context);
+
+// Requests |thread| to stop. Only |thread_free| and |thread_name| may be called
+// after calling |thread_stop|. This function is guaranteed to not block.
+// |thread| may not be NULL.
+void thread_stop(thread_t *thread);
+
+// Returns the name of the given |thread|. |thread| may not be NULL.
 const char *thread_name(const thread_t *thread);
index b81e901..cb644d3 100644 (file)
 #define LOG_TAG "osi_thread"
 
 #include <assert.h>
+#include <errno.h>
 #include <pthread.h>
 #include <string.h>
 #include <sys/prctl.h>
 #include <sys/types.h>
 #include <utils/Log.h>
 
+#include "fixed_queue.h"
+#include "reactor.h"
 #include "semaphore.h"
 #include "thread.h"
 
-typedef struct thread_t {
+struct thread_t {
   pthread_t pthread;
   pid_t tid;
-  char name[THREAD_NAME_MAX+1];
-} thread_t;
-
-pid_t thread_id(const thread_t *thread) {
-  assert(thread != NULL);
-  return thread->tid;
-}
-
-const char *thread_name(const thread_t *thread) {
-  assert(thread != NULL);
-  return thread->name;
-}
+  char name[THREAD_NAME_MAX + 1];
+  reactor_t *reactor;
+  fixed_queue_t *work_queue;
+};
 
 struct start_arg {
   thread_t *thread;
   semaphore_t *start_sem;
   int error;
-  thread_start_cb start_routine;
-  void *arg;
 };
 
-static void *run_thread(void *start_arg) {
-  assert(start_arg != NULL);
+typedef struct {
+  thread_fn func;
+  void *context;
+} work_item_t;
 
-  struct start_arg *start = start_arg;
-  thread_t *thread = start->thread;
+static void *run_thread(void *start_arg);
+static void work_queue_read_cb(void *context);
 
-  assert(thread != NULL);
+static const size_t WORK_QUEUE_CAPACITY = 128;
 
-  if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
-    ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
-    start->error = errno;
-    semaphore_post(start->start_sem);
-    return NULL;
-  }
-  thread->tid = gettid();
-
-  // Cache local values because we are about to let thread_create
-  // continue
-  thread_start_cb start_routine = start->start_routine;
-  void *arg = start->arg;
-
-  semaphore_post(start->start_sem);
-  return start_routine(arg);
-}
-
-thread_t *thread_create(const char *name,
-                        thread_start_cb start_routine, void *arg) {
+thread_t *thread_new(const char *name) {
   assert(name != NULL);
-  assert(start_routine != NULL);
 
   // Start is on the stack, but we use a semaphore, so it's safe
-  struct start_arg start;
-  thread_t *ret;
-  ret = calloc(1, sizeof(thread_t));
+  thread_t *ret = calloc(1, sizeof(thread_t));
   if (!ret)
     goto error;
+
+  ret->reactor = reactor_new();
+  if (!ret->reactor)
+    goto error;
+
+  ret->work_queue = fixed_queue_new(WORK_QUEUE_CAPACITY);
+  if (!ret->work_queue)
+    goto error;
+
+  struct start_arg start;
   start.start_sem = semaphore_new(0);
   if (!start.start_sem)
     goto error;
@@ -95,23 +79,114 @@ thread_t *thread_create(const char *name,
   strncpy(ret->name, name, THREAD_NAME_MAX);
   start.thread = ret;
   start.error = 0;
-  start.start_routine = start_routine;
-  start.arg = arg;
   pthread_create(&ret->pthread, NULL, run_thread, &start);
   semaphore_wait(start.start_sem);
+  semaphore_free(start.start_sem);
   if (start.error)
     goto error;
   return ret;
 
 error:;
-  semaphore_free(start.start_sem);
+  if (ret) {
+    fixed_queue_free(ret->work_queue, free);
+    reactor_free(ret->reactor);
+  }
   free(ret);
   return NULL;
 }
 
-int thread_join(thread_t *thread, void **retval) {
-  int ret = pthread_join(thread->pthread, retval);
-  if (!ret)
-    free(thread);
-  return ret;
+void thread_free(thread_t *thread) {
+  if (!thread)
+    return;
+
+  thread_stop(thread);
+  pthread_join(thread->pthread, NULL);
+  fixed_queue_free(thread->work_queue, free);
+  reactor_free(thread->reactor);
+  free(thread);
+}
+
+bool thread_post(thread_t *thread, thread_fn func, void *context) {
+  assert(thread != NULL);
+  assert(func != NULL);
+
+  // TODO(sharvil): if the current thread == |thread| and we've run out
+  // of queue space, we should abort this operation, otherwise we'll
+  // deadlock.
+
+  // Queue item is freed either when the queue itself is destroyed
+  // or when the item is removed from the queue for dispatch.
+  work_item_t *item = (work_item_t *)malloc(sizeof(work_item_t));
+  if (!item) {
+    ALOGE("%s unable to allocate memory: %s", __func__, strerror(errno));
+    return false;
+  }
+  item->func = func;
+  item->context = context;
+  fixed_queue_enqueue(thread->work_queue, item);
+  return true;
+}
+
+void thread_stop(thread_t *thread) {
+  assert(thread != NULL);
+  reactor_stop(thread->reactor);
+}
+
+const char *thread_name(const thread_t *thread) {
+  assert(thread != NULL);
+  return thread->name;
+}
+
+static void *run_thread(void *start_arg) {
+  assert(start_arg != NULL);
+
+  struct start_arg *start = start_arg;
+  thread_t *thread = start->thread;
+
+  assert(thread != NULL);
+
+  if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
+    ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
+    start->error = errno;
+    semaphore_post(start->start_sem);
+    return NULL;
+  }
+  thread->tid = gettid();
+
+  semaphore_post(start->start_sem);
+
+  reactor_object_t work_queue_object;
+  work_queue_object.context = thread->work_queue;
+  work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue);
+  work_queue_object.interest = REACTOR_INTEREST_READ;
+  work_queue_object.read_ready = work_queue_read_cb;
+
+  reactor_register(thread->reactor, &work_queue_object);
+  reactor_start(thread->reactor);
+
+  // Make sure we dispatch all queued work items before exiting the thread.
+  // This allows a caller to safely tear down by enqueuing a teardown
+  // work item and then joining the thread.
+  size_t count = 0;
+  work_item_t *item = fixed_queue_try_dequeue(thread->work_queue);
+  while (item && count <= WORK_QUEUE_CAPACITY) {
+    item->func(item->context);
+    free(item);
+    item = fixed_queue_try_dequeue(thread->work_queue);
+    ++count;
+  }
+
+  if (count > WORK_QUEUE_CAPACITY)
+    ALOGD("%s growing event queue on shutdown.", __func__);
+
+  return NULL;
+}
+
+static void work_queue_read_cb(void *context) {
+  assert(context != NULL);
+
+  fixed_queue_t *queue = (fixed_queue_t *)context;
+  work_item_t *item = fixed_queue_dequeue(queue);
+  item->func(item->context);
+  free(item);
 }
index bf4c03c..edbc005 100644 (file)
@@ -5,44 +5,31 @@ extern "C" {
 #include "osi.h"
 }
 
-void *start_routine(void *arg)
-{
-  return arg;
-}
-
 TEST(ThreadTest, test_new_simple) {
-  thread_t *thread = thread_create("test_thread", &start_routine, NULL);
+  thread_t *thread = thread_new("test_thread");
   ASSERT_TRUE(thread != NULL);
-  thread_join(thread, NULL);
+  thread_free(thread);
 }
 
-TEST(ThreadTest, test_join_simple) {
-  thread_t *thread = thread_create("test_thread", &start_routine, NULL);
-  thread_join(thread, NULL);
+TEST(ThreadTest, test_free_simple) {
+  thread_t *thread = thread_new("test_thread");
+  thread_free(thread);
 }
 
 TEST(ThreadTest, test_name) {
-  thread_t *thread = thread_create("test_name", &start_routine, NULL);
+  thread_t *thread = thread_new("test_name");
   ASSERT_STREQ(thread_name(thread), "test_name");
-  thread_join(thread, NULL);
+  thread_free(thread);
 }
 
 TEST(ThreadTest, test_long_name) {
-  thread_t *thread = thread_create("0123456789abcdef", &start_routine, NULL);
+  thread_t *thread = thread_new("0123456789abcdef");
   ASSERT_STREQ("0123456789abcdef", thread_name(thread));
-  thread_join(thread, NULL);
+  thread_free(thread);
 }
 
 TEST(ThreadTest, test_very_long_name) {
-  thread_t *thread = thread_create("0123456789abcdefg", &start_routine, NULL);
+  thread_t *thread = thread_new("0123456789abcdefg");
   ASSERT_STREQ("0123456789abcdef", thread_name(thread));
-  thread_join(thread, NULL);
-}
-
-TEST(ThreadTest, test_return) {
-  int arg = 10;
-  void *ret;
-  thread_t *thread = thread_create("test", &start_routine, &arg);
-  thread_join(thread, &ret);
-  ASSERT_EQ(ret, &arg);
+  thread_free(thread);
 }