OSDN Git Service

Support multiple consumer queues.
authorCorey Tabaka <eieio@google.com>
Thu, 4 May 2017 17:56:05 +0000 (10:56 -0700)
committerCorey Tabaka <eieio@google.com>
Thu, 4 May 2017 23:39:11 +0000 (16:39 -0700)
- Add support for importing posted buffers when spawing a new
  consumer queue.
- Correctly handle adding signaled buffers to epoll with edge
  triggered mode set.
- Add test for multi-consumer behavior.

Bug: 36401174
Test: buffer_hub_queue-test passes.
Change-Id: Id09f01502a1b18bf80a0ae465c2941b548cde2e4

libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp

index ba8fefe..b381d22 100644 (file)
@@ -3,6 +3,7 @@
 #include <inttypes.h>
 #include <log/log.h>
 #include <sys/epoll.h>
+#include <poll.h>
 
 #include <array>
 
 #include <pdx/file_handle.h>
 #include <private/dvr/bufferhub_rpc.h>
 
+#define RETRY_EINTR(fnc_call)                 \
+  ([&]() -> decltype(fnc_call) {              \
+    decltype(fnc_call) result;                \
+    do {                                      \
+      result = (fnc_call);                    \
+    } while (result == -1 && errno == EINTR); \
+    return result;                            \
+  })()
+
 using android::pdx::ErrorStatus;
 using android::pdx::LocalChannelHandle;
 using android::pdx::Status;
@@ -113,8 +123,10 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
 
   // Loop at least once to check for hangups.
   do {
-    ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: count=%zu capacity=%zu",
-             count(), capacity());
+    ALOGD_IF(
+        TRACE,
+        "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
+        id(), count(), capacity());
 
     // If there is already a buffer then just check for hangup without waiting.
     const int ret = epoll_fd_.Wait(events.data(), events.size(),
@@ -122,7 +134,9 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
 
     if (ret == 0) {
       ALOGI_IF(TRACE,
-               "BufferHubQueue::WaitForBuffers: No events before timeout.");
+               "BufferHubQueue::WaitForBuffers: No events before timeout: "
+               "queue_id=%d",
+               id());
       return count() != 0;
     }
 
@@ -145,9 +159,9 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
                index);
 
       if (is_buffer_event_index(index)) {
-        HandleBufferEvent(static_cast<size_t>(index), events[i]);
+        HandleBufferEvent(static_cast<size_t>(index), events[i].events);
       } else if (is_queue_event_index(index)) {
-        HandleQueueEvent(events[i]);
+        HandleQueueEvent(events[i].events);
       } else {
         ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
               index);
@@ -158,29 +172,39 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
   return count() != 0;
 }
 
-void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
+void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
   auto buffer = buffers_[slot];
   if (!buffer) {
     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
     return;
   }
 
-  auto status = buffer->GetEventMask(event.events);
+  auto status = buffer->GetEventMask(poll_events);
   if (!status) {
     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
           status.GetErrorMessage().c_str());
     return;
   }
 
-  int events = status.get();
+  const int events = status.get();
   if (events & EPOLLIN) {
-    int ret = OnBufferReady(buffer, &fences_[slot]);
-    if (ret < 0) {
-      ALOGE("BufferHubQueue::HandleBufferEvent: Failed to set buffer ready: %s",
-            strerror(-ret));
-      return;
+    const int ret = OnBufferReady(buffer, &fences_[slot]);
+    if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
+      // Only enqueue the buffer if it moves to or is already in the state
+      // requested in OnBufferReady(). If the buffer is busy this means that the
+      // buffer moved from released to posted when a new consumer was created
+      // before the ProducerQueue had a chance to regain it. This is a valid
+      // transition that we have to handle because edge triggered poll events
+      // latch the ready state even if it is later de-asserted -- don't enqueue
+      // or print an error log in this case.
+      if (ret != -EBUSY)
+        Enqueue(buffer, slot);
+    } else {
+      ALOGE(
+          "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
+          "queue_id=%d buffer_id=%d: %s",
+          id(), buffer->id(), strerror(-ret));
     }
-    Enqueue(buffer, slot);
   } else if (events & EPOLLHUP) {
     // This might be caused by producer replacing an existing buffer slot, or
     // when BufferHubQueue is shutting down. For the first case, currently the
@@ -203,15 +227,15 @@ void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
   }
 }
 
-void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
-  auto status = GetEventMask(event.events);
+void BufferHubQueue::HandleQueueEvent(int poll_event) {
+  auto status = GetEventMask(poll_event);
   if (!status) {
     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
           status.GetErrorMessage().c_str());
     return;
   }
 
-  int events = status.get();
+  const int events = status.get();
   if (events & EPOLLIN) {
     // Note that after buffer imports, if |count()| still returns 0, epoll
     // wait will be tried again to acquire the newly imported buffer.
@@ -224,7 +248,7 @@ void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
     ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
     hung_up_ = true;
   } else {
-    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%d", events);
+    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
   }
 }
 
@@ -407,6 +431,8 @@ int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
 
 int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
                              size_t slot) {
+  ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
+           id(), buf->id(), slot);
   // For producer buffer, we need to enqueue the newly added buffer
   // immediately. Producer queue starts with all buffers in available state.
   const int ret = BufferHubQueue::AddBuffer(buf, slot);
@@ -447,6 +473,8 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
 
 int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                  LocalHandle* release_fence) {
+  ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
+           id(), buf->id());
   auto buffer = std::static_pointer_cast<BufferProducer>(buf);
   return buffer->Gain(release_fence);
 }
@@ -525,8 +553,30 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
 
 int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
                              size_t slot) {
-  // Consumer queue starts with all buffers in unavailable state.
-  return BufferHubQueue::AddBuffer(buf, slot);
+  ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
+           id(), buf->id(), slot);
+  const int ret = BufferHubQueue::AddBuffer(buf, slot);
+  if (ret < 0)
+    return ret;
+
+  // Check to see if the buffer is already signaled. This is necessary to catch
+  // cases where buffers are already available; epoll edge triggered mode does
+  // not fire until and edge transition when adding new buffers to the epoll
+  // set.
+  const int kTimeoutMs = 0;
+  pollfd pfd{buf->event_fd(), POLLIN, 0};
+  const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
+  if (count < 0) {
+    const int error = errno;
+    ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
+          strerror(errno));
+    return -error;
+  }
+
+  if (count == 1)
+    HandleBufferEvent(slot, pfd.revents);
+
+  return 0;
 }
 
 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
@@ -558,6 +608,8 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
 
 int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                  LocalHandle* acquire_fence) {
+  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
+           id(), buf->id());
   auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
   return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
 }
index b7054da..2a37cfb 100644 (file)
@@ -136,8 +136,8 @@ class BufferHubQueue : public pdx::Client {
 
   // Wait for buffers to be released and re-add them to the queue.
   bool WaitForBuffers(int timeout);
-  void HandleBufferEvent(size_t slot, const epoll_event& event);
-  void HandleQueueEvent(const epoll_event& event);
+  void HandleBufferEvent(size_t slot, int poll_events);
+  void HandleQueueEvent(int poll_events);
 
   virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                             LocalHandle* fence) = 0;
@@ -308,6 +308,14 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
     return BASE::Create(sizeof(Meta), usage_set_mask, usage_clear_mask,
                         usage_deny_set_mask, usage_deny_clear_mask);
   }
+  static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes,
+                                               uint32_t usage_set_mask,
+                                               uint32_t usage_clear_mask,
+                                               uint32_t usage_deny_set_mask,
+                                               uint32_t usage_deny_clear_mask) {
+    return BASE::Create(meta_size_bytes, usage_set_mask, usage_clear_mask,
+                        usage_deny_set_mask, usage_deny_clear_mask);
+  }
 
   // Import a |ProducerQueue| from a channel handle.
   static std::unique_ptr<ProducerQueue> Import(LocalChannelHandle handle) {
@@ -362,6 +370,19 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
                     LocalHandle* release_fence) override;
 };
 
+// Explicit specializations of ProducerQueue::Create for void metadata type.
+template <>
+inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>() {
+  return ProducerQueue::Create(0);
+}
+template <>
+inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>(
+    uint32_t usage_set_mask, uint32_t usage_clear_mask,
+    uint32_t usage_deny_set_mask, uint32_t usage_deny_clear_mask) {
+  return ProducerQueue::Create(0, usage_set_mask, usage_clear_mask,
+                               usage_deny_set_mask, usage_deny_clear_mask);
+}
+
 class ConsumerQueue : public BufferHubQueue {
  public:
   // Get a buffer consumer. Note that the method doesn't check whether the
index 519c879..ba9c179 100644 (file)
@@ -22,20 +22,33 @@ constexpr int kBufferSliceCount = 1;  // number of slices in each buffer
 class BufferHubQueueTest : public ::testing::Test {
  public:
   template <typename Meta>
-  bool CreateQueues(int usage_set_mask = 0, int usage_clear_mask = 0,
-                    int usage_deny_set_mask = 0,
-                    int usage_deny_clear_mask = 0) {
+  bool CreateProducerQueue(uint64_t usage_set_mask = 0,
+                           uint64_t usage_clear_mask = 0,
+                           uint64_t usage_deny_set_mask = 0,
+                           uint64_t usage_deny_clear_mask = 0) {
     producer_queue_ =
         ProducerQueue::Create<Meta>(usage_set_mask, usage_clear_mask,
                                     usage_deny_set_mask, usage_deny_clear_mask);
-    if (!producer_queue_)
-      return false;
+    return producer_queue_ != nullptr;
+  }
 
-    consumer_queue_ = producer_queue_->CreateConsumerQueue();
-    if (!consumer_queue_)
+  bool CreateConsumerQueue() {
+    if (producer_queue_) {
+      consumer_queue_ = producer_queue_->CreateConsumerQueue();
+      return consumer_queue_ != nullptr;
+    } else {
       return false;
+    }
+  }
 
-    return true;
+  template <typename Meta>
+  bool CreateQueues(int usage_set_mask = 0, int usage_clear_mask = 0,
+                    int usage_deny_set_mask = 0,
+                    int usage_deny_clear_mask = 0) {
+    return CreateProducerQueue<Meta>(usage_set_mask, usage_clear_mask,
+                                     usage_deny_set_mask,
+                                     usage_deny_clear_mask) &&
+           CreateConsumerQueue();
   }
 
   void AllocateBuffer() {
@@ -134,6 +147,78 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) {
   }
 }
 
+TEST_F(BufferHubQueueTest, TestMultipleConsumers) {
+  ASSERT_TRUE(CreateProducerQueue<void>());
+
+  // Allocate buffers.
+  const size_t kBufferCount = 4u;
+  for (size_t i = 0; i < kBufferCount; i++) {
+    AllocateBuffer();
+  }
+  ASSERT_EQ(kBufferCount, producer_queue_->count());
+
+  // Build a silent consumer queue to test multi-consumer queue features.
+  auto silent_queue = producer_queue_->CreateSilentConsumerQueue();
+  ASSERT_NE(nullptr, silent_queue);
+
+  // Check that buffers are correctly imported on construction.
+  EXPECT_EQ(kBufferCount, silent_queue->capacity());
+
+  // Dequeue and post a buffer.
+  size_t slot;
+  LocalHandle fence;
+  auto producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+  ASSERT_TRUE(producer_status.ok());
+  auto producer_buffer = producer_status.take();
+  ASSERT_NE(nullptr, producer_buffer);
+  ASSERT_EQ(0, producer_buffer->Post<void>({}));
+
+  // Currently we expect no buffer to be available prior to calling
+  // WaitForBuffers/HandleQueueEvents.
+  // TODO(eieio): Note this behavior may change in the future.
+  EXPECT_EQ(0u, silent_queue->count());
+  EXPECT_FALSE(silent_queue->HandleQueueEvents());
+  EXPECT_EQ(0u, silent_queue->count());
+
+  // Build a new consumer queue to test multi-consumer queue features.
+  consumer_queue_ = silent_queue->CreateConsumerQueue();
+  ASSERT_NE(nullptr, consumer_queue_);
+
+  // Check that buffers are correctly imported on construction.
+  EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+  EXPECT_EQ(1u, consumer_queue_->count());
+
+  // Reclaim released/ignored buffers.
+  producer_queue_->HandleQueueEvents();
+  ASSERT_EQ(kBufferCount - 1, producer_queue_->count());
+
+  // Post another buffer.
+  producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+  ASSERT_TRUE(producer_status.ok());
+  producer_buffer = producer_status.take();
+  ASSERT_NE(nullptr, producer_buffer);
+  ASSERT_EQ(0, producer_buffer->Post<void>({}));
+
+  // Verify that the consumer queue receives it.
+  EXPECT_EQ(1u, consumer_queue_->count());
+  EXPECT_TRUE(consumer_queue_->HandleQueueEvents());
+  EXPECT_EQ(2u, consumer_queue_->count());
+
+  // Dequeue and acquire/release (discard) buffers on the consumer end.
+  auto consumer_status = consumer_queue_->Dequeue(0, &slot, &fence);
+  ASSERT_TRUE(consumer_status.ok());
+  auto consumer_buffer = consumer_status.take();
+  ASSERT_NE(nullptr, consumer_buffer);
+  consumer_buffer->Discard();
+
+  // Buffer should be returned to the producer queue without being handled by
+  // the silent consumer queue.
+  EXPECT_EQ(1u, consumer_queue_->count());
+  EXPECT_EQ(kBufferCount - 2, producer_queue_->count());
+  EXPECT_TRUE(producer_queue_->HandleQueueEvents());
+  EXPECT_EQ(kBufferCount - 1, producer_queue_->count());
+}
+
 struct TestMetadata {
   char a;
   int32_t b;