1 /******************************************************************************
3 * Copyright (C) 2014 Google, Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 ******************************************************************************/
23 #include "osi/include/allocator.h"
24 #include "osi/include/fixed_queue.h"
25 #include "osi/include/list.h"
26 #include "osi/include/osi.h"
27 #include "osi/include/semaphore.h"
28 #include "osi/include/reactor.h"
// State for a fixed-capacity queue.
// NOTE(review): this view of the struct is incomplete. Other functions in this
// file also access `list`, `lock` and `capacity` members, and the closing
// `} fixed_queue_t;` line is not visible here - confirm against the full file.
30 typedef struct fixed_queue_t {
// Created with the queue's capacity as its initial count; waited on before an
// append and posted after a removal, so its count tracks free slots.
32 semaphore_t *enqueue_sem;
// Created with an initial count of 0; posted after an append and waited on
// before a removal, so its count tracks queued items.
33 semaphore_t *dequeue_sem;
// Value returned by reactor_register(); NULL while no dequeue callback is registered.
37 reactor_object_t *dequeue_object;
// Caller-supplied callback stored by fixed_queue_register_dequeue().
38 fixed_queue_cb dequeue_ready;
// Opaque pointer passed back as the second argument of |dequeue_ready|.
39 void *dequeue_context;
// Forward declaration: this function is passed to reactor_register() by
// fixed_queue_register_dequeue() before its definition appears in the file.
42 static void internal_dequeue_ready(void *context);
// Allocates a queue object and sets up its mutex, backing list and the two
// counting semaphores. |capacity| is stored in the queue and is also used as
// the initial count of the enqueue semaphore.
// NOTE(review): the statements controlled by the `if` checks below, the
// success return and the closing brace are not visible in this view.
// Presumably each failure branches to the fixed_queue_free() cleanup call on
// the last visible line - confirm against the full file.
44 fixed_queue_t *fixed_queue_new(size_t capacity) {
45 fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
49 pthread_mutex_init(&ret->lock, NULL);
50 ret->capacity = capacity;
// The list is created with a NULL argument; fixed_queue_free() is what hands
// elements to a caller-supplied free callback.
52 ret->list = list_new(NULL);
// Starts at |capacity|: one unit per free slot.
56 ret->enqueue_sem = semaphore_new(capacity);
57 if (!ret->enqueue_sem)
// Starts at 0: the queue begins with no items available to dequeue.
60 ret->dequeue_sem = semaphore_new(0);
61 if (!ret->dequeue_sem)
// Cleanup of the partially constructed queue; no element callback is supplied.
67 fixed_queue_free(ret, NULL);
// Tears down |queue|: drops any reactor registration, passes each remaining
// element to |free_cb|, then releases the list, both semaphores and the mutex.
// NOTE(review): the visible loop calls |free_cb| unconditionally, yet
// fixed_queue_new() passes NULL for it. A NULL check on |queue| / |free_cb|
// and the release of the queue object itself are presumably on lines not
// visible in this view - confirm against the full file.
71 void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
// Unregister first so the reactor no longer holds this queue as a callback context.
75 fixed_queue_unregister_dequeue(queue);
// Hand every element still in the list to the caller's callback.
78 for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
79 free_cb(list_node(node));
81 list_free(queue->list);
82 semaphore_free(queue->enqueue_sem);
83 semaphore_free(queue->dequeue_sem);
84 pthread_mutex_destroy(&queue->lock);
// Reports whether the backing list currently holds no elements. |queue| must
// not be NULL. The list is inspected while holding the queue lock; the lock is
// released before the function finishes, so the answer is only a snapshot.
// NOTE(review): the return statement is not visible in this view; presumably
// |is_empty| is returned.
88 bool fixed_queue_is_empty(fixed_queue_t *queue) {
89 assert(queue != NULL);
91 pthread_mutex_lock(&queue->lock);
92 bool is_empty = list_is_empty(queue->list);
93 pthread_mutex_unlock(&queue->lock);
// Returns the capacity value stored by fixed_queue_new(). |queue| must not be
// NULL. The field is read without taking the queue lock; within the visible
// code it is only written during construction.
98 size_t fixed_queue_capacity(fixed_queue_t *queue) {
99 assert(queue != NULL);
101 return queue->capacity;
// Appends |data| to the tail of |queue|. Neither argument may be NULL.
// The function first waits on the enqueue semaphore (presumably blocking
// while the queue is full - confirm semaphore_wait semantics), appends under
// the queue lock, then posts the dequeue semaphore to publish the new item.
104 void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
105 assert(queue != NULL);
106 assert(data != NULL);
// Claim a free slot before touching the list.
108 semaphore_wait(queue->enqueue_sem);
110 pthread_mutex_lock(&queue->lock);
111 list_append(queue->list, data);
112 pthread_mutex_unlock(&queue->lock);
// Posted after the lock is released: one more item is available to consumers.
114 semaphore_post(queue->dequeue_sem);
// Removes the element at the front of |queue|. |queue| must not be NULL.
// The function first waits on the dequeue semaphore (presumably blocking
// while the queue is empty - confirm semaphore_wait semantics), removes the
// front element under the queue lock, then posts the enqueue semaphore to
// give the slot back to producers.
// NOTE(review): the return statement is not visible in this view; presumably
// |ret| is returned.
117 void *fixed_queue_dequeue(fixed_queue_t *queue) {
118 assert(queue != NULL);
// Claim one queued item before touching the list.
120 semaphore_wait(queue->dequeue_sem);
122 pthread_mutex_lock(&queue->lock);
123 void *ret = list_front(queue->list);
124 list_remove(queue->list, ret);
125 pthread_mutex_unlock(&queue->lock);
// Posted after the lock is released: one more slot is free for producers.
127 semaphore_post(queue->enqueue_sem);
// Variant of fixed_queue_enqueue() that uses semaphore_try_wait() instead of
// semaphore_wait() to claim a slot. Neither argument may be NULL.
// NOTE(review): the statement controlled by the `if` below and the final
// return are not visible in this view; presumably the function returns false
// when no slot could be claimed and true after a successful append - confirm.
132 bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
133 assert(queue != NULL);
134 assert(data != NULL);
// Failure path: the try-wait did not obtain a free slot.
136 if (!semaphore_try_wait(queue->enqueue_sem))
139 pthread_mutex_lock(&queue->lock);
140 list_append(queue->list, data);
141 pthread_mutex_unlock(&queue->lock);
// Publish the new item to consumers.
143 semaphore_post(queue->dequeue_sem);
// Variant of fixed_queue_dequeue() that uses semaphore_try_wait() instead of
// semaphore_wait() to claim an item. |queue| must not be NULL.
// NOTE(review): the statement controlled by the `if` below and the final
// return are not visible in this view; presumably the function returns NULL
// when no item could be claimed and |ret| otherwise - confirm.
147 void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
148 assert(queue != NULL);
// Failure path: the try-wait did not obtain a queued item.
150 if (!semaphore_try_wait(queue->dequeue_sem))
153 pthread_mutex_lock(&queue->lock);
154 void *ret = list_front(queue->list);
155 list_remove(queue->list, ret);
156 pthread_mutex_unlock(&queue->lock);
// Give the vacated slot back to producers.
158 semaphore_post(queue->enqueue_sem);
// Looks at the front element of |queue| without removing it; the value is
// NULL when the list is empty. |queue| must not be NULL. Neither semaphore is
// touched, so the queue's contents and counts are left unchanged.
// NOTE(review): the return statement is not visible in this view; presumably
// |ret| is returned.
163 void *fixed_queue_try_peek(fixed_queue_t *queue) {
164 assert(queue != NULL);
166 pthread_mutex_lock(&queue->lock);
167 // The lock is held across both calls, so the emptiness check and the front read cannot race with other lock holders.
168 void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list);
169 pthread_mutex_unlock(&queue->lock);
// Returns the file descriptor that semaphore_get_fd() reports for the dequeue
// semaphore. |queue| must not be NULL. fixed_queue_register_dequeue() passes
// this descriptor to reactor_register().
174 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
175 assert(queue != NULL);
176 return semaphore_get_fd(queue->dequeue_sem);
// Returns the file descriptor that semaphore_get_fd() reports for the enqueue
// semaphore. |queue| must not be NULL.
179 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
180 assert(queue != NULL);
181 return semaphore_get_fd(queue->enqueue_sem);
// Registers |ready_cb| with |reactor| for this queue. The callback and
// |context| are stored in the queue, and the queue's dequeue file descriptor
// is handed to reactor_register() together with internal_dequeue_ready, which
// forwards to |ready_cb|. |queue|, |reactor| and |ready_cb| must not be NULL;
// |context| is not checked. Any earlier registration is dropped first.
// NOTE(review): several reactor_register() arguments and the end of the call
// are not visible in this view - confirm against the full file.
184 void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
185 assert(queue != NULL);
186 assert(reactor != NULL);
187 assert(ready_cb != NULL);
189 // Make sure we're not already registered
190 fixed_queue_unregister_dequeue(queue);
// Store the callback state before registering with the reactor.
192 queue->dequeue_ready = ready_cb;
193 queue->dequeue_context = context;
194 queue->dequeue_object = reactor_register(
196 fixed_queue_get_dequeue_fd(queue),
198 internal_dequeue_ready,
// Drops the queue's reactor registration, if there is one. |queue| must not
// be NULL. Does nothing when |dequeue_object| is already NULL.
203 void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
204 assert(queue != NULL);
206 if (queue->dequeue_object) {
207 reactor_unregister(queue->dequeue_object);
// Cleared so a later call sees that nothing is registered.
208 queue->dequeue_object = NULL;
// Adapter between a void*-context callback signature and the queue's stored
// callback: |context| is treated as the fixed_queue_t, and the stored
// |dequeue_ready| is invoked with the queue and the stored |dequeue_context|.
// |context| must not be NULL.
212 static void internal_dequeue_ready(void *context) {
213 assert(context != NULL);
215 fixed_queue_t *queue = context;
216 queue->dequeue_ready(queue, queue->dequeue_context);