OSDN Git Service

resolved conflicts for b8cc54d1 to mnc-dr-dev-plus-aosp
[android-x86/system-bt.git] / osi / src / eager_reader.c
1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18
19 #define LOG_TAG "bt_osi_eager_reader"
20
21 #include "osi/include/eager_reader.h"
22
23 #include <assert.h>
24 #include <errno.h>
25 #include <string.h>
26 #include <sys/eventfd.h>
27 #include <unistd.h>
28
29 #include "osi/include/fixed_queue.h"
30 #include "osi/include/log.h"
31 #include "osi/include/osi.h"
32 #include "osi/include/reactor.h"
33 #include "osi/include/thread.h"
34
35 #if !defined(EFD_SEMAPHORE)
36 #  define EFD_SEMAPHORE (1 << 0)
37 #endif
38
39 typedef struct {
40   size_t length;
41   size_t offset;
42   uint8_t data[];
43 } data_buffer_t;
44
45 struct eager_reader_t {
46   int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes
47   int inbound_fd;
48
49   const allocator_t *allocator;
50   size_t buffer_size;
51   fixed_queue_t *buffers;
52   data_buffer_t *current_buffer;
53
54   thread_t *inbound_read_thread;
55   reactor_object_t *inbound_read_object;
56
57   reactor_object_t *outbound_registration;
58   eager_reader_cb outbound_read_ready;
59   void *outbound_context;
60 };
61
62 static bool has_byte(const eager_reader_t *reader);
63 static void inbound_data_waiting(void *context);
64 static void internal_outbound_read_ready(void *context);
65
66 eager_reader_t *eager_reader_new(
67     int fd_to_read,
68     const allocator_t *allocator,
69     size_t buffer_size,
70     size_t max_buffer_count,
71     const char *thread_name) {
72
73   assert(fd_to_read != INVALID_FD);
74   assert(allocator != NULL);
75   assert(buffer_size > 0);
76   assert(max_buffer_count > 0);
77   assert(thread_name != NULL && *thread_name != '\0');
78
79   eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t));
80   if (!ret) {
81     LOG_ERROR(LOG_TAG, "%s unable to allocate memory for new eager_reader.", __func__);
82     goto error;
83   }
84
85   ret->allocator = allocator;
86   ret->inbound_fd = fd_to_read;
87
88   ret->bytes_available_fd = eventfd(0, 0);
89   if (ret->bytes_available_fd == INVALID_FD) {
90     LOG_ERROR(LOG_TAG, "%s unable to create output reading semaphore.", __func__);
91     goto error;
92   }
93
94   ret->buffer_size = buffer_size;
95
96   ret->buffers = fixed_queue_new(max_buffer_count);
97   if (!ret->buffers) {
98     LOG_ERROR(LOG_TAG, "%s unable to create buffers queue.", __func__);
99     goto error;
100   }
101
102   ret->inbound_read_thread = thread_new(thread_name);
103   if (!ret->inbound_read_thread) {
104     LOG_ERROR(LOG_TAG, "%s unable to make reading thread.", __func__);
105     goto error;
106   }
107
108   ret->inbound_read_object = reactor_register(
109     thread_get_reactor(ret->inbound_read_thread),
110     fd_to_read,
111     ret,
112     inbound_data_waiting,
113     NULL
114   );
115
116   return ret;
117
118 error:;
119   eager_reader_free(ret);
120   return NULL;
121 }
122
123 void eager_reader_free(eager_reader_t *reader) {
124   if (!reader)
125     return;
126
127   eager_reader_unregister(reader);
128
129   // Only unregister from the input if we actually did register
130   if (reader->inbound_read_object)
131     reactor_unregister(reader->inbound_read_object);
132
133   if (reader->bytes_available_fd != INVALID_FD)
134     close(reader->bytes_available_fd);
135
136   // Free the current buffer, because it's not in the queue
137   // and won't be freed below
138   if (reader->current_buffer)
139     reader->allocator->free(reader->current_buffer);
140
141   fixed_queue_free(reader->buffers, reader->allocator->free);
142   thread_free(reader->inbound_read_thread);
143   osi_free(reader);
144 }
145
146 void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
147   assert(reader != NULL);
148   assert(reactor != NULL);
149   assert(read_cb != NULL);
150
151   // Make sure the reader isn't currently registered.
152   eager_reader_unregister(reader);
153
154   reader->outbound_read_ready = read_cb;
155   reader->outbound_context = context;
156   reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
157 }
158
159 void eager_reader_unregister(eager_reader_t *reader) {
160   assert(reader != NULL);
161
162   if (reader->outbound_registration) {
163     reactor_unregister(reader->outbound_registration);
164     reader->outbound_registration = NULL;
165   }
166 }
167
168 // SEE HEADER FOR THREAD SAFETY NOTE
169 size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) {
170   assert(reader != NULL);
171   assert(buffer != NULL);
172
173   // If the caller wants nonblocking behavior, poll to see if we have
174   // any bytes available before reading.
175   if (!block && !has_byte(reader))
176     return 0;
177
178   // Find out how many bytes we have available in our various buffers.
179   eventfd_t bytes_available;
180   if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {
181     LOG_ERROR(LOG_TAG, "%s unable to read semaphore for output data.", __func__);
182     return 0;
183   }
184
185   if (max_size > bytes_available)
186     max_size = bytes_available;
187
188   size_t bytes_consumed = 0;
189   while (bytes_consumed < max_size) {
190     if (!reader->current_buffer)
191       reader->current_buffer = fixed_queue_dequeue(reader->buffers);
192
193     size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;
194     if (bytes_to_copy > (max_size - bytes_consumed))
195       bytes_to_copy = max_size - bytes_consumed;
196
197     memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);
198     bytes_consumed += bytes_to_copy;
199     reader->current_buffer->offset += bytes_to_copy;
200
201     if (reader->current_buffer->offset >= reader->current_buffer->length) {
202       reader->allocator->free(reader->current_buffer);
203       reader->current_buffer = NULL;
204     }
205   }
206
207   bytes_available -= bytes_consumed;
208   if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {
209     LOG_ERROR(LOG_TAG, "%s unable to write back bytes available for output data.", __func__);
210   }
211
212   return bytes_consumed;
213 }
214
215 static bool has_byte(const eager_reader_t *reader) {
216   assert(reader != NULL);
217
218   fd_set read_fds;
219   FD_ZERO(&read_fds);
220   FD_SET(reader->bytes_available_fd, &read_fds);
221
222   // Immediate timeout
223   struct timeval timeout;
224   timeout.tv_sec = 0;
225   timeout.tv_usec = 0;
226
227   select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout);
228   return FD_ISSET(reader->bytes_available_fd, &read_fds);
229 }
230
231 static void inbound_data_waiting(void *context) {
232   eager_reader_t *reader = (eager_reader_t *)context;
233
234   data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
235   if (!buffer) {
236     LOG_ERROR(LOG_TAG, "%s couldn't aquire memory for inbound data buffer.", __func__);
237     return;
238   }
239
240   buffer->length = 0;
241   buffer->offset = 0;
242
243   int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size);
244   if (bytes_read > 0) {
245     // Save the data for later
246     buffer->length = bytes_read;
247     fixed_queue_enqueue(reader->buffers, buffer);
248
249     // Tell consumers data is available by incrementing
250     // the semaphore by the number of bytes we just read
251     eventfd_write(reader->bytes_available_fd, bytes_read);
252   } else {
253     if (bytes_read == 0)
254       LOG_WARN(LOG_TAG, "%s fd said bytes existed, but none were found.", __func__);
255     else
256       LOG_WARN(LOG_TAG, "%s unable to read from file descriptor: %s", __func__, strerror(errno));
257
258     reader->allocator->free(buffer);
259   }
260 }
261
262 static void internal_outbound_read_ready(void *context) {
263   assert(context != NULL);
264
265   eager_reader_t *reader = (eager_reader_t *)context;
266   reader->outbound_read_ready(reader, reader->outbound_context);
267 }