1 /******************************************************************************
3 * Copyright (C) 2014 Google, Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 ******************************************************************************/
19 #define LOG_TAG "bt_osi_reactor"
25 #include <sys/epoll.h>
26 #include <sys/eventfd.h>
28 #include "osi/include/allocator.h"
29 #include "osi/include/list.h"
30 #include "osi/include/log.h"
31 #include "osi/include/reactor.h"
// Compatibility fallback: when the headers included above do not provide
// EFD_SEMAPHORE, define it here as bit 0.
33 #if !defined(EFD_SEMAPHORE)
34 # define EFD_SEMAPHORE (1 << 0)
// NOTE(review): the opening of the enclosing struct is not visible above these
// fields; elsewhere in this file they are accessed through reactor_t pointers.
40 pthread_mutex_t list_lock; // protects invalidation_list.
41 list_t *invalidation_list; // reactor objects that have been unregistered.
42 pthread_t run_thread; // the pthread on which run_reactor is executing.
43 bool is_running; // indicates whether |run_thread| is valid.
// State for one file descriptor registered with a reactor. Instances are
// allocated by reactor_register and handed back to the reactor loop through
// the epoll event's data pointer.
47 struct reactor_object_t {
48 int fd; // the file descriptor to monitor for events.
49 void *context; // a context that's passed back to the *_ready functions.
50 reactor_t *reactor; // the reactor instance this object is registered with.
51 pthread_mutex_t lock; // protects the lifetime of this object and all variables.
// Either callback may be NULL; the loop checks each pointer before calling it.
53 void (*read_ready)(void *context); // function to call when the file descriptor becomes readable.
54 void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable.
// Forward declaration of the loop shared by reactor_start and reactor_run_once.
57 static reactor_status_t run_reactor(reactor_t *reactor, int iterations);
// Capacity of the event array filled by each epoll_wait call; also passed as
// the size argument to epoll_create.
59 static const size_t MAX_EVENTS = 64;
// Value written to the reactor's eventfd by reactor_stop.
60 static const eventfd_t EVENT_REACTOR_STOP = 1;
62 reactor_t *reactor_new(void) {
63 reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t));
67 ret->epoll_fd = INVALID_FD;
68 ret->event_fd = INVALID_FD;
70 ret->epoll_fd = epoll_create(MAX_EVENTS);
71 if (ret->epoll_fd == INVALID_FD) {
72 LOG_ERROR("%s unable to create epoll instance: %s", __func__, strerror(errno));
76 ret->event_fd = eventfd(0, 0);
77 if (ret->event_fd == INVALID_FD) {
78 LOG_ERROR("%s unable to create eventfd: %s", __func__, strerror(errno));
82 pthread_mutex_init(&ret->list_lock, NULL);
83 ret->invalidation_list = list_new(NULL);
84 if (!ret->invalidation_list) {
85 LOG_ERROR("%s unable to allocate object invalidation list.", __func__);
89 struct epoll_event event = { 0 };
90 event.events = EPOLLIN;
91 event.data.ptr = NULL;
92 if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
93 LOG_ERROR("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));
104 void reactor_free(reactor_t *reactor) {
108 list_free(reactor->invalidation_list);
109 close(reactor->event_fd);
110 close(reactor->epoll_fd);
114 reactor_status_t reactor_start(reactor_t *reactor) {
115 assert(reactor != NULL);
116 return run_reactor(reactor, 0);
119 reactor_status_t reactor_run_once(reactor_t *reactor) {
120 assert(reactor != NULL);
121 return run_reactor(reactor, 1);
124 void reactor_stop(reactor_t *reactor) {
125 assert(reactor != NULL);
127 eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
130 reactor_object_t *reactor_register(reactor_t *reactor,
131 int fd, void *context,
132 void (*read_ready)(void *context),
133 void (*write_ready)(void *context)) {
134 assert(reactor != NULL);
135 assert(fd != INVALID_FD);
137 reactor_object_t *object = (reactor_object_t *)osi_calloc(sizeof(reactor_object_t));
139 LOG_ERROR("%s unable to allocate reactor object: %s", __func__, strerror(errno));
143 object->reactor = reactor;
145 object->context = context;
146 object->read_ready = read_ready;
147 object->write_ready = write_ready;
148 pthread_mutex_init(&object->lock, NULL);
150 struct epoll_event event = { 0 };
153 event.events |= (EPOLLIN | EPOLLRDHUP);
155 event.events |= EPOLLOUT;
156 event.data.ptr = object;
158 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
159 LOG_ERROR("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
160 pthread_mutex_destroy(&object->lock);
168 bool reactor_change_registration(reactor_object_t *object,
169 void (*read_ready)(void *context),
170 void (*write_ready)(void *context)) {
171 assert(object != NULL);
173 struct epoll_event event = { 0 };
176 event.events |= (EPOLLIN | EPOLLRDHUP);
178 event.events |= EPOLLOUT;
179 event.data.ptr = object;
181 if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {
182 LOG_ERROR("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));
186 pthread_mutex_lock(&object->lock);
187 object->read_ready = read_ready;
188 object->write_ready = write_ready;
189 pthread_mutex_unlock(&object->lock);
194 void reactor_unregister(reactor_object_t *obj) {
197 reactor_t *reactor = obj->reactor;
199 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
200 LOG_ERROR("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno));
202 if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) {
203 reactor->object_removed = true;
207 pthread_mutex_lock(&reactor->list_lock);
208 list_append(reactor->invalidation_list, obj);
209 pthread_mutex_unlock(&reactor->list_lock);
211 // Taking the object lock here makes sure a callback for |obj| isn't
212 // currently executing. The reactor thread must then either be before
213 // the callbacks or after. If after, we know that the object won't be
214 // referenced because it has been taken out of the epoll set. If before,
215 // it won't be referenced because the reactor thread will check the
216 // invalidation_list and find it in there. So by taking this lock, we
217 // are waiting until the reactor thread drops all references to |obj|.
218 // One the wait completes, we can unlock and destroy |obj| safely.
219 pthread_mutex_lock(&obj->lock);
220 pthread_mutex_unlock(&obj->lock);
221 pthread_mutex_destroy(&obj->lock);
225 // Runs the reactor loop for a maximum of |iterations|.
226 // 0 |iterations| means loop forever.
227 // |reactor| may not be NULL.
228 static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
229 assert(reactor != NULL);
231 reactor->run_thread = pthread_self();
232 reactor->is_running = true;
234 struct epoll_event events[MAX_EVENTS];
235 for (int i = 0; iterations == 0 || i < iterations; ++i) {
236 pthread_mutex_lock(&reactor->list_lock);
237 list_clear(reactor->invalidation_list);
238 pthread_mutex_unlock(&reactor->list_lock);
242 ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1);
243 } while (ret == -1 && errno == EINTR);
246 LOG_ERROR("%s error in epoll_wait: %s", __func__, strerror(errno));
247 reactor->is_running = false;
248 return REACTOR_STATUS_ERROR;
251 for (int j = 0; j < ret; ++j) {
252 // The event file descriptor is the only one that registers with
253 // a NULL data pointer. We use the NULL to identify it and break
254 // out of the reactor loop.
255 if (events[j].data.ptr == NULL) {
257 eventfd_read(reactor->event_fd, &value);
258 reactor->is_running = false;
259 return REACTOR_STATUS_STOP;
262 reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
264 pthread_mutex_lock(&reactor->list_lock);
265 if (list_contains(reactor->invalidation_list, object)) {
266 pthread_mutex_unlock(&reactor->list_lock);
270 // Downgrade the list lock to an object lock.
271 pthread_mutex_lock(&object->lock);
272 pthread_mutex_unlock(&reactor->list_lock);
274 reactor->object_removed = false;
275 if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
276 object->read_ready(object->context);
277 if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
278 object->write_ready(object->context);
279 pthread_mutex_unlock(&object->lock);
281 if (reactor->object_removed) {
282 pthread_mutex_destroy(&object->lock);
288 reactor->is_running = false;
289 return REACTOR_STATUS_DONE;