OSDN Git Service

Queue: Wait for unregistration if on different thread
authorHansong Zhang <hsz@google.com>
Tue, 3 Mar 2020 20:43:23 +0000 (12:43 -0800)
committerHansong Zhang <hsz@google.com>
Tue, 3 Mar 2020 21:11:07 +0000 (13:11 -0800)
Before Queue object is deleted, we must make sure that the enqueue or
dequeue callback isn't executing. If on same thread, callbacks are
synchronized. If on different thread, we must wait for unregistration to
synchronize callbacks.

Test: bluetooth_test_gd
Bug: 150174451
Change-Id: Id3c980aa0bf7bd9fa10c33c5cca3751df38f7d97

gd/os/linux_generic/queue.tpp
gd/os/linux_generic/queue_unittest.cc

index dd1deff..368a9af 100644 (file)
@@ -37,11 +37,22 @@ void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
 
 template <typename T>
 void Queue<T>::UnregisterEnqueue() {
-  std::lock_guard<std::mutex> lock(mutex_);
-  ASSERT(enqueue_.reactable_ != nullptr);
-  enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_);
-  enqueue_.reactable_ = nullptr;
-  enqueue_.handler_ = nullptr;
+  Reactor* reactor = nullptr;
+  Reactor::Reactable* to_unregister = nullptr;
+  bool wait_for_unregister = false;
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    ASSERT(enqueue_.reactable_ != nullptr);
+    reactor = enqueue_.handler_->thread_->GetReactor();
+    wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread());
+    to_unregister = enqueue_.reactable_;
+    enqueue_.reactable_ = nullptr;
+    enqueue_.handler_ = nullptr;
+  }
+  reactor->Unregister(to_unregister);
+  if (wait_for_unregister) {
+    reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
+  }
 }
 
 template <typename T>
@@ -56,11 +67,22 @@ void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
 
 template <typename T>
 void Queue<T>::UnregisterDequeue() {
-  std::lock_guard<std::mutex> lock(mutex_);
-  ASSERT(dequeue_.reactable_ != nullptr);
-  dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_);
-  dequeue_.reactable_ = nullptr;
-  dequeue_.handler_ = nullptr;
+  Reactor* reactor = nullptr;
+  Reactor::Reactable* to_unregister = nullptr;
+  bool wait_for_unregister = false;
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    ASSERT(dequeue_.reactable_ != nullptr);
+    reactor = dequeue_.handler_->thread_->GetReactor();
+    wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread());
+    to_unregister = dequeue_.reactable_;
+    dequeue_.reactable_ = nullptr;
+    dequeue_.handler_ = nullptr;
+  }
+  reactor->Unregister(to_unregister);
+  if (wait_for_unregister) {
+    reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
+  }
 }
 
 template <typename T>
index f0ca2bd..e721221 100644 (file)
@@ -17,6 +17,7 @@
 #include "os/queue.h"
 
 #include <sys/eventfd.h>
+#include <atomic>
 #include <future>
 #include <unordered_map>
 
@@ -721,6 +722,68 @@ TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
   future.wait();
 }
 
+std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  (*to_increase)++;
+  return std::make_unique<std::string>("Hello");
+}
+
+TEST_F(QueueTest, unregister_enqueue_and_wait) {
+  Queue<std::string> queue(10);
+  int* indicator = new int(100);
+  queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator)));
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  queue.UnregisterEnqueue();
+  EXPECT_EQ(*indicator, 101);
+  delete indicator;
+}
+
+std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(int* to_increase, Queue<std::string>* queue,
+                                                                       std::atomic_bool* is_registered) {
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  (*to_increase)++;
+  if (is_registered->exchange(false)) {
+    queue->UnregisterEnqueue();
+  }
+  return std::make_unique<std::string>("Hello");
+}
+
+TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
+  Queue<std::string> queue(10);
+  int* indicator = new int(100);
+  std::atomic_bool is_registered = true;
+  queue.RegisterEnqueue(enqueue_handler_,
+                        common::Bind(&sleep_and_enqueue_callback_and_unregister, common::Unretained(indicator),
+                                     common::Unretained(&queue), common::Unretained(&is_registered)));
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  if (is_registered.exchange(false)) {
+    queue.UnregisterEnqueue();
+  }
+  EXPECT_EQ(*indicator, 101);
+  delete indicator;
+}
+
+void sleep_and_dequeue_callback(int* to_increase) {
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  (*to_increase)++;
+}
+
+TEST_F(QueueTest, unregister_dequeue_and_wait) {
+  int* indicator = new int(100);
+  Queue<std::string> queue(10);
+  queue.RegisterEnqueue(enqueue_handler_, common::Bind(
+                                              [](Queue<std::string>* queue) {
+                                                queue->UnregisterEnqueue();
+                                                return std::make_unique<std::string>("Hello");
+                                              },
+                                              common::Unretained(&queue)));
+  queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator)));
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  queue.UnregisterDequeue();
+  EXPECT_EQ(*indicator, 101);
+  delete indicator;
+}
+
 // Create all threads for death tests in the function that dies
 class QueueDeathTest : public ::testing::Test {
  public: