OSDN Git Service

OS Queue and Reactor: Minor fixes
authorHansong Zhang <hsz@google.com>
Wed, 4 Mar 2020 21:55:09 +0000 (13:55 -0800)
committerHansong Zhang <hsz@google.com>
Thu, 5 Mar 2020 17:30:50 +0000 (09:30 -0800)
* When deleting EnqueueBuffer, unregister if needed
* Add a mutex to Reactor::WaitForUnregisteredReactable to prevent from
potential race
* EnqueueBuffer: Use an atomic_bool to synchronize Register/Unregister

Bug: 150174451
Test: cert/run --device
Change-Id: I7abcc2f763f8f0575dc0c67141571904c9a20136

gd/os/linux_generic/queue_unittest.cc
gd/os/linux_generic/reactor.cc
gd/os/queue.h

index e721221..61b0151 100644 (file)
@@ -892,6 +892,18 @@ TEST_F(EnqueueBufferTest, clear) {
   ASSERT_FALSE(enqueue_.registered_);
 }
 
+TEST_F(EnqueueBufferTest, delete_when_in_callback) {
+  Queue<int>* queue = new Queue<int>(kQueueSize);
+  EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue);
+  int num_items = 10;
+  for (int i = 0; i < num_items; i++) {
+    enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_);
+  }
+
+  delete enqueue_buffer;
+  delete queue;
+}
+
 }  // namespace
 }  // namespace os
 }  // namespace bluetooth
index 0f3f949..8aadf2e 100644 (file)
@@ -193,10 +193,12 @@ void Reactor::Unregister(Reactor::Reactable* reactable) {
 }
 
 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
+  std::lock_guard<std::mutex> lock(mutex_);
   if (executing_reactable_finished_ == nullptr) {
     return true;
   }
   auto stop_status = executing_reactable_finished_->wait_for(timeout);
+  executing_reactable_finished_ = nullptr;
   if (stop_status != std::future_status::ready) {
     LOG_ERROR("Unregister reactable timed out");
   }
index 1d2af02..792dbab 100644 (file)
@@ -106,17 +106,23 @@ class EnqueueBuffer {
  public:
   EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {}
 
+  ~EnqueueBuffer() {
+    if (enqueue_registered_.exchange(false)) {
+      queue_->UnregisterEnqueue();
+    }
+  }
+
   void Enqueue(std::unique_ptr<T> t, os::Handler* handler) {
     std::lock_guard<std::mutex> lock(mutex_);
     buffer_.push(std::move(t));
-    if (buffer_.size() == 1) {
+    if (!enqueue_registered_.exchange(true)) {
       queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this)));
     }
   }
 
   void Clear() {
     std::lock_guard<std::mutex> lock(mutex_);
-    if (!buffer_.empty()) {
+    if (enqueue_registered_.exchange(false)) {
       queue_->UnregisterEnqueue();
       std::queue<std::unique_ptr<T>> empty;
       std::swap(buffer_, empty);
@@ -128,7 +134,7 @@ class EnqueueBuffer {
     std::lock_guard<std::mutex> lock(mutex_);
     std::unique_ptr<T> enqueued_t = std::move(buffer_.front());
     buffer_.pop();
-    if (buffer_.empty()) {
+    if (buffer_.empty() && enqueue_registered_.exchange(false)) {
       queue_->UnregisterEnqueue();
     }
     return enqueued_t;
@@ -136,6 +142,7 @@ class EnqueueBuffer {
 
   mutable std::mutex mutex_;
   IQueueEnqueue<T>* queue_;
+  std::atomic_bool enqueue_registered_ = false;
   std::queue<std::unique_ptr<T>> buffer_;
 };