ASSERT_FALSE(enqueue_.registered_);
}
+TEST_F(EnqueueBufferTest, delete_when_in_callback) {
+ Queue<int>* queue = new Queue<int>(kQueueSize);
+ EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue);
+ int num_items = 10;
+ for (int i = 0; i < num_items; i++) {
+ enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_);
+ }
+
+ delete enqueue_buffer;
+ delete queue;
+}
+
} // namespace
} // namespace os
} // namespace bluetooth
}
bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
+ std::lock_guard<std::mutex> lock(mutex_);
if (executing_reactable_finished_ == nullptr) {
return true;
}
auto stop_status = executing_reactable_finished_->wait_for(timeout);
+ executing_reactable_finished_ = nullptr;
if (stop_status != std::future_status::ready) {
LOG_ERROR("Unregister reactable timed out");
}
public:
EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {}
+ ~EnqueueBuffer() {
+ if (enqueue_registered_.exchange(false)) {
+ queue_->UnregisterEnqueue();
+ }
+ }
+
void Enqueue(std::unique_ptr<T> t, os::Handler* handler) {
std::lock_guard<std::mutex> lock(mutex_);
buffer_.push(std::move(t));
- if (buffer_.size() == 1) {
+ if (!enqueue_registered_.exchange(true)) {
queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this)));
}
}
void Clear() {
std::lock_guard<std::mutex> lock(mutex_);
- if (!buffer_.empty()) {
+ if (enqueue_registered_.exchange(false)) {
queue_->UnregisterEnqueue();
std::queue<std::unique_ptr<T>> empty;
std::swap(buffer_, empty);
std::lock_guard<std::mutex> lock(mutex_);
std::unique_ptr<T> enqueued_t = std::move(buffer_.front());
buffer_.pop();
- if (buffer_.empty()) {
+ if (buffer_.empty() && enqueue_registered_.exchange(false)) {
queue_->UnregisterEnqueue();
}
return enqueued_t;
mutable std::mutex mutex_;
IQueueEnqueue<T>* queue_;
+ std::atomic_bool enqueue_registered_ = false;
std::queue<std::unique_ptr<T>> buffer_;
};