#define THREAD_NAME_MAX 16
-struct thread_t;
typedef struct thread_t thread_t;
+typedef void (*thread_fn)(void *context);
-typedef void *(*thread_start_cb) (void *);
+// Creates and starts a new thread with the given name. Only THREAD_NAME_MAX
+// bytes from |name| will be assigned to the newly-created thread. Returns a
+// thread object if the thread was successfully started, NULL otherwise. The
+// returned thread object must be freed with |thread_free|. |name| may not
+// be NULL.
+thread_t *thread_new(const char *name);
-// Lifecycle
-thread_t *thread_create(const char *name,
- thread_start_cb start_routine, void *arg);
-int thread_join(thread_t *thread, void **retval);
+// Frees the given |thread|. If the thread is still running, it is stopped
+// and the calling thread will block until |thread| terminates. |thread|
+// may be NULL.
+void thread_free(thread_t *thread);
-// Query
-pid_t thread_id(const thread_t *thread);
+// Call |func| with the argument |context| on |thread|. This function typically
+// does not block unless there are an excessive number of functions posted to
+// |thread| that have not been dispatched yet. Neither |thread| nor |func| may
+// be NULL. |context| may be NULL.
+bool thread_post(thread_t *thread, thread_fn func, void *context);
+
+// Requests |thread| to stop. Only |thread_free| and |thread_name| may be called
+// after calling |thread_stop|. This function is guaranteed to not block.
+// |thread| may not be NULL.
+void thread_stop(thread_t *thread);
+
+// Returns the name of the given |thread|. |thread| may not be NULL.
const char *thread_name(const thread_t *thread);
#define LOG_TAG "osi_thread"
#include <assert.h>
+#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/prctl.h>
#include <sys/types.h>
#include <utils/Log.h>
+#include "fixed_queue.h"
+#include "reactor.h"
#include "semaphore.h"
#include "thread.h"
-typedef struct thread_t {
+struct thread_t {
pthread_t pthread;
pid_t tid;
- char name[THREAD_NAME_MAX+1];
-} thread_t;
-
-pid_t thread_id(const thread_t *thread) {
- assert(thread != NULL);
- return thread->tid;
-}
-
-const char *thread_name(const thread_t *thread) {
- assert(thread != NULL);
- return thread->name;
-}
+ char name[THREAD_NAME_MAX + 1];
+ reactor_t *reactor;
+ fixed_queue_t *work_queue;
+};
struct start_arg {
thread_t *thread;
semaphore_t *start_sem;
int error;
- thread_start_cb start_routine;
- void *arg;
};
-static void *run_thread(void *start_arg) {
- assert(start_arg != NULL);
+typedef struct {
+ thread_fn func;
+ void *context;
+} work_item_t;
- struct start_arg *start = start_arg;
- thread_t *thread = start->thread;
+static void *run_thread(void *start_arg);
+static void work_queue_read_cb(void *context);
- assert(thread != NULL);
+static const size_t WORK_QUEUE_CAPACITY = 128;
- if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
- ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
- start->error = errno;
- semaphore_post(start->start_sem);
- return NULL;
- }
- thread->tid = gettid();
-
- // Cache local values because we are about to let thread_create
- // continue
- thread_start_cb start_routine = start->start_routine;
- void *arg = start->arg;
-
- semaphore_post(start->start_sem);
- return start_routine(arg);
-}
-
-thread_t *thread_create(const char *name,
- thread_start_cb start_routine, void *arg) {
+thread_t *thread_new(const char *name) {
assert(name != NULL);
- assert(start_routine != NULL);
// Start is on the stack, but we use a semaphore, so it's safe
- struct start_arg start;
- thread_t *ret;
- ret = calloc(1, sizeof(thread_t));
+ thread_t *ret = calloc(1, sizeof(thread_t));
if (!ret)
goto error;
+
+ ret->reactor = reactor_new();
+ if (!ret->reactor)
+ goto error;
+
+ ret->work_queue = fixed_queue_new(WORK_QUEUE_CAPACITY);
+ if (!ret->work_queue)
+ goto error;
+
+ struct start_arg start;
start.start_sem = semaphore_new(0);
if (!start.start_sem)
goto error;
strncpy(ret->name, name, THREAD_NAME_MAX);
start.thread = ret;
start.error = 0;
- start.start_routine = start_routine;
- start.arg = arg;
pthread_create(&ret->pthread, NULL, run_thread, &start);
semaphore_wait(start.start_sem);
+ semaphore_free(start.start_sem);
if (start.error)
goto error;
return ret;
error:;
- semaphore_free(start.start_sem);
+ if (ret) {
+ fixed_queue_free(ret->work_queue, free);
+ reactor_free(ret->reactor);
+ }
free(ret);
return NULL;
}
-int thread_join(thread_t *thread, void **retval) {
- int ret = pthread_join(thread->pthread, retval);
- if (!ret)
- free(thread);
- return ret;
+void thread_free(thread_t *thread) {
+ if (!thread)
+ return;
+
+ thread_stop(thread);
+ pthread_join(thread->pthread, NULL);
+ fixed_queue_free(thread->work_queue, free);
+ reactor_free(thread->reactor);
+ free(thread);
+}
+
+bool thread_post(thread_t *thread, thread_fn func, void *context) {
+ assert(thread != NULL);
+ assert(func != NULL);
+
+ // TODO(sharvil): if the current thread == |thread| and we've run out
+ // of queue space, we should abort this operation, otherwise we'll
+ // deadlock.
+
+ // Queue item is freed either when the queue itself is destroyed
+ // or when the item is removed from the queue for dispatch.
+ work_item_t *item = (work_item_t *)malloc(sizeof(work_item_t));
+ if (!item) {
+ ALOGE("%s unable to allocate memory: %s", __func__, strerror(errno));
+ return false;
+ }
+ item->func = func;
+ item->context = context;
+ fixed_queue_enqueue(thread->work_queue, item);
+ return true;
+}
+
+void thread_stop(thread_t *thread) {
+ assert(thread != NULL);
+ reactor_stop(thread->reactor);
+}
+
+const char *thread_name(const thread_t *thread) {
+ assert(thread != NULL);
+ return thread->name;
+}
+
+static void *run_thread(void *start_arg) {
+ assert(start_arg != NULL);
+
+ struct start_arg *start = start_arg;
+ thread_t *thread = start->thread;
+
+ assert(thread != NULL);
+
+ if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
+ ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
+ start->error = errno;
+ semaphore_post(start->start_sem);
+ return NULL;
+ }
+ thread->tid = gettid();
+
+ semaphore_post(start->start_sem);
+
+ reactor_object_t work_queue_object;
+ work_queue_object.context = thread->work_queue;
+ work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue);
+ work_queue_object.interest = REACTOR_INTEREST_READ;
+ work_queue_object.read_ready = work_queue_read_cb;
+
+ reactor_register(thread->reactor, &work_queue_object);
+ reactor_start(thread->reactor);
+
+ // Make sure we dispatch all queued work items before exiting the thread.
+ // This allows a caller to safely tear down by enqueuing a teardown
+ // work item and then joining the thread.
+ size_t count = 0;
+ work_item_t *item = fixed_queue_try_dequeue(thread->work_queue);
+ while (item && count <= WORK_QUEUE_CAPACITY) {
+ item->func(item->context);
+ free(item);
+ item = fixed_queue_try_dequeue(thread->work_queue);
+ ++count;
+ }
+
+ if (count > WORK_QUEUE_CAPACITY)
+ ALOGD("%s growing event queue on shutdown.", __func__);
+
+ return NULL;
+}
+
+static void work_queue_read_cb(void *context) {
+ assert(context != NULL);
+
+ fixed_queue_t *queue = (fixed_queue_t *)context;
+ work_item_t *item = fixed_queue_dequeue(queue);
+ item->func(item->context);
+ free(item);
}
#include "osi.h"
}
-void *start_routine(void *arg)
-{
- return arg;
-}
-
TEST(ThreadTest, test_new_simple) {
- thread_t *thread = thread_create("test_thread", &start_routine, NULL);
+ thread_t *thread = thread_new("test_thread");
ASSERT_TRUE(thread != NULL);
- thread_join(thread, NULL);
+ thread_free(thread);
}
-TEST(ThreadTest, test_join_simple) {
- thread_t *thread = thread_create("test_thread", &start_routine, NULL);
- thread_join(thread, NULL);
+TEST(ThreadTest, test_free_simple) {
+ thread_t *thread = thread_new("test_thread");
+ thread_free(thread);
}
TEST(ThreadTest, test_name) {
- thread_t *thread = thread_create("test_name", &start_routine, NULL);
+ thread_t *thread = thread_new("test_name");
ASSERT_STREQ(thread_name(thread), "test_name");
- thread_join(thread, NULL);
+ thread_free(thread);
}
TEST(ThreadTest, test_long_name) {
- thread_t *thread = thread_create("0123456789abcdef", &start_routine, NULL);
+ thread_t *thread = thread_new("0123456789abcdef");
ASSERT_STREQ("0123456789abcdef", thread_name(thread));
- thread_join(thread, NULL);
+ thread_free(thread);
}
TEST(ThreadTest, test_very_long_name) {
- thread_t *thread = thread_create("0123456789abcdefg", &start_routine, NULL);
+ thread_t *thread = thread_new("0123456789abcdefg");
ASSERT_STREQ("0123456789abcdef", thread_name(thread));
- thread_join(thread, NULL);
-}
-
-TEST(ThreadTest, test_return) {
- int arg = 10;
- void *ret;
- thread_t *thread = thread_create("test", &start_routine, &arg);
- thread_join(thread, &ret);
- ASSERT_EQ(ret, &arg);
+ thread_free(thread);
}