From c11b407e78d96e05b0991bbe6dfde0d7eb5349b5 Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Fri, 2 May 2014 23:55:09 -0700 Subject: [PATCH] Implement a fixed queue. Change-Id: Ifad7605d0b6e1a57f4767f9de1bed7e99284ded7 --- osi/Android.mk | 4 +- osi/include/fixed_queue.h | 17 +++++++++ osi/include/semaphore.h | 10 +++++ osi/src/fixed_queue.c | 94 +++++++++++++++++++++++++++++++++++++++++++++++ osi/src/semaphore.c | 53 ++++++++++++++++++++++++++ 5 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 osi/include/fixed_queue.h create mode 100644 osi/include/semaphore.h create mode 100644 osi/src/fixed_queue.c create mode 100644 osi/src/semaphore.c diff --git a/osi/Android.mk b/osi/Android.mk index e221ed137..c63e26658 100644 --- a/osi/Android.mk +++ b/osi/Android.mk @@ -6,7 +6,9 @@ LOCAL_C_INCLUDES := \ $(LOCAL_PATH)/include LOCAL_SRC_FILES := \ - ./src/list.c + ./src/fixed_queue.c \ + ./src/list.c \ + ./src/semaphore.c LOCAL_CFLAGS := -std=c99 -Wall -Werror LOCAL_MODULE := libosi diff --git a/osi/include/fixed_queue.h b/osi/include/fixed_queue.h new file mode 100644 index 000000000..5c4f9c460 --- /dev/null +++ b/osi/include/fixed_queue.h @@ -0,0 +1,17 @@ +#pragma once + +#include "list.h" + +struct fixed_queue_t; +typedef struct fixed_queue_t fixed_queue_t; + +typedef void (*fixed_queue_free_cb)(void *data); + +fixed_queue_t *fixed_queue_new(size_t capacity); + +// Freeing a queue that is currently in use (i.e. has waiters +// blocked on it) resuls in undefined behaviour. +void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb); + +void fixed_queue_enqueue(fixed_queue_t *queue, void *data); +void *fixed_queue_dequeue(fixed_queue_t *queue); diff --git a/osi/include/semaphore.h b/osi/include/semaphore.h new file mode 100644 index 000000000..3c8b13a5f --- /dev/null +++ b/osi/include/semaphore.h @@ -0,0 +1,10 @@ +#pragma once + +struct semaphore_t; +typedef struct semaphore_t semaphore_t; + +semaphore_t *semaphore_new(unsigned int value); +void semaphore_free(semaphore_t *semaphore); + +void semaphore_wait(semaphore_t *semaphore); +void semaphore_post(semaphore_t *semaphore); diff --git a/osi/src/fixed_queue.c b/osi/src/fixed_queue.c new file mode 100644 index 000000000..76cf603ff --- /dev/null +++ b/osi/src/fixed_queue.c @@ -0,0 +1,94 @@ +#include +#include +#include + +#include "fixed_queue.h" +#include "list.h" +#include "osi.h" +#include "semaphore.h" + +typedef struct fixed_queue_t { + list_t *list; + semaphore_t *enqueue_sem; + semaphore_t *dequeue_sem; + pthread_mutex_t lock; + size_t capacity; +} fixed_queue_t; + +fixed_queue_t *fixed_queue_new(size_t capacity) { + fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t)); + if (!ret) + goto error; + + ret->list = list_new(NULL); + if (!ret->list) + goto error; + + ret->enqueue_sem = semaphore_new(capacity); + if (!ret->enqueue_sem) + goto error; + + ret->dequeue_sem = semaphore_new(0); + if (!ret->dequeue_sem) + goto error; + + pthread_mutex_init(&ret->lock, NULL); + ret->capacity = capacity; + + return ret; + +error:; + if (ret) { + list_free(ret->list); + semaphore_free(ret->enqueue_sem); + semaphore_free(ret->dequeue_sem); + } + + free(ret); + return NULL; +} + +void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) { + if (!queue) + return; + + if (free_cb) + for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node)) + free_cb(list_node(node)); + + list_free(queue->list); + semaphore_free(queue->enqueue_sem); + semaphore_free(queue->dequeue_sem); + pthread_mutex_destroy(&queue->lock); + free(queue); +} + +void fixed_queue_enqueue(fixed_queue_t *queue, void *data) { + assert(queue != NULL); + assert(data != NULL); + + semaphore_wait(queue->enqueue_sem); + + pthread_mutex_lock(&queue->lock); + list_append(queue->list, data); + pthread_mutex_unlock(&queue->lock); + + semaphore_post(queue->dequeue_sem); +} + +void *fixed_queue_dequeue(fixed_queue_t *queue) { + assert(queue != NULL); + + void *ret; + + semaphore_wait(queue->dequeue_sem); + + pthread_mutex_lock(&queue->lock); + ret = list_front(queue->list); + list_remove(queue->list, ret); + pthread_mutex_unlock(&queue->lock); + + semaphore_post(queue->enqueue_sem); + + return ret; +} diff --git a/osi/src/semaphore.c b/osi/src/semaphore.c new file mode 100644 index 000000000..a46c4358b --- /dev/null +++ b/osi/src/semaphore.c @@ -0,0 +1,53 @@ +#define LOG_TAG "osi_semaphore" + +#include +#include +#include +#include +#include + +#include "semaphore.h" + +#if !defined(EFD_SEMAPHORE) +# define EFD_SEMAPHORE (1 << 0) +#endif + +struct semaphore_t { + int fd; +}; + +semaphore_t *semaphore_new(unsigned int value) { + semaphore_t *ret = malloc(sizeof(semaphore_t)); + if (ret) { + ret->fd = eventfd(value, EFD_SEMAPHORE); + if (ret->fd == -1) { + ALOGE("%s unable to allocate semaphore: %s", __func__, strerror(errno)); + free(ret); + ret = NULL; + } + } + return ret; +} + +void semaphore_free(semaphore_t *semaphore) { + if (semaphore->fd != -1) + close(semaphore->fd); + free(semaphore); +} + +void semaphore_wait(semaphore_t *semaphore) { + assert(semaphore != NULL); + assert(semaphore->fd != -1); + + uint64_t value; + if (eventfd_read(semaphore->fd, &value) == -1) + ALOGE("%s unable to wait on semaphore: %s", __func__, strerror(errno)); +} + +void semaphore_post(semaphore_t *semaphore) { + assert(semaphore != NULL); + assert(semaphore->fd != -1); + + if (eventfd_write(semaphore->fd, 1ULL) == -1) + ALOGE("%s unable to post to semaphore: %s", __func__, strerror(errno)); +} -- 2.11.0