ret->allocator = allocator;
ret->inbound_fd = fd_to_read;
- ret->bytes_available_fd = eventfd(0, EFD_SEMAPHORE);
+ ret->bytes_available_fd = eventfd(0, 0);
if (ret->bytes_available_fd == INVALID_FD) {
LOG_ERROR("%s unable to create output reading semaphore.", __func__);
goto error;
assert(reader != NULL);
assert(buffer != NULL);
- size_t bytes_read = 0;
+ // If the caller wants nonblocking behavior, poll to see if we have
+ // any bytes available before reading.
+ if (!block && !has_byte(reader))
+ return 0;
- while (bytes_read < max_size) {
- if (!block && !has_byte(reader))
- return bytes_read;
+ // Find out how many bytes we have available in our various buffers.
+ eventfd_t bytes_available;
+ if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {
+ LOG_ERROR("%s unable to read semaphore for output data.", __func__);
+ return 0;
+ }
- eventfd_t value;
- if (eventfd_read(reader->bytes_available_fd, &value) == -1)
- LOG_ERROR("%s unable to read semaphore for output data.", __func__);
+ if (max_size > bytes_available)
+ max_size = bytes_available;
+ size_t bytes_consumed = 0;
+ while (bytes_consumed < max_size) {
if (!reader->current_buffer)
reader->current_buffer = fixed_queue_dequeue(reader->buffers);
- buffer[bytes_read] = reader->current_buffer->data[reader->current_buffer->offset];
- reader->current_buffer->offset++;
- bytes_read++;
+ size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;
+ if (bytes_to_copy > (max_size - bytes_consumed))
+ bytes_to_copy = max_size - bytes_consumed;
+
+ memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);
+ bytes_consumed += bytes_to_copy;
+ reader->current_buffer->offset += bytes_to_copy;
- // Prep for next byte
if (reader->current_buffer->offset >= reader->current_buffer->length) {
reader->allocator->free(reader->current_buffer);
reader->current_buffer = NULL;
}
}
- return bytes_read;
+ bytes_available -= bytes_consumed;
+ if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {
+ LOG_ERROR("%s unable to write back bytes available for output data.", __func__);
+ }
+
+ return bytes_consumed;
}
static bool has_byte(const eager_reader_t *reader) {