#include <inttypes.h>
#include <log/log.h>
#include <sys/epoll.h>
+#include <poll.h>
#include <array>
#include <pdx/file_handle.h>
#include <private/dvr/bufferhub_rpc.h>
+#define RETRY_EINTR(fnc_call) \
+ ([&]() -> decltype(fnc_call) { \
+ decltype(fnc_call) result; \
+ do { \
+ result = (fnc_call); \
+ } while (result == -1 && errno == EINTR); \
+ return result; \
+ })()
+
using android::pdx::ErrorStatus;
using android::pdx::LocalChannelHandle;
using android::pdx::Status;
// Loop at least once to check for hangups.
do {
- ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: count=%zu capacity=%zu",
- count(), capacity());
+ ALOGD_IF(
+ TRACE,
+ "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
+ id(), count(), capacity());
// If there is already a buffer then just check for hangup without waiting.
const int ret = epoll_fd_.Wait(events.data(), events.size(),
if (ret == 0) {
ALOGI_IF(TRACE,
- "BufferHubQueue::WaitForBuffers: No events before timeout.");
+ "BufferHubQueue::WaitForBuffers: No events before timeout: "
+ "queue_id=%d",
+ id());
return count() != 0;
}
index);
if (is_buffer_event_index(index)) {
- HandleBufferEvent(static_cast<size_t>(index), events[i]);
+ HandleBufferEvent(static_cast<size_t>(index), events[i].events);
} else if (is_queue_event_index(index)) {
- HandleQueueEvent(events[i]);
+ HandleQueueEvent(events[i].events);
} else {
ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
index);
return count() != 0;
}
-void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
+void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
auto buffer = buffers_[slot];
if (!buffer) {
ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
return;
}
- auto status = buffer->GetEventMask(event.events);
+ auto status = buffer->GetEventMask(poll_events);
if (!status) {
ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
status.GetErrorMessage().c_str());
return;
}
- int events = status.get();
+ const int events = status.get();
if (events & EPOLLIN) {
- int ret = OnBufferReady(buffer, &fences_[slot]);
- if (ret < 0) {
- ALOGE("BufferHubQueue::HandleBufferEvent: Failed to set buffer ready: %s",
- strerror(-ret));
- return;
+ const int ret = OnBufferReady(buffer, &fences_[slot]);
+ if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
+ // Only enqueue the buffer if it moves to or is already in the state
+ // requested in OnBufferReady(). If the buffer is busy this means that the
+ // buffer moved from released to posted when a new consumer was created
+ // before the ProducerQueue had a chance to regain it. This is a valid
+ // transition that we have to handle because edge triggered poll events
+ // latch the ready state even if it is later de-asserted -- don't enqueue
+ // or print an error log in this case.
+ if (ret != -EBUSY)
+ Enqueue(buffer, slot);
+ } else {
+ ALOGE(
+ "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
+ "queue_id=%d buffer_id=%d: %s",
+ id(), buffer->id(), strerror(-ret));
}
- Enqueue(buffer, slot);
} else if (events & EPOLLHUP) {
// This might be caused by producer replacing an existing buffer slot, or
// when BufferHubQueue is shutting down. For the first case, currently the
}
}
-void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
- auto status = GetEventMask(event.events);
+void BufferHubQueue::HandleQueueEvent(int poll_event) {
+ auto status = GetEventMask(poll_event);
if (!status) {
ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
status.GetErrorMessage().c_str());
return;
}
- int events = status.get();
+ const int events = status.get();
if (events & EPOLLIN) {
// Note that after buffer imports, if |count()| still returns 0, epoll
// wait will be tried again to acquire the newly imported buffer.
ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
hung_up_ = true;
} else {
- ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%d", events);
+ ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
}
}
int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
size_t slot) {
+ ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
+ id(), buf->id(), slot);
// For producer buffer, we need to enqueue the newly added buffer
// immediately. Producer queue starts with all buffers in available state.
const int ret = BufferHubQueue::AddBuffer(buf, slot);
int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* release_fence) {
+ ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
+ id(), buf->id());
auto buffer = std::static_pointer_cast<BufferProducer>(buf);
return buffer->Gain(release_fence);
}
int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
size_t slot) {
- // Consumer queue starts with all buffers in unavailable state.
- return BufferHubQueue::AddBuffer(buf, slot);
+ ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
+ id(), buf->id(), slot);
+ const int ret = BufferHubQueue::AddBuffer(buf, slot);
+ if (ret < 0)
+ return ret;
+
+ // Check to see if the buffer is already signaled. This is necessary to catch
+ // cases where buffers are already available; epoll edge triggered mode does
+ // not fire until and edge transition when adding new buffers to the epoll
+ // set.
+ const int kTimeoutMs = 0;
+ pollfd pfd{buf->event_fd(), POLLIN, 0};
+ const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
+ if (count < 0) {
+ const int error = errno;
+ ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
+ strerror(errno));
+ return -error;
+ }
+
+ if (count == 1)
+ HandleBufferEvent(slot, pfd.revents);
+
+ return 0;
}
Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* acquire_fence) {
+ ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
+ id(), buf->id());
auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
}
// Wait for buffers to be released and re-add them to the queue.
bool WaitForBuffers(int timeout);
- void HandleBufferEvent(size_t slot, const epoll_event& event);
- void HandleQueueEvent(const epoll_event& event);
+ void HandleBufferEvent(size_t slot, int poll_events);
+ void HandleQueueEvent(int poll_events);
virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* fence) = 0;
return BASE::Create(sizeof(Meta), usage_set_mask, usage_clear_mask,
usage_deny_set_mask, usage_deny_clear_mask);
}
+ static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes,
+ uint32_t usage_set_mask,
+ uint32_t usage_clear_mask,
+ uint32_t usage_deny_set_mask,
+ uint32_t usage_deny_clear_mask) {
+ return BASE::Create(meta_size_bytes, usage_set_mask, usage_clear_mask,
+ usage_deny_set_mask, usage_deny_clear_mask);
+ }
// Import a |ProducerQueue| from a channel handle.
static std::unique_ptr<ProducerQueue> Import(LocalChannelHandle handle) {
LocalHandle* release_fence) override;
};
+// Explicit specializations of ProducerQueue::Create for void metadata type.
+template <>
+inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>() {
+ return ProducerQueue::Create(0);
+}
+template <>
+inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>(
+ uint32_t usage_set_mask, uint32_t usage_clear_mask,
+ uint32_t usage_deny_set_mask, uint32_t usage_deny_clear_mask) {
+ return ProducerQueue::Create(0, usage_set_mask, usage_clear_mask,
+ usage_deny_set_mask, usage_deny_clear_mask);
+}
+
class ConsumerQueue : public BufferHubQueue {
public:
// Get a buffer consumer. Note that the method doesn't check whether the
class BufferHubQueueTest : public ::testing::Test {
public:
template <typename Meta>
- bool CreateQueues(int usage_set_mask = 0, int usage_clear_mask = 0,
- int usage_deny_set_mask = 0,
- int usage_deny_clear_mask = 0) {
+ bool CreateProducerQueue(uint64_t usage_set_mask = 0,
+ uint64_t usage_clear_mask = 0,
+ uint64_t usage_deny_set_mask = 0,
+ uint64_t usage_deny_clear_mask = 0) {
producer_queue_ =
ProducerQueue::Create<Meta>(usage_set_mask, usage_clear_mask,
usage_deny_set_mask, usage_deny_clear_mask);
- if (!producer_queue_)
- return false;
+ return producer_queue_ != nullptr;
+ }
- consumer_queue_ = producer_queue_->CreateConsumerQueue();
- if (!consumer_queue_)
+ bool CreateConsumerQueue() {
+ if (producer_queue_) {
+ consumer_queue_ = producer_queue_->CreateConsumerQueue();
+ return consumer_queue_ != nullptr;
+ } else {
return false;
+ }
+ }
- return true;
+ template <typename Meta>
+ bool CreateQueues(int usage_set_mask = 0, int usage_clear_mask = 0,
+ int usage_deny_set_mask = 0,
+ int usage_deny_clear_mask = 0) {
+ return CreateProducerQueue<Meta>(usage_set_mask, usage_clear_mask,
+ usage_deny_set_mask,
+ usage_deny_clear_mask) &&
+ CreateConsumerQueue();
}
void AllocateBuffer() {
}
}
+TEST_F(BufferHubQueueTest, TestMultipleConsumers) {
+ ASSERT_TRUE(CreateProducerQueue<void>());
+
+ // Allocate buffers.
+ const size_t kBufferCount = 4u;
+ for (size_t i = 0; i < kBufferCount; i++) {
+ AllocateBuffer();
+ }
+ ASSERT_EQ(kBufferCount, producer_queue_->count());
+
+ // Build a silent consumer queue to test multi-consumer queue features.
+ auto silent_queue = producer_queue_->CreateSilentConsumerQueue();
+ ASSERT_NE(nullptr, silent_queue);
+
+ // Check that buffers are correctly imported on construction.
+ EXPECT_EQ(kBufferCount, silent_queue->capacity());
+
+ // Dequeue and post a buffer.
+ size_t slot;
+ LocalHandle fence;
+ auto producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+ ASSERT_TRUE(producer_status.ok());
+ auto producer_buffer = producer_status.take();
+ ASSERT_NE(nullptr, producer_buffer);
+ ASSERT_EQ(0, producer_buffer->Post<void>({}));
+
+ // Currently we expect no buffer to be available prior to calling
+ // WaitForBuffers/HandleQueueEvents.
+ // TODO(eieio): Note this behavior may change in the future.
+ EXPECT_EQ(0u, silent_queue->count());
+ EXPECT_FALSE(silent_queue->HandleQueueEvents());
+ EXPECT_EQ(0u, silent_queue->count());
+
+ // Build a new consumer queue to test multi-consumer queue features.
+ consumer_queue_ = silent_queue->CreateConsumerQueue();
+ ASSERT_NE(nullptr, consumer_queue_);
+
+ // Check that buffers are correctly imported on construction.
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+ EXPECT_EQ(1u, consumer_queue_->count());
+
+ // Reclaim released/ignored buffers.
+ producer_queue_->HandleQueueEvents();
+ ASSERT_EQ(kBufferCount - 1, producer_queue_->count());
+
+ // Post another buffer.
+ producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+ ASSERT_TRUE(producer_status.ok());
+ producer_buffer = producer_status.take();
+ ASSERT_NE(nullptr, producer_buffer);
+ ASSERT_EQ(0, producer_buffer->Post<void>({}));
+
+ // Verify that the consumer queue receives it.
+ EXPECT_EQ(1u, consumer_queue_->count());
+ EXPECT_TRUE(consumer_queue_->HandleQueueEvents());
+ EXPECT_EQ(2u, consumer_queue_->count());
+
+ // Dequeue and acquire/release (discard) buffers on the consumer end.
+ auto consumer_status = consumer_queue_->Dequeue(0, &slot, &fence);
+ ASSERT_TRUE(consumer_status.ok());
+ auto consumer_buffer = consumer_status.take();
+ ASSERT_NE(nullptr, consumer_buffer);
+ consumer_buffer->Discard();
+
+ // Buffer should be returned to the producer queue without being handled by
+ // the silent consumer queue.
+ EXPECT_EQ(1u, consumer_queue_->count());
+ EXPECT_EQ(kBufferCount - 2, producer_queue_->count());
+ EXPECT_TRUE(producer_queue_->HandleQueueEvents());
+ EXPECT_EQ(kBufferCount - 1, producer_queue_->count());
+}
+
struct TestMetadata {
char a;
int32_t b;