From f607f2d1a434698d3c44a500042e07225a7a4dd0 Mon Sep 17 00:00:00 2001 From: Jack He Date: Fri, 24 Aug 2018 17:14:10 -0700 Subject: [PATCH] Common: Replace ExecutionBarrier with std::promise and std::future * std::promise and std::future are able to achieve the same functionalities of ExecutionBarrier with extra flexibility * Replace "_barrier" with "_promise" in system/bt/common Bug: 110303473 Fixes: 112159657 Test: mm -j40, unit test Change-Id: I2a420bbf16bf92e4b3dd256d9f23480fc2be7be1 --- common/Android.bp | 2 - common/benchmark/thread_performance_benchmark.cc | 120 +++++++++++++---------- common/execution_barrier.cc | 38 ------- common/execution_barrier.h | 70 ------------- common/execution_barrier_unittest.cc | 95 ------------------ common/message_loop_thread.cc | 22 ++--- common/message_loop_thread.h | 11 +-- common/message_loop_thread_unittest.cc | 77 +++++++-------- common/test/thread_performance_test.cc | 72 ++++++++------ 9 files changed, 159 insertions(+), 348 deletions(-) delete mode 100644 common/execution_barrier.cc delete mode 100644 common/execution_barrier.h delete mode 100644 common/execution_barrier_unittest.cc diff --git a/common/Android.bp b/common/Android.bp index f8e8e1889..d33c751c8 100644 --- a/common/Android.bp +++ b/common/Android.bp @@ -8,7 +8,6 @@ cc_library_static { ], srcs: [ "message_loop_thread.cc", - "execution_barrier.cc", "metrics.cc", "time_util.cc", ], @@ -27,7 +26,6 @@ cc_test { "system/bt/stack/include", ], srcs : [ - "execution_barrier_unittest.cc", "leaky_bonded_queue_unittest.cc", "message_loop_thread_unittest.cc", "metrics_unittest.cc", diff --git a/common/benchmark/thread_performance_benchmark.cc b/common/benchmark/thread_performance_benchmark.cc index 657432937..74f157f85 100644 --- a/common/benchmark/thread_performance_benchmark.cc +++ b/common/benchmark/thread_performance_benchmark.cc @@ -19,22 +19,21 @@ #include #include #include +#include #include #include -#include "common/execution_barrier.h" #include "common/message_loop_thread.h" #include "osi/include/fixed_queue.h" #include "osi/include/thread.h" using ::benchmark::State; -using bluetooth::common::ExecutionBarrier; using bluetooth::common::MessageLoopThread; #define NUM_MESSAGES_TO_SEND 100000 volatile static int g_counter = 0; -static std::unique_ptr g_counter_barrier = nullptr; +static std::unique_ptr> g_counter_promise = nullptr; void pthread_callback_batch(void* context) { auto queue = static_cast(context); @@ -42,16 +41,16 @@ void pthread_callback_batch(void* context) { fixed_queue_dequeue(queue); g_counter++; if (g_counter >= NUM_MESSAGES_TO_SEND) { - g_counter_barrier->NotifyFinished(); + g_counter_promise->set_value(); } } -void callback_sequential(void* context) { g_counter_barrier->NotifyFinished(); } +void callback_sequential(void* context) { g_counter_promise->set_value(); } void callback_sequential_queue(fixed_queue_t* queue, void* context) { CHECK_NE(queue, nullptr); fixed_queue_dequeue(queue); - g_counter_barrier->NotifyFinished(); + g_counter_promise->set_value(); } void callback_batch(fixed_queue_t* queue, void* data) { @@ -59,7 +58,7 @@ void callback_batch(fixed_queue_t* queue, void* data) { fixed_queue_dequeue(queue); g_counter++; if (g_counter >= NUM_MESSAGES_TO_SEND) { - g_counter_barrier->NotifyFinished(); + g_counter_promise->set_value(); } } @@ -67,19 +66,19 @@ class BM_ThreadPerformance : public ::benchmark::Fixture { protected: void SetUp(State& st) override { benchmark::Fixture::SetUp(st); - set_up_barrier_ = std::make_unique(); + set_up_promise_ = std::make_unique>(); g_counter = 0; bt_msg_queue_ = fixed_queue_new(SIZE_MAX); } void TearDown(State& st) override { fixed_queue_free(bt_msg_queue_, nullptr); bt_msg_queue_ = nullptr; - set_up_barrier_.reset(nullptr); - g_counter_barrier.reset(nullptr); + set_up_promise_.reset(nullptr); + g_counter_promise.reset(nullptr); benchmark::Fixture::TearDown(st); } fixed_queue_t* bt_msg_queue_ = nullptr; - std::unique_ptr set_up_barrier_; + std::unique_ptr> set_up_promise_; }; class BM_MessageLoop : public BM_ThreadPerformance { @@ -97,8 +96,8 @@ class BM_MessageLoop : public BM_ThreadPerformance { message_loop_ = new base::MessageLoop(); run_loop_ = new base::RunLoop(); message_loop_->task_runner()->PostTask( - FROM_HERE, base::BindOnce(&ExecutionBarrier::NotifyFinished, - base::Unretained(set_up_barrier_.get()))); + FROM_HERE, base::BindOnce(&std::promise::set_value, + base::Unretained(set_up_promise_.get()))); run_loop_->Run(); delete message_loop_; message_loop_ = nullptr; @@ -115,9 +114,10 @@ class BM_MessageLoopOsiThread : public BM_MessageLoop { protected: void SetUp(State& st) override { BM_MessageLoop::SetUp(st); + std::future set_up_future = set_up_promise_->get_future(); thread_ = thread_new("BM_MessageLoopOnOsiThread thread"); thread_post(thread_, &BM_MessageLoop::RunThread, this); - set_up_barrier_->WaitForExecution(); + set_up_future.wait(); } void TearDown(State& st) override { @@ -134,23 +134,25 @@ class BM_MessageLoopOsiThread : public BM_MessageLoop { BENCHMARK_F(BM_MessageLoopOsiThread, batch_enque_dequeue)(State& state) { for (auto _ : state) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); message_loop_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } }; BENCHMARK_F(BM_MessageLoopOsiThread, sequential_execution)(State& state) { for (auto _ : state) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); message_loop_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_sequential, nullptr)); - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } } }; @@ -159,8 +161,9 @@ class BM_MessageLoopStlThread : public BM_MessageLoop { protected: void SetUp(State& st) override { BM_MessageLoop::SetUp(st); + std::future set_up_future = set_up_promise_->get_future(); thread_ = new std::thread(&BM_MessageLoop::RunThread, this); - set_up_barrier_->WaitForExecution(); + set_up_future.wait(); } void TearDown(State& st) override { @@ -178,23 +181,25 @@ class BM_MessageLoopStlThread : public BM_MessageLoop { BENCHMARK_F(BM_MessageLoopStlThread, batch_enque_dequeue)(State& state) { for (auto _ : state) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); message_loop_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } }; BENCHMARK_F(BM_MessageLoopStlThread, sequential_execution)(State& state) { for (auto _ : state) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); message_loop_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_sequential, nullptr)); - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } } }; @@ -203,8 +208,9 @@ class BM_MessageLoopPosixThread : public BM_MessageLoop { protected: void SetUp(State& st) override { BM_MessageLoop::SetUp(st); + std::future set_up_future = set_up_promise_->get_future(); pthread_create(&thread_, nullptr, &BM_MessageLoop::RunPThread, (void*)this); - set_up_barrier_->WaitForExecution(); + set_up_future.wait(); } void TearDown(State& st) override { @@ -220,23 +226,25 @@ class BM_MessageLoopPosixThread : public BM_MessageLoop { BENCHMARK_F(BM_MessageLoopPosixThread, batch_enque_dequeue)(State& state) { for (auto _ : state) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); message_loop_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } }; BENCHMARK_F(BM_MessageLoopPosixThread, sequential_execution)(State& state) { for (auto _ : state) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); message_loop_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_sequential, nullptr)); - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } } }; @@ -261,12 +269,13 @@ BENCHMARK_F(BM_OsiReactorThread, batch_enque_dequeue_using_thread_post) (State& state) { for (auto _ : state) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); thread_post(thread_, pthread_callback_batch, bt_msg_queue_); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } }; @@ -274,9 +283,10 @@ BENCHMARK_F(BM_OsiReactorThread, sequential_execution_using_thread_post) (State& state) { for (auto _ : state) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); thread_post(thread_, callback_sequential, nullptr); - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } } }; @@ -287,11 +297,12 @@ BENCHMARK_F(BM_OsiReactorThread, batch_enque_dequeue_using_reactor) callback_batch, nullptr); for (auto _ : state) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } }; @@ -301,9 +312,10 @@ BENCHMARK_F(BM_OsiReactorThread, sequential_execution_using_reactor) callback_sequential_queue, nullptr); for (auto _ : state) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } } }; @@ -312,13 +324,14 @@ class BM_MessageLooopThread : public BM_ThreadPerformance { protected: void SetUp(State& st) override { BM_ThreadPerformance::SetUp(st); + std::future set_up_future = set_up_promise_->get_future(); message_loop_thread_ = new MessageLoopThread("BM_MessageLooopThread thread"); message_loop_thread_->StartUp(); message_loop_thread_->DoInThread( - FROM_HERE, base::BindOnce(&ExecutionBarrier::NotifyFinished, - base::Unretained(set_up_barrier_.get()))); - set_up_barrier_->WaitForExecution(); + FROM_HERE, base::BindOnce(&std::promise::set_value, + base::Unretained(set_up_promise_.get()))); + set_up_future.wait(); } void TearDown(State& st) override { @@ -334,23 +347,25 @@ class BM_MessageLooopThread : public BM_ThreadPerformance { BENCHMARK_F(BM_MessageLooopThread, batch_enque_dequeue)(State& state) { for (auto _ : state) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); message_loop_thread_->DoInThread( FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } }; BENCHMARK_F(BM_MessageLooopThread, sequential_execution)(State& state) { for (auto _ : state) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); message_loop_thread_->DoInThread( FROM_HERE, base::BindOnce(&callback_sequential, nullptr)); - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } } }; @@ -359,12 +374,13 @@ class BM_LibChromeThread : public BM_ThreadPerformance { protected: void SetUp(State& st) override { BM_ThreadPerformance::SetUp(st); + std::future set_up_future = set_up_promise_->get_future(); thread_ = new base::Thread("BM_LibChromeThread thread"); thread_->Start(); thread_->task_runner()->PostTask( - FROM_HERE, base::BindOnce(&ExecutionBarrier::NotifyFinished, - base::Unretained(set_up_barrier_.get()))); - set_up_barrier_->WaitForExecution(); + FROM_HERE, base::BindOnce(&std::promise::set_value, + base::Unretained(set_up_promise_.get()))); + set_up_future.wait(); } void TearDown(State& st) override { @@ -380,23 +396,25 @@ class BM_LibChromeThread : public BM_ThreadPerformance { BENCHMARK_F(BM_LibChromeThread, batch_enque_dequeue)(State& state) { for (auto _ : state) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); thread_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } }; BENCHMARK_F(BM_LibChromeThread, sequential_execution)(State& state) { for (auto _ : state) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); thread_->task_runner()->PostTask( FROM_HERE, base::BindOnce(&callback_sequential, nullptr)); - g_counter_barrier->WaitForExecution(); + counter_future.wait(); } } }; @@ -411,4 +429,4 @@ int main(int argc, char** argv) { return 1; } ::benchmark::RunSpecifiedBenchmarks(); -} \ No newline at end of file +} diff --git a/common/execution_barrier.cc b/common/execution_barrier.cc deleted file mode 100644 index 8017b058e..000000000 --- a/common/execution_barrier.cc +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2018 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "execution_barrier.h" - -namespace bluetooth { - -namespace common { - -void ExecutionBarrier::WaitForExecution() { - std::unique_lock lock(execution_mutex_); - while (!finished_) { - execution_cv_.wait(lock); - } -} - -void ExecutionBarrier::NotifyFinished() { - std::unique_lock lock(execution_mutex_); - finished_ = true; - execution_cv_.notify_all(); -} - -} // namespace common - -} // namespace bluetooth \ No newline at end of file diff --git a/common/execution_barrier.h b/common/execution_barrier.h deleted file mode 100644 index 973fc44ad..000000000 --- a/common/execution_barrier.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2018 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include - -namespace bluetooth { - -namespace common { - -/** - * A utility to wait for an event on another thread - * - * This class can be used once only. This means that after the first time - * NotifyFinished() is called, WaitForExecution() will no longer block. User - * needs to create a new instance if another ExecutionBarrier is needed. - * - * No reset mechanism is provided for this class to avoid racy scenarios and - * unsafe API usage - * - * Similar to std::experimental::barrier, but this can be used only once - */ -class ExecutionBarrier final { - public: - explicit ExecutionBarrier() : finished_(false){}; - - /** - * Blocks until NotifyFinished() is called on this object - * - */ - void WaitForExecution(); - - /** - * Unblocks any caller who are blocked on WaitForExecution() method call - */ - void NotifyFinished(); - - private: - bool finished_; - std::mutex execution_mutex_; - std::condition_variable execution_cv_; - - /** - * Prevent COPY and ASSIGN since many internal states cannot be copied or - * assigned - */ - DISALLOW_COPY_AND_ASSIGN(ExecutionBarrier); -}; - -} // namespace common - -} // namespace bluetooth \ No newline at end of file diff --git a/common/execution_barrier_unittest.cc b/common/execution_barrier_unittest.cc deleted file mode 100644 index ccb9bd0ed..000000000 --- a/common/execution_barrier_unittest.cc +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2018 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include - -#include "execution_barrier.h" - -using bluetooth::common::ExecutionBarrier; - -static constexpr int kSleepTimeMs = 100; -static constexpr int kSchedulingDelayMaxMs = 5; - -TEST(ExecutionBarrierTest, test_two_threads_wait_before_execution) { - ExecutionBarrier execution_barrier; - std::thread caller1([&]() { - auto start = std::chrono::high_resolution_clock::now(); - execution_barrier.WaitForExecution(); - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed_ms = end - start; - EXPECT_NEAR(elapsed_ms.count(), kSleepTimeMs, kSchedulingDelayMaxMs); - }); - std::thread executor([&]() { - // Wait for kSleepTimeMs so that caller1 starts waiting first - std::this_thread::sleep_for(std::chrono::milliseconds(kSleepTimeMs)); - execution_barrier.NotifyFinished(); - }); - executor.join(); - caller1.join(); - // Further calls to WaitForExecution() no longer blocks - std::thread caller2([&]() { - auto start = std::chrono::high_resolution_clock::now(); - execution_barrier.WaitForExecution(); - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed_ms = end - start; - EXPECT_LT(elapsed_ms.count(), kSchedulingDelayMaxMs); - }); - caller2.join(); -} - -TEST(ExecutionBarrierTest, test_two_threads_execution_before_wait) { - ExecutionBarrier execution_barrier; - std::thread executor([&]() { execution_barrier.NotifyFinished(); }); - std::thread caller1([&]() { - // Wait for kSleepTimeMs so that executor finishes running first - std::this_thread::sleep_for(std::chrono::milliseconds(kSleepTimeMs)); - auto start = std::chrono::high_resolution_clock::now(); - execution_barrier.WaitForExecution(); - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed_ms = end - start; - EXPECT_LT(elapsed_ms.count(), kSchedulingDelayMaxMs); - }); - executor.join(); - caller1.join(); -} - -TEST(ExecutionBarrierTest, test_two_callers_one_executor) { - ExecutionBarrier execution_barrier; - std::thread caller1([&]() { - auto start = std::chrono::high_resolution_clock::now(); - execution_barrier.WaitForExecution(); - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed_ms = end - start; - EXPECT_NEAR(elapsed_ms.count(), kSleepTimeMs, 5); - }); - std::thread caller2([&]() { - auto start = std::chrono::high_resolution_clock::now(); - execution_barrier.WaitForExecution(); - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed_ms = end - start; - EXPECT_NEAR(elapsed_ms.count(), kSleepTimeMs, 5); - }); - std::thread executor([&]() { - std::this_thread::sleep_for(std::chrono::milliseconds(kSleepTimeMs)); - execution_barrier.NotifyFinished(); - }); - executor.join(); - caller1.join(); - caller2.join(); -} diff --git a/common/message_loop_thread.cc b/common/message_loop_thread.cc index f320136fd..9bf25bef1 100644 --- a/common/message_loop_thread.cc +++ b/common/message_loop_thread.cc @@ -49,11 +49,11 @@ void MessageLoopThread::StartUp() { LOG(WARNING) << __func__ << ": thread " << *this << " is already started"; return; } - std::shared_ptr start_up_barrier = - std::make_shared(); - thread_ = - new std::thread(&MessageLoopThread::RunThread, this, start_up_barrier); - start_up_barrier->WaitForExecution(); + std::promise start_up_promise; + std::future start_up_future = start_up_promise.get_future(); + thread_ = new std::thread(&MessageLoopThread::RunThread, this, + std::move(start_up_promise)); + start_up_future.wait(); } bool MessageLoopThread::DoInThread(const tracked_objects::Location& from_here, @@ -116,10 +116,9 @@ bool MessageLoopThread::IsRunning() const { } // Non API method, should not be protected by API mutex -void MessageLoopThread::RunThread( - MessageLoopThread* thread, - std::shared_ptr start_up_barrier) { - thread->Run(std::move(start_up_barrier)); +void MessageLoopThread::RunThread(MessageLoopThread* thread, + std::promise start_up_promise) { + thread->Run(std::move(start_up_promise)); } base::MessageLoop* MessageLoopThread::message_loop() const { @@ -147,8 +146,7 @@ bool MessageLoopThread::EnableRealTimeScheduling() { } // Non API method, should NOT be protected by API mutex to avoid deadlock -void MessageLoopThread::Run( - std::shared_ptr start_up_barrier) { +void MessageLoopThread::Run(std::promise start_up_promise) { LOG(INFO) << __func__ << ": message loop starting for thread " << thread_name_; base::PlatformThread::SetName(thread_name_); @@ -156,7 +154,7 @@ void MessageLoopThread::Run( run_loop_ = new base::RunLoop(); thread_id_ = base::PlatformThread::CurrentId(); linux_tid_ = static_cast(syscall(SYS_gettid)); - start_up_barrier->NotifyFinished(); + start_up_promise.set_value(); // Blocking until ShutDown() is called run_loop_->Run(); thread_id_ = -1; diff --git a/common/message_loop_thread.h b/common/message_loop_thread.h index ff4cc89bb..c342be401 100644 --- a/common/message_loop_thread.h +++ b/common/message_loop_thread.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -27,8 +28,6 @@ #include #include -#include "common/execution_barrier.h" - namespace bluetooth { namespace common { @@ -138,19 +137,19 @@ class MessageLoopThread final { * This is used instead of a C++ lambda because of the use of std::shared_ptr * * @param context needs to be a pointer to an instance of MessageLoopThread - * @param start_up_barrier an ExecutionBarrier that is used to notify calling + * @param start_up_promise a std::promise that is used to notify calling * thread the completion of message loop start-up */ static void RunThread(MessageLoopThread* context, - std::shared_ptr start_up_barrier); + std::promise start_up_promise); /** * Actual method to run the thread, blocking until ShutDown() is called * - * @param start_up_barrier an ExecutionBarrier that is used to notify calling + * @param start_up_promise a std::promise that is used to notify calling * thread the completion of message loop start-up */ - void Run(std::shared_ptr start_up_barrier); + void Run(std::promise start_up_promise); mutable std::recursive_mutex api_mutex_; std::string thread_name_; diff --git a/common/message_loop_thread_unittest.cc b/common/message_loop_thread_unittest.cc index 28f5d9e48..0924ebe14 100644 --- a/common/message_loop_thread_unittest.cc +++ b/common/message_loop_thread_unittest.cc @@ -26,10 +26,8 @@ #include #include -#include "execution_barrier.h" #include "message_loop_thread.h" -using bluetooth::common::ExecutionBarrier; using bluetooth::common::MessageLoopThread; /** @@ -39,34 +37,28 @@ class MessageLoopThreadTest : public ::testing::Test { public: void ShouldNotHappen() { FAIL() << "Should not happen"; } - void GetThreadId(base::PlatformThreadId* thread_id, - std::shared_ptr execution_barrier) { - *thread_id = base::PlatformThread::CurrentId(); - execution_barrier->NotifyFinished(); + void GetThreadId(std::promise thread_id_promise) { + thread_id_promise.set_value(base::PlatformThread::CurrentId()); } - void GetLinuxTid(pid_t* tid, - std::shared_ptr execution_barrier) { - *tid = static_cast(syscall(SYS_gettid)); - execution_barrier->NotifyFinished(); + void GetLinuxTid(std::promise tid_promise) { + tid_promise.set_value(static_cast(syscall(SYS_gettid))); } - void GetName(std::string* name, - std::shared_ptr execution_barrier) { + void GetName(std::promise name_promise) { char my_name[256]; pthread_getname_np(pthread_self(), my_name, sizeof(my_name)); - name->append(my_name); - execution_barrier->NotifyFinished(); + name_promise.set_value(my_name); } - void GetSchedulingPolicyAndPriority( - int* scheduling_policy, int* schedule_priority, - std::shared_ptr execution_barrier) { + void GetSchedulingPolicyAndPriority(int* scheduling_policy, + int* schedule_priority, + std::promise execution_promise) { *scheduling_policy = sched_getscheduler(0); struct sched_param param = {}; ASSERT_EQ(sched_getparam(0, ¶m), 0); *schedule_priority = param.sched_priority; - execution_barrier->NotifyFinished(); + execution_promise.set_value(); } protected: @@ -133,15 +125,14 @@ TEST_F(MessageLoopThreadTest, test_name) { MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); ASSERT_GE(message_loop_thread.GetThreadId(), 0); - std::shared_ptr execution_barrier = - std::make_shared(); - std::string myName; + std::promise name_promise; + std::future name_future = name_promise.get_future(); message_loop_thread.DoInThread( FROM_HERE, - base::Bind(&MessageLoopThreadTest::GetName, base::Unretained(this), - &myName, execution_barrier)); - execution_barrier->WaitForExecution(); - ASSERT_EQ(name, myName); + base::BindOnce(&MessageLoopThreadTest::GetName, base::Unretained(this), + std::move(name_promise))); + std::string my_name = name_future.get(); + ASSERT_EQ(name, my_name); ASSERT_EQ(name, message_loop_thread.GetName()); } @@ -151,14 +142,14 @@ TEST_F(MessageLoopThreadTest, test_thread_id) { message_loop_thread.StartUp(); base::PlatformThreadId thread_id = message_loop_thread.GetThreadId(); ASSERT_GE(thread_id, 0); - std::shared_ptr execution_barrier = - std::make_shared(); - base::PlatformThreadId my_thread_id; + std::promise thread_id_promise; + std::future thread_id_future = + thread_id_promise.get_future(); message_loop_thread.DoInThread( FROM_HERE, - base::Bind(&MessageLoopThreadTest::GetThreadId, base::Unretained(this), - &my_thread_id, execution_barrier)); - execution_barrier->WaitForExecution(); + base::BindOnce(&MessageLoopThreadTest::GetThreadId, + base::Unretained(this), std::move(thread_id_promise))); + base::PlatformThreadId my_thread_id = thread_id_future.get(); ASSERT_EQ(thread_id, my_thread_id); } @@ -187,26 +178,26 @@ TEST_F(MessageLoopThreadTest, test_set_realtime_priority_success) { return; } } - std::shared_ptr execution_barrier = - std::make_shared(); + std::promise execution_promise; + std::future execution_future = execution_promise.get_future(); int scheduling_policy = -1; int scheduling_priority = -1; message_loop_thread.DoInThread( FROM_HERE, - base::Bind(&MessageLoopThreadTest::GetSchedulingPolicyAndPriority, - base::Unretained(this), &scheduling_policy, - &scheduling_priority, execution_barrier)); - execution_barrier->WaitForExecution(); + base::BindOnce(&MessageLoopThreadTest::GetSchedulingPolicyAndPriority, + base::Unretained(this), &scheduling_policy, + &scheduling_priority, std::move(execution_promise))); + execution_future.wait(); ASSERT_EQ(scheduling_policy, SCHED_FIFO); // Internal implementation verified here ASSERT_EQ(scheduling_priority, 1); - execution_barrier = std::make_shared(); - pid_t linux_tid = -1; + std::promise tid_promise; + std::future tid_future = tid_promise.get_future(); message_loop_thread.DoInThread( FROM_HERE, - base::Bind(&MessageLoopThreadTest::GetLinuxTid, base::Unretained(this), - &linux_tid, execution_barrier)); - execution_barrier->WaitForExecution(); + base::BindOnce(&MessageLoopThreadTest::GetLinuxTid, + base::Unretained(this), std::move(tid_promise))); + pid_t linux_tid = tid_future.get(); ASSERT_GT(linux_tid, 0); ASSERT_EQ(sched_getscheduler(linux_tid), SCHED_FIFO); struct sched_param param = {}; @@ -256,4 +247,4 @@ TEST_F(MessageLoopThreadTest, test_to_string_method) { // String representation should look the same when thread is not running ASSERT_STREQ(thread_string_after_shutdown.c_str(), thread_string_before_start.c_str()); -} \ No newline at end of file +} diff --git a/common/test/thread_performance_test.cc b/common/test/thread_performance_test.cc index a9b0525de..dc6a53f2e 100644 --- a/common/test/thread_performance_test.cc +++ b/common/test/thread_performance_test.cc @@ -22,21 +22,20 @@ #include #include #include +#include #include #include -#include "common/execution_barrier.h" #include "common/message_loop_thread.h" #include "osi/include/fixed_queue.h" #include "osi/include/thread.h" -using bluetooth::common::ExecutionBarrier; using bluetooth::common::MessageLoopThread; #define NUM_MESSAGES_TO_SEND 100000 volatile static int g_counter = 0; -static std::unique_ptr g_counter_barrier = nullptr; +static std::unique_ptr> g_counter_promise = nullptr; void callback_batch(fixed_queue_t* queue, void* data) { if (queue != nullptr) { @@ -44,25 +43,25 @@ void callback_batch(fixed_queue_t* queue, void* data) { } g_counter++; if (g_counter >= NUM_MESSAGES_TO_SEND) { - g_counter_barrier->NotifyFinished(); + g_counter_promise->set_value(); } } class PerformanceTest : public testing::Test { protected: void SetUp() override { - set_up_barrier_ = std::make_unique(); + set_up_promise_ = std::make_unique>(); g_counter = 0; bt_msg_queue_ = fixed_queue_new(SIZE_MAX); } void TearDown() override { fixed_queue_free(bt_msg_queue_, nullptr); bt_msg_queue_ = nullptr; - set_up_barrier_.reset(nullptr); - g_counter_barrier.reset(nullptr); + set_up_promise_.reset(nullptr); + g_counter_promise.reset(nullptr); } fixed_queue_t* bt_msg_queue_ = nullptr; - std::unique_ptr set_up_barrier_ = nullptr; + std::unique_ptr> set_up_promise_ = nullptr; }; class MessageLoopPerformanceTest : public PerformanceTest { @@ -80,8 +79,8 @@ class MessageLoopPerformanceTest : public PerformanceTest { message_loop_ = new base::MessageLoop(); run_loop_ = new base::RunLoop(); message_loop_->task_runner()->PostTask( - FROM_HERE, base::Bind(&ExecutionBarrier::NotifyFinished, - base::Unretained(set_up_barrier_.get()))); + FROM_HERE, base::Bind(&std::promise::set_value, + base::Unretained(set_up_promise_.get()))); run_loop_->Run(); delete message_loop_; message_loop_ = nullptr; @@ -98,9 +97,10 @@ class OsiThreadMessageLoopPerformanceTest : public MessageLoopPerformanceTest { protected: void SetUp() override { MessageLoopPerformanceTest::SetUp(); + std::future set_up_future = set_up_promise_->get_future(); thread_ = thread_new("OsiThreadMessageLoopPerformanceTest thread"); thread_post(thread_, &MessageLoopPerformanceTest::RunThread, this); - set_up_barrier_->WaitForExecution(); + set_up_future.wait(); } void TearDown() override { @@ -116,7 +116,8 @@ class OsiThreadMessageLoopPerformanceTest : public MessageLoopPerformanceTest { TEST_F(OsiThreadMessageLoopPerformanceTest, message_loop_speed_test) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); @@ -125,7 +126,7 @@ TEST_F(OsiThreadMessageLoopPerformanceTest, message_loop_speed_test) { message_loop_->task_runner()->PostTask( FROM_HERE, base::Bind(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); @@ -141,8 +142,9 @@ class StlThreadMessageLoopPerformanceTest : public MessageLoopPerformanceTest { protected: void SetUp() override { MessageLoopPerformanceTest::SetUp(); + std::future set_up_future = set_up_promise_->get_future(); thread_ = new std::thread(&MessageLoopPerformanceTest::RunThread, this); - set_up_barrier_->WaitForExecution(); + set_up_future.wait(); } void TearDown() override { @@ -159,7 +161,8 @@ class StlThreadMessageLoopPerformanceTest : public MessageLoopPerformanceTest { TEST_F(StlThreadMessageLoopPerformanceTest, stl_thread_speed_test) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); @@ -168,7 +171,7 @@ TEST_F(StlThreadMessageLoopPerformanceTest, stl_thread_speed_test) { message_loop_->task_runner()->PostTask( FROM_HERE, base::Bind(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); @@ -185,9 +188,10 @@ class PosixThreadMessageLoopPerformanceTest protected: void SetUp() override { MessageLoopPerformanceTest::SetUp(); + std::future set_up_future = set_up_promise_->get_future(); pthread_create(&thread_, nullptr, &MessageLoopPerformanceTest::RunPThread, (void*)this); - set_up_barrier_->WaitForExecution(); + set_up_future.wait(); } void TearDown() override { @@ -202,7 +206,8 @@ class PosixThreadMessageLoopPerformanceTest TEST_F(PosixThreadMessageLoopPerformanceTest, stl_thread_speed_test) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); @@ -212,7 +217,7 @@ TEST_F(PosixThreadMessageLoopPerformanceTest, stl_thread_speed_test) { message_loop_->task_runner()->PostTask( FROM_HERE, base::Bind(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); @@ -242,7 +247,8 @@ class ReactorPerformanceTest : public PerformanceTest { TEST_F(ReactorPerformanceTest, reactor_thread_speed_test) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); fixed_queue_register_dequeue(bt_msg_queue_, thread_get_reactor(thread_), callback_batch, nullptr); @@ -252,7 +258,7 @@ TEST_F(ReactorPerformanceTest, reactor_thread_speed_test) { for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); @@ -268,13 +274,14 @@ class WorkerThreadPerformanceTest : public PerformanceTest { protected: void SetUp() override { PerformanceTest::SetUp(); + std::future set_up_future = set_up_promise_->get_future(); worker_thread_ = new MessageLoopThread("WorkerThreadPerformanceTest thread"); worker_thread_->StartUp(); worker_thread_->DoInThread( - FROM_HERE, base::Bind(&ExecutionBarrier::NotifyFinished, - base::Unretained(set_up_barrier_.get()))); - set_up_barrier_->WaitForExecution(); + FROM_HERE, base::Bind(&std::promise::set_value, + base::Unretained(set_up_promise_.get()))); + set_up_future.wait(); } void TearDown() override { @@ -289,7 +296,8 @@ class WorkerThreadPerformanceTest : public PerformanceTest { TEST_F(WorkerThreadPerformanceTest, worker_thread_speed_test) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); @@ -299,7 +307,7 @@ TEST_F(WorkerThreadPerformanceTest, worker_thread_speed_test) { worker_thread_->DoInThread( FROM_HERE, base::Bind(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); @@ -315,12 +323,13 @@ class LibChromeThreadPerformanceTest : public PerformanceTest { protected: void SetUp() override { PerformanceTest::SetUp(); + std::future set_up_future = set_up_promise_->get_future(); thread_ = new base::Thread("LibChromeThreadPerformanceTest thread"); thread_->Start(); thread_->task_runner()->PostTask( - FROM_HERE, base::Bind(&ExecutionBarrier::NotifyFinished, - base::Unretained(set_up_barrier_.get()))); - set_up_barrier_->WaitForExecution(); + FROM_HERE, base::Bind(&std::promise::set_value, + base::Unretained(set_up_promise_.get()))); + set_up_future.wait(); } void TearDown() override { @@ -335,7 +344,8 @@ class LibChromeThreadPerformanceTest : public PerformanceTest { TEST_F(LibChromeThreadPerformanceTest, worker_thread_speed_test) { g_counter = 0; - g_counter_barrier = std::make_unique(); + g_counter_promise = std::make_unique>(); + std::future counter_future = g_counter_promise->get_future(); std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); @@ -345,7 +355,7 @@ TEST_F(LibChromeThreadPerformanceTest, worker_thread_speed_test) { thread_->task_runner()->PostTask( FROM_HERE, base::Bind(&callback_batch, bt_msg_queue_, nullptr)); } - g_counter_barrier->WaitForExecution(); + counter_future.wait(); std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); -- 2.11.0