OSDN Git Service

Implement a fixed queue.
authorSharvil Nanavati <sharvil@google.com>
Sat, 3 May 2014 06:55:09 +0000 (23:55 -0700)
committerPrerepa Viswanadham <dham@google.com>
Thu, 12 Jun 2014 01:59:45 +0000 (18:59 -0700)
Change-Id: Ifad7605d0b6e1a57f4767f9de1bed7e99284ded7

osi/Android.mk
osi/include/fixed_queue.h [new file with mode: 0644]
osi/include/semaphore.h [new file with mode: 0644]
osi/src/fixed_queue.c [new file with mode: 0644]
osi/src/semaphore.c [new file with mode: 0644]

index e221ed1..c63e266 100644 (file)
@@ -6,7 +6,9 @@ LOCAL_C_INCLUDES := \
     $(LOCAL_PATH)/include
 
 LOCAL_SRC_FILES := \
-    ./src/list.c
+    ./src/fixed_queue.c \
+    ./src/list.c \
+    ./src/semaphore.c
 
 LOCAL_CFLAGS := -std=c99 -Wall -Werror
 LOCAL_MODULE := libosi
diff --git a/osi/include/fixed_queue.h b/osi/include/fixed_queue.h
new file mode 100644 (file)
index 0000000..5c4f9c4
--- /dev/null
@@ -0,0 +1,17 @@
+#pragma once
+
+#include "list.h"
+
+struct fixed_queue_t;
+typedef struct fixed_queue_t fixed_queue_t;
+
+typedef void (*fixed_queue_free_cb)(void *data);
+
+fixed_queue_t *fixed_queue_new(size_t capacity);
+
+// Freeing a queue that is currently in use (i.e. has waiters
+// blocked on it) resuls in undefined behaviour.
+void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb);
+
+void fixed_queue_enqueue(fixed_queue_t *queue, void *data);
+void *fixed_queue_dequeue(fixed_queue_t *queue);
diff --git a/osi/include/semaphore.h b/osi/include/semaphore.h
new file mode 100644 (file)
index 0000000..3c8b13a
--- /dev/null
@@ -0,0 +1,10 @@
+#pragma once
+
+struct semaphore_t;
+typedef struct semaphore_t semaphore_t;
+
+semaphore_t *semaphore_new(unsigned int value);
+void semaphore_free(semaphore_t *semaphore);
+
+void semaphore_wait(semaphore_t *semaphore);
+void semaphore_post(semaphore_t *semaphore);
diff --git a/osi/src/fixed_queue.c b/osi/src/fixed_queue.c
new file mode 100644 (file)
index 0000000..76cf603
--- /dev/null
@@ -0,0 +1,94 @@
+#include <assert.h>
+#include <pthread.h>
+#include <stdlib.h>
+
+#include "fixed_queue.h"
+#include "list.h"
+#include "osi.h"
+#include "semaphore.h"
+
+typedef struct fixed_queue_t {
+  list_t *list;
+  semaphore_t *enqueue_sem;
+  semaphore_t *dequeue_sem;
+  pthread_mutex_t lock;
+  size_t capacity;
+} fixed_queue_t;
+
+fixed_queue_t *fixed_queue_new(size_t capacity) {
+  fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t));
+  if (!ret)
+    goto error;
+
+  ret->list = list_new(NULL);
+  if (!ret->list)
+    goto error;
+
+  ret->enqueue_sem = semaphore_new(capacity);
+  if (!ret->enqueue_sem)
+    goto error;
+
+  ret->dequeue_sem = semaphore_new(0);
+  if (!ret->dequeue_sem)
+    goto error;
+
+  pthread_mutex_init(&ret->lock, NULL);
+  ret->capacity = capacity;
+
+  return ret;
+
+error:;
+  if (ret) {
+    list_free(ret->list);
+    semaphore_free(ret->enqueue_sem);
+    semaphore_free(ret->dequeue_sem);
+  }
+
+  free(ret);
+  return NULL;
+}
+
+void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
+  if (!queue)
+    return;
+
+  if (free_cb)
+    for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
+      free_cb(list_node(node));
+
+  list_free(queue->list);
+  semaphore_free(queue->enqueue_sem);
+  semaphore_free(queue->dequeue_sem);
+  pthread_mutex_destroy(&queue->lock);
+  free(queue);
+}
+
+void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
+  assert(queue != NULL);
+  assert(data != NULL);
+
+  semaphore_wait(queue->enqueue_sem);
+
+  pthread_mutex_lock(&queue->lock);
+  list_append(queue->list, data);
+  pthread_mutex_unlock(&queue->lock);
+
+  semaphore_post(queue->dequeue_sem);
+}
+
+void *fixed_queue_dequeue(fixed_queue_t *queue) {
+  assert(queue != NULL);
+
+  void *ret;
+
+  semaphore_wait(queue->dequeue_sem);
+
+  pthread_mutex_lock(&queue->lock);
+  ret = list_front(queue->list);
+  list_remove(queue->list, ret);
+  pthread_mutex_unlock(&queue->lock);
+
+  semaphore_post(queue->enqueue_sem);
+
+  return ret;
+}
diff --git a/osi/src/semaphore.c b/osi/src/semaphore.c
new file mode 100644 (file)
index 0000000..a46c435
--- /dev/null
@@ -0,0 +1,53 @@
+#define LOG_TAG "osi_semaphore"
+
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/eventfd.h>
+#include <utils/Log.h>
+
+#include "semaphore.h"
+
+#if !defined(EFD_SEMAPHORE)
+#  define EFD_SEMAPHORE (1 << 0)
+#endif
+
+struct semaphore_t {
+  int fd;
+};
+
+semaphore_t *semaphore_new(unsigned int value) {
+  semaphore_t *ret = malloc(sizeof(semaphore_t));
+  if (ret) {
+    ret->fd = eventfd(value, EFD_SEMAPHORE);
+    if (ret->fd == -1) {
+      ALOGE("%s unable to allocate semaphore: %s", __func__, strerror(errno));
+      free(ret);
+      ret = NULL;
+    }
+  }
+  return ret;
+}
+
+void semaphore_free(semaphore_t *semaphore) {
+  if (semaphore->fd != -1)
+    close(semaphore->fd);
+  free(semaphore);
+}
+
+void semaphore_wait(semaphore_t *semaphore) {
+  assert(semaphore != NULL);
+  assert(semaphore->fd != -1);
+
+  uint64_t value;
+  if (eventfd_read(semaphore->fd, &value) == -1)
+    ALOGE("%s unable to wait on semaphore: %s", __func__, strerror(errno));
+}
+
+void semaphore_post(semaphore_t *semaphore) {
+  assert(semaphore != NULL);
+  assert(semaphore->fd != -1);
+
+  if (eventfd_write(semaphore->fd, 1ULL) == -1)
+    ALOGE("%s unable to post to semaphore: %s", __func__, strerror(errno));
+}