From 94347a59972868cf6d74f92cf793eb0b4b97c45a Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Fri, 4 Jul 2014 17:51:29 -0700 Subject: [PATCH] Extend the threading library to back thread with an event queue. This change allows arbitrary functions to be called on a given thread which will clean up much of the dispatch code in bluedroid. Looking forward, this code will be extended to allow additional objects and queues to be attached to a thread for more customizable dispatch. Change-Id: Id3a16256c264e3d35e6db5a562cb0e7762676457 --- osi/include/thread.h | 31 ++++++--- osi/src/thread.c | 177 +++++++++++++++++++++++++++++++++-------------- osi/test/thread_test.cpp | 35 +++------- 3 files changed, 160 insertions(+), 83 deletions(-) diff --git a/osi/include/thread.h b/osi/include/thread.h index 8558e92ec..c78c5e36a 100644 --- a/osi/include/thread.h +++ b/osi/include/thread.h @@ -20,16 +20,31 @@ #define THREAD_NAME_MAX 16 -struct thread_t; typedef struct thread_t thread_t; +typedef void (*thread_fn)(void *context); -typedef void *(*thread_start_cb) (void *); +// Creates and starts a new thread with the given name. Only THREAD_NAME_MAX +// bytes from |name| will be assigned to the newly-created thread. Returns a +// thread object if the thread was successfully started, NULL otherwise. The +// returned thread object must be freed with |thread_free|. |name| may not +// be NULL. +thread_t *thread_new(const char *name); -// Lifecycle -thread_t *thread_create(const char *name, - thread_start_cb start_routine, void *arg); -int thread_join(thread_t *thread, void **retval); +// Frees the given |thread|. If the thread is still running, it is stopped +// and the calling thread will block until |thread| terminates. |thread| +// may be NULL. +void thread_free(thread_t *thread); -// Query -pid_t thread_id(const thread_t *thread); +// Call |func| with the argument |context| on |thread|. This function typically +// does not block unless there are an excessive number of functions posted to +// |thread| that have not been dispatched yet. Neither |thread| nor |func| may +// be NULL. |context| may be NULL. +bool thread_post(thread_t *thread, thread_fn func, void *context); + +// Requests |thread| to stop. Only |thread_free| and |thread_name| may be called +// after calling |thread_stop|. This function is guaranteed to not block. +// |thread| may not be NULL. +void thread_stop(thread_t *thread); + +// Returns the name of the given |thread|. |thread| may not be NULL. const char *thread_name(const thread_t *thread); diff --git a/osi/src/thread.c b/osi/src/thread.c index b81e901ca..cb644d35b 100644 --- a/osi/src/thread.c +++ b/osi/src/thread.c @@ -19,75 +19,59 @@ #define LOG_TAG "osi_thread" #include +#include #include #include #include #include #include +#include "fixed_queue.h" +#include "reactor.h" #include "semaphore.h" #include "thread.h" -typedef struct thread_t { +struct thread_t { pthread_t pthread; pid_t tid; - char name[THREAD_NAME_MAX+1]; -} thread_t; - -pid_t thread_id(const thread_t *thread) { - assert(thread != NULL); - return thread->tid; -} - -const char *thread_name(const thread_t *thread) { - assert(thread != NULL); - return thread->name; -} + char name[THREAD_NAME_MAX + 1]; + reactor_t *reactor; + fixed_queue_t *work_queue; +}; struct start_arg { thread_t *thread; semaphore_t *start_sem; int error; - thread_start_cb start_routine; - void *arg; }; -static void *run_thread(void *start_arg) { - assert(start_arg != NULL); +typedef struct { + thread_fn func; + void *context; +} work_item_t; - struct start_arg *start = start_arg; - thread_t *thread = start->thread; +static void *run_thread(void *start_arg); +static void work_queue_read_cb(void *context); - assert(thread != NULL); +static const size_t WORK_QUEUE_CAPACITY = 128; - if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) { - ALOGE("%s unable to set thread name: %s", __func__, strerror(errno)); - start->error = errno; - semaphore_post(start->start_sem); - return NULL; - } - thread->tid = gettid(); - - // Cache local values because we are about to let thread_create - // continue - thread_start_cb start_routine = start->start_routine; - void *arg = start->arg; - - semaphore_post(start->start_sem); - return start_routine(arg); -} - -thread_t *thread_create(const char *name, - thread_start_cb start_routine, void *arg) { +thread_t *thread_new(const char *name) { assert(name != NULL); - assert(start_routine != NULL); // Start is on the stack, but we use a semaphore, so it's safe - struct start_arg start; - thread_t *ret; - ret = calloc(1, sizeof(thread_t)); + thread_t *ret = calloc(1, sizeof(thread_t)); if (!ret) goto error; + + ret->reactor = reactor_new(); + if (!ret->reactor) + goto error; + + ret->work_queue = fixed_queue_new(WORK_QUEUE_CAPACITY); + if (!ret->work_queue) + goto error; + + struct start_arg start; start.start_sem = semaphore_new(0); if (!start.start_sem) goto error; @@ -95,23 +79,114 @@ thread_t *thread_create(const char *name, strncpy(ret->name, name, THREAD_NAME_MAX); start.thread = ret; start.error = 0; - start.start_routine = start_routine; - start.arg = arg; pthread_create(&ret->pthread, NULL, run_thread, &start); semaphore_wait(start.start_sem); + semaphore_free(start.start_sem); if (start.error) goto error; return ret; error:; - semaphore_free(start.start_sem); + if (ret) { + fixed_queue_free(ret->work_queue, free); + reactor_free(ret->reactor); + } free(ret); return NULL; } -int thread_join(thread_t *thread, void **retval) { - int ret = pthread_join(thread->pthread, retval); - if (!ret) - free(thread); - return ret; +void thread_free(thread_t *thread) { + if (!thread) + return; + + thread_stop(thread); + pthread_join(thread->pthread, NULL); + fixed_queue_free(thread->work_queue, free); + reactor_free(thread->reactor); + free(thread); +} + +bool thread_post(thread_t *thread, thread_fn func, void *context) { + assert(thread != NULL); + assert(func != NULL); + + // TODO(sharvil): if the current thread == |thread| and we've run out + // of queue space, we should abort this operation, otherwise we'll + // deadlock. + + // Queue item is freed either when the queue itself is destroyed + // or when the item is removed from the queue for dispatch. + work_item_t *item = (work_item_t *)malloc(sizeof(work_item_t)); + if (!item) { + ALOGE("%s unable to allocate memory: %s", __func__, strerror(errno)); + return false; + } + item->func = func; + item->context = context; + fixed_queue_enqueue(thread->work_queue, item); + return true; +} + +void thread_stop(thread_t *thread) { + assert(thread != NULL); + reactor_stop(thread->reactor); +} + +const char *thread_name(const thread_t *thread) { + assert(thread != NULL); + return thread->name; +} + +static void *run_thread(void *start_arg) { + assert(start_arg != NULL); + + struct start_arg *start = start_arg; + thread_t *thread = start->thread; + + assert(thread != NULL); + + if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) { + ALOGE("%s unable to set thread name: %s", __func__, strerror(errno)); + start->error = errno; + semaphore_post(start->start_sem); + return NULL; + } + thread->tid = gettid(); + + semaphore_post(start->start_sem); + + reactor_object_t work_queue_object; + work_queue_object.context = thread->work_queue; + work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue); + work_queue_object.interest = REACTOR_INTEREST_READ; + work_queue_object.read_ready = work_queue_read_cb; + + reactor_register(thread->reactor, &work_queue_object); + reactor_start(thread->reactor); + + // Make sure we dispatch all queued work items before exiting the thread. + // This allows a caller to safely tear down by enqueuing a teardown + // work item and then joining the thread. + size_t count = 0; + work_item_t *item = fixed_queue_try_dequeue(thread->work_queue); + while (item && count <= WORK_QUEUE_CAPACITY) { + item->func(item->context); + free(item); + item = fixed_queue_try_dequeue(thread->work_queue); + ++count; + } + + if (count > WORK_QUEUE_CAPACITY) + ALOGD("%s growing event queue on shutdown.", __func__); + + return NULL; +} + +static void work_queue_read_cb(void *context) { + assert(context != NULL); + + fixed_queue_t *queue = (fixed_queue_t *)context; + work_item_t *item = fixed_queue_dequeue(queue); + item->func(item->context); + free(item); } diff --git a/osi/test/thread_test.cpp b/osi/test/thread_test.cpp index bf4c03c68..edbc005b4 100644 --- a/osi/test/thread_test.cpp +++ b/osi/test/thread_test.cpp @@ -5,44 +5,31 @@ extern "C" { #include "osi.h" } -void *start_routine(void *arg) -{ - return arg; -} - TEST(ThreadTest, test_new_simple) { - thread_t *thread = thread_create("test_thread", &start_routine, NULL); + thread_t *thread = thread_new("test_thread"); ASSERT_TRUE(thread != NULL); - thread_join(thread, NULL); + thread_free(thread); } -TEST(ThreadTest, test_join_simple) { - thread_t *thread = thread_create("test_thread", &start_routine, NULL); - thread_join(thread, NULL); +TEST(ThreadTest, test_free_simple) { + thread_t *thread = thread_new("test_thread"); + thread_free(thread); } TEST(ThreadTest, test_name) { - thread_t *thread = thread_create("test_name", &start_routine, NULL); + thread_t *thread = thread_new("test_name"); ASSERT_STREQ(thread_name(thread), "test_name"); - thread_join(thread, NULL); + thread_free(thread); } TEST(ThreadTest, test_long_name) { - thread_t *thread = thread_create("0123456789abcdef", &start_routine, NULL); + thread_t *thread = thread_new("0123456789abcdef"); ASSERT_STREQ("0123456789abcdef", thread_name(thread)); - thread_join(thread, NULL); + thread_free(thread); } TEST(ThreadTest, test_very_long_name) { - thread_t *thread = thread_create("0123456789abcdefg", &start_routine, NULL); + thread_t *thread = thread_new("0123456789abcdefg"); ASSERT_STREQ("0123456789abcdef", thread_name(thread)); - thread_join(thread, NULL); -} - -TEST(ThreadTest, test_return) { - int arg = 10; - void *ret; - thread_t *thread = thread_create("test", &start_routine, &arg); - thread_join(thread, &ret); - ASSERT_EQ(ret, &arg); + thread_free(thread); } -- 2.11.0