OSDN Git Service

Add Timer as an alternative to osi alarm
authorHansong Zhang <hsz@google.com>
Tue, 14 Aug 2018 21:29:23 +0000 (14:29 -0700)
committerHansong Zhang <hsz@google.com>
Thu, 13 Sep 2018 20:37:42 +0000 (13:37 -0700)
* Add a private method MessageLoopThread.DoInThreadDelayed to post a
  delayed task in message loop, as an alternative approach to osi alarm
  clock
* Add a unit test for MessageLoopThread to check ShutDown() waits until
  current task finishes
* Add Timer using MessageLoopThread.DoInThreadDelayed
* Timer provides similar API as osi alarm, and uses same OS clock (boot
  timer) as alarm
* Add benchmark and unit tests to ensure the performance is comparable
  to the existing osi alarm

Test: Run unit test and benchmark test
      ./test/run_unit_tests.sh bluetooth_test_common
      ./test/run_benchmarks.sh bluetooth_benchmark_timer_performance
      --benchmark_repetitions=10 --benchmark_report_aggregates_only=true
Bug: 110303473
Change-Id: I6f2e7ae2f80f9889fc5fe3c8cd6b9b2670938b46

common/Android.bp
common/benchmark/timer_performance_benchmark.cc [new file with mode: 0644]
common/message_loop_thread.cc
common/message_loop_thread.h
common/message_loop_thread_unittest.cc
common/timer.cc [new file with mode: 0644]
common/timer.h [new file with mode: 0644]
common/timer_unittest.cc [new file with mode: 0644]
test/run_benchmarks.sh

index d33c751..f0388a9 100644 (file)
@@ -7,6 +7,7 @@ cc_library_static {
         "system/bt/stack/include",
     ],
     srcs: [
+        "timer.cc",
         "message_loop_thread.cc",
         "metrics.cc",
         "time_util.cc",
@@ -26,6 +27,7 @@ cc_test {
         "system/bt/stack/include",
     ],
     srcs : [
+        "timer_unittest.cc",
         "leaky_bonded_queue_unittest.cc",
         "message_loop_thread_unittest.cc",
         "metrics_unittest.cc",
@@ -77,3 +79,22 @@ cc_benchmark {
         "libbt-common"
     ],
 }
+
+cc_benchmark {
+    name: "bluetooth_benchmark_timer_performance",
+    defaults: ["fluoride_defaults"],
+    include_dirs: ["system/bt"],
+    srcs: [
+        "benchmark/timer_performance_benchmark.cc",
+    ],
+    shared_libs: [
+        "liblog",
+        "libprotobuf-cpp-lite",
+        "libcutils",
+    ],
+    static_libs: [
+        "libosi",
+        "libbt-common",
+        "libbt-protos-lite",
+    ],
+}
diff --git a/common/benchmark/timer_performance_benchmark.cc b/common/benchmark/timer_performance_benchmark.cc
new file mode 100644 (file)
index 0000000..a2f2c48
--- /dev/null
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <base/bind.h>
+#include <base/run_loop.h>
+#include <base/threading/thread.h>
+#include <benchmark/benchmark.h>
+#include <future>
+
+#include "common/message_loop_thread.h"
+#include "common/time_util.h"
+#include "common/timer.h"
+#include "osi/include/alarm.h"
+
+using ::benchmark::State;
+using bluetooth::common::MessageLoopThread;
+using bluetooth::common::time_get_os_boottime_us;
+using bluetooth::common::Timer;
+
+// fake get_main_message_loop implementation for alarm
+base::MessageLoop* get_main_message_loop() { return nullptr; }
+
+namespace {
+std::unordered_map<int, int> g_map;
+std::shared_ptr<std::promise<void>> g_promise;
+uint64_t g_start_time;
+int g_scheduled_tasks;
+int g_task_length;
+int g_task_interval;
+int g_task_counter;
+
+void TimerFire(void*) { g_promise->set_value(); }
+
+void AlarmSleepAndCountDelayedTime(void*) {
+  auto end_time_us = time_get_os_boottime_us();
+  auto time_after_start_ms = (end_time_us - g_start_time) / 1000;
+  g_task_counter++;
+  g_map[time_after_start_ms - g_task_counter * g_task_interval]++;
+  std::this_thread::sleep_for(std::chrono::milliseconds(g_task_length));
+  if (g_task_counter >= g_scheduled_tasks) {
+    g_promise->set_value();
+  }
+}
+
+}  // namespace
+
+class BM_OsiAlarmTimer : public ::benchmark::Fixture {
+ protected:
+  void SetUp(State& st) override {
+    ::benchmark::Fixture::SetUp(st);
+    alarm_ = alarm_new("osi_alarm_timer_test");
+    g_promise = std::make_shared<std::promise<void>>();
+  }
+
+  void TearDown(State& st) override {
+    g_promise = nullptr;
+    alarm_free(alarm_);
+    ::benchmark::Fixture::TearDown(st);
+  }
+
+  alarm_t* alarm_ = nullptr;
+};
+
+BENCHMARK_DEFINE_F(BM_OsiAlarmTimer, timer_performance_ms)(State& state) {
+  auto milliseconds = static_cast<int>(state.range(0));
+  for (auto _ : state) {
+    auto start_time_point = time_get_os_boottime_us();
+    alarm_set(alarm_, milliseconds, &TimerFire, nullptr);
+    g_promise->get_future().get();
+    auto end_time_point = time_get_os_boottime_us();
+    auto duration = end_time_point - start_time_point;
+    state.SetIterationTime(duration * 1e-6);
+  }
+};
+
+BENCHMARK_REGISTER_F(BM_OsiAlarmTimer, timer_performance_ms)
+    ->Arg(1)
+    ->Arg(5)
+    ->Arg(10)
+    ->Arg(20)
+    ->Arg(100)
+    ->Arg(1000)
+    ->Arg(2000)
+    ->Iterations(1)
+    ->UseManualTime();
+
+class BM_AlarmTaskTimer : public ::benchmark::Fixture {
+ protected:
+  void SetUp(State& st) override {
+    ::benchmark::Fixture::SetUp(st);
+    message_loop_thread_ = new MessageLoopThread("timer_benchmark");
+    message_loop_thread_->StartUp();
+    message_loop_thread_->EnableRealTimeScheduling();
+    timer_ = new Timer();
+    g_promise = std::make_shared<std::promise<void>>();
+  }
+
+  void TearDown(State& st) override {
+    g_promise = nullptr;
+    delete timer_;
+    timer_ = nullptr;
+    message_loop_thread_->ShutDown();
+    delete message_loop_thread_;
+    message_loop_thread_ = nullptr;
+    ::benchmark::Fixture::TearDown(st);
+  }
+
+  MessageLoopThread* message_loop_thread_;
+  Timer* timer_;
+};
+
+BENCHMARK_DEFINE_F(BM_AlarmTaskTimer, timer_performance_ms)(State& state) {
+  auto milliseconds = static_cast<int>(state.range(0));
+  for (auto _ : state) {
+    auto start_time_point = time_get_os_boottime_us();
+    timer_->Schedule(message_loop_thread_->GetWeakPtr(), FROM_HERE,
+                     base::Bind(&TimerFire, nullptr),
+                     base::TimeDelta::FromMilliseconds(milliseconds));
+    g_promise->get_future().get();
+    timer_->Cancel();
+    auto end_time_point = time_get_os_boottime_us();
+    auto duration = end_time_point - start_time_point;
+    state.SetIterationTime(duration * 1e-6);
+  }
+};
+
+BENCHMARK_REGISTER_F(BM_AlarmTaskTimer, timer_performance_ms)
+    ->Arg(1)
+    ->Arg(5)
+    ->Arg(10)
+    ->Arg(20)
+    ->Arg(100)
+    ->Arg(1000)
+    ->Arg(2000)
+    ->Iterations(1)
+    ->UseManualTime();
+
+class BM_OsiPeriodicAlarmTimer : public ::benchmark::Fixture {
+ protected:
+  void SetUp(State& st) override {
+    ::benchmark::Fixture::SetUp(st);
+    alarm_ = alarm_new_periodic("osi_alarm_timer_test");
+    g_map.clear();
+    g_promise = std::make_shared<std::promise<void>>();
+    g_scheduled_tasks = 0;
+    g_task_length = 0;
+    g_task_interval = 0;
+    g_task_counter = 0;
+  }
+
+  void TearDown(State& st) override {
+    g_promise = nullptr;
+    alarm_free(alarm_);
+    ::benchmark::Fixture::TearDown(st);
+  }
+
+  alarm_t* alarm_ = nullptr;
+};
+
+BENCHMARK_DEFINE_F(BM_OsiPeriodicAlarmTimer, periodic_accuracy)(State& state) {
+  for (auto _ : state) {
+    g_scheduled_tasks = state.range(0);
+    g_task_length = state.range(1);
+    g_task_interval = state.range(2);
+    g_start_time = time_get_os_boottime_us();
+    alarm_set(alarm_, g_task_interval, &AlarmSleepAndCountDelayedTime, nullptr);
+    g_promise->get_future().get();
+    alarm_cancel(alarm_);
+  }
+  for (const auto& delay : g_map) {
+    state.counters[std::to_string(delay.first)] = delay.second;
+  }
+};
+
+BENCHMARK_REGISTER_F(BM_OsiPeriodicAlarmTimer, periodic_accuracy)
+    ->Args({2000, 1, 5})
+    ->Args({2000, 3, 5})
+    ->Args({2000, 1, 7})
+    ->Args({2000, 3, 7})
+    ->Args({2000, 1, 20})
+    ->Args({2000, 5, 20})
+    ->Args({2000, 10, 20})
+    ->Args({2000, 15, 20})
+    ->Iterations(1)
+    ->UseRealTime();
+
+class BM_AlarmTaskPeriodicTimer : public ::benchmark::Fixture {
+ protected:
+  void SetUp(State& st) override {
+    ::benchmark::Fixture::SetUp(st);
+    message_loop_thread_ = new MessageLoopThread("timer_benchmark");
+    message_loop_thread_->StartUp();
+    message_loop_thread_->EnableRealTimeScheduling();
+    timer_ = new Timer();
+    g_map.clear();
+    g_promise = std::make_shared<std::promise<void>>();
+    g_scheduled_tasks = 0;
+    g_task_length = 0;
+    g_task_interval = 0;
+    g_task_counter = 0;
+  }
+
+  void TearDown(State& st) override {
+    g_promise = nullptr;
+    delete timer_;
+    timer_ = nullptr;
+    message_loop_thread_->ShutDown();
+    delete message_loop_thread_;
+    message_loop_thread_ = nullptr;
+    ::benchmark::Fixture::TearDown(st);
+  }
+
+  MessageLoopThread* message_loop_thread_;
+  Timer* timer_;
+};
+
+BENCHMARK_DEFINE_F(BM_AlarmTaskPeriodicTimer, periodic_accuracy)
+(State& state) {
+  for (auto _ : state) {
+    g_scheduled_tasks = state.range(0);
+    g_task_length = state.range(1);
+    g_task_interval = state.range(2);
+    g_start_time = time_get_os_boottime_us();
+    timer_->SchedulePeriodic(
+        message_loop_thread_->GetWeakPtr(), FROM_HERE,
+        base::Bind(&AlarmSleepAndCountDelayedTime, nullptr),
+        base::TimeDelta::FromMilliseconds(g_task_interval));
+    g_promise->get_future().get();
+    timer_->Cancel();
+  }
+  for (const auto& delay : g_map) {
+    state.counters[std::to_string(delay.first)] = delay.second;
+  }
+};
+
+BENCHMARK_REGISTER_F(BM_AlarmTaskPeriodicTimer, periodic_accuracy)
+    ->Args({2000, 1, 5})
+    ->Args({2000, 3, 5})
+    ->Args({2000, 1, 7})
+    ->Args({2000, 3, 7})
+    ->Args({2000, 1, 20})
+    ->Args({2000, 5, 20})
+    ->Args({2000, 10, 20})
+    ->Args({2000, 15, 20})
+    ->Iterations(1)
+    ->UseRealTime();
+
+int main(int argc, char** argv) {
+  // Disable LOG() output from libchrome
+  logging::LoggingSettings log_settings;
+  log_settings.logging_dest = logging::LoggingDestination::LOG_NONE;
+  CHECK(logging::InitLogging(log_settings)) << "Failed to set up logging";
+  ::benchmark::Initialize(&argc, argv);
+  if (::benchmark::ReportUnrecognizedArguments(argc, argv)) {
+    return 1;
+  }
+  ::benchmark::RunSpecifiedBenchmarks();
+}
index 9bf25be..f57ce46 100644 (file)
@@ -34,7 +34,8 @@ MessageLoopThread::MessageLoopThread(const std::string& thread_name)
       run_loop_(nullptr),
       thread_(nullptr),
       thread_id_(-1),
-      linux_tid_(-1) {}
+      linux_tid_(-1),
+      weak_ptr_factory_(this) {}
 
 MessageLoopThread::~MessageLoopThread() {
   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
@@ -58,13 +59,20 @@ void MessageLoopThread::StartUp() {
 
 bool MessageLoopThread::DoInThread(const tracked_objects::Location& from_here,
                                    base::OnceClosure task) {
+  return DoInThreadDelayed(from_here, std::move(task), base::TimeDelta());
+}
+
+bool MessageLoopThread::DoInThreadDelayed(
+    const tracked_objects::Location& from_here, base::OnceClosure task,
+    const base::TimeDelta& delay) {
   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
   if (message_loop_ == nullptr) {
     LOG(ERROR) << __func__ << ": message loop is null for thread " << *this
                << ", from " << from_here.ToString();
     return false;
   }
-  if (!message_loop_->task_runner()->PostTask(from_here, std::move(task))) {
+  if (!message_loop_->task_runner()->PostDelayedTask(from_here, std::move(task),
+                                                     delay)) {
     LOG(ERROR) << __func__
                << ": failed to post task to message loop for thread " << *this
                << ", from " << from_here.ToString();
@@ -145,6 +153,11 @@ bool MessageLoopThread::EnableRealTimeScheduling() {
   return true;
 }
 
+base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
+  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
+  return weak_ptr_factory_.GetWeakPtr();
+}
+
 // Non API method, should NOT be protected by API mutex to avoid deadlock
 void MessageLoopThread::Run(std::promise<void> start_up_promise) {
   LOG(INFO) << __func__ << ": message loop starting for thread "
index c342be4..4db11bb 100644 (file)
@@ -122,7 +122,13 @@ class MessageLoopThread final {
   bool EnableRealTimeScheduling();
 
   /**
-   * Return the mssage loop for this thread. Accessing raw message loop is not
+   * Return the weak pointer to this object. This can be useful when posting
+   * delayed tasks to this MessageLoopThread using Timer.
+   */
+  base::WeakPtr<MessageLoopThread> GetWeakPtr();
+
+  /**
+   * Return the message loop for this thread. Accessing raw message loop is not
    * recommended as message loop can be freed internally.
    *
    * @return message loop associated with this thread, nullptr if thread is not
@@ -144,6 +150,36 @@ class MessageLoopThread final {
                         std::promise<void> start_up_promise);
 
   /**
+   * Post a task to run on this thread after a specified delay. If the task
+   * needs to be cancelable before it's run, use base::CancelableClosure type
+   * for task closure. For example:
+   * <code>
+   * base::CancelableClosure cancelable_task;
+   * cancelable_task.Reset(base::Bind(...)); // bind the task
+   * same_thread->DoInThreadDelayed(FROM_HERE,
+   *                                cancelable_task.callback(), delay);
+   * ...
+   * // Cancel the task closure
+   * same_thread->DoInThread(FROM_HERE,
+   *                         base::Bind(&base::CancelableClosure::Cancel,
+   *                                    base::Unretained(&cancelable_task)));
+   * </code>
+   *
+   * Warning: base::CancelableClosure objects must be created on, posted to,
+   * cancelled on, and destroyed on the same thread.
+   *
+   * @param from_here location where this task is originated
+   * @param task task created through base::Bind()
+   * @param delay delay for the task to be executed
+   * @return true if task is successfully scheduled, false if task cannot be
+   * scheduled
+   */
+  bool DoInThreadDelayed(const tracked_objects::Location& from_here,
+                         base::OnceClosure task, const base::TimeDelta& delay);
+
+  friend class Timer;  // allow Timer to use DoInThreadDelayed()
+
+  /**
    * Actual method to run the thread, blocking until ShutDown() is called
    *
    * @param start_up_promise a std::promise that is used to notify calling
@@ -159,6 +195,7 @@ class MessageLoopThread final {
   base::PlatformThreadId thread_id_;
   // Linux specific abstractions
   pid_t linux_tid_;
+  base::WeakPtrFactory<MessageLoopThread> weak_ptr_factory_;
 
   DISALLOW_COPY_AND_ASSIGN(MessageLoopThread);
 };
index 0924ebe..67a0cfd 100644 (file)
@@ -61,6 +61,11 @@ class MessageLoopThreadTest : public ::testing::Test {
     execution_promise.set_value();
   }
 
+  void SleepAndGetName(std::promise<std::string> name_promise, int sleep_ms) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
+    GetName(std::move(name_promise));
+  }
+
  protected:
   static bool CanSetCurrentThreadPriority() {
     struct __user_cap_header_struct linux_user_header = {
@@ -78,6 +83,16 @@ class MessageLoopThreadTest : public ::testing::Test {
   }
 };
 
+TEST_F(MessageLoopThreadTest, get_weak_ptr) {
+  base::WeakPtr<MessageLoopThread> message_loop_thread_ptr;
+  {
+    MessageLoopThread message_loop_thread("test_thread");
+    message_loop_thread_ptr = message_loop_thread.GetWeakPtr();
+    ASSERT_NE(message_loop_thread_ptr, nullptr);
+  }
+  ASSERT_EQ(message_loop_thread_ptr, nullptr);
+}
+
 TEST_F(MessageLoopThreadTest, test_running_thread) {
   MessageLoopThread message_loop_thread("test_thread");
   message_loop_thread.StartUp();
@@ -248,3 +263,20 @@ TEST_F(MessageLoopThreadTest, test_to_string_method) {
   ASSERT_STREQ(thread_string_after_shutdown.c_str(),
                thread_string_before_start.c_str());
 }
+
+// Verify the message loop thread will shutdown after callback finishes
+TEST_F(MessageLoopThreadTest, shut_down_while_in_callback) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  std::promise<std::string> name_promise;
+  std::future<std::string> name_future = name_promise.get_future();
+  uint32_t delay_ms = 5;
+  message_loop_thread.DoInThread(
+      FROM_HERE, base::BindOnce(&MessageLoopThreadTest::SleepAndGetName,
+                                base::Unretained(this), std::move(name_promise),
+                                delay_ms));
+  message_loop_thread.ShutDown();
+  std::string my_name = name_future.get();
+  ASSERT_EQ(name, my_name);
+}
diff --git a/common/timer.cc b/common/timer.cc
new file mode 100644 (file)
index 0000000..b4d126c
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "timer.h"
+#include "message_loop_thread.h"
+#include "time_util.h"
+
+namespace bluetooth {
+
+namespace common {
+
+constexpr base::TimeDelta kMinimumPeriod = base::TimeDelta::FromMicroseconds(1);
+
+Timer::~Timer() {
+  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
+  if (message_loop_thread_ != nullptr && message_loop_thread_->IsRunning()) {
+    CancelAndWait();
+  }
+}
+
+// This runs on user thread
+bool Timer::Schedule(const base::WeakPtr<MessageLoopThread>& thread,
+                     const tracked_objects::Location& from_here,
+                     base::Closure task, base::TimeDelta delay) {
+  return ScheduleTaskHelper(thread, from_here, std::move(task), delay, false);
+}
+
+// This runs on user thread
+bool Timer::SchedulePeriodic(const base::WeakPtr<MessageLoopThread>& thread,
+                             const tracked_objects::Location& from_here,
+                             base::Closure task, base::TimeDelta period) {
+  if (period < kMinimumPeriod) {
+    LOG(ERROR) << __func__ << ": period must be at least " << kMinimumPeriod;
+    return false;
+  }
+  return ScheduleTaskHelper(thread, from_here, std::move(task), period, true);
+}
+
+// This runs on user thread
+bool Timer::ScheduleTaskHelper(const base::WeakPtr<MessageLoopThread>& thread,
+                               const tracked_objects::Location& from_here,
+                               base::Closure task, base::TimeDelta delay,
+                               bool is_periodic) {
+  uint64_t time_now_us = time_get_os_boottime_us();
+  uint64_t time_next_task_us = time_now_us + delay.InMicroseconds();
+  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
+  if (thread == nullptr) {
+    LOG(ERROR) << __func__ << ": thread must be non-null";
+    return false;
+  }
+  CancelAndWait();
+  expected_time_next_task_us_ = time_next_task_us;
+  task_ = std::move(task);
+  uint64_t time_until_next_us = time_next_task_us - time_get_os_boottime_us();
+  if (!thread->DoInThreadDelayed(
+          from_here, task_wrapper_,
+          base::TimeDelta::FromMicroseconds(time_until_next_us))) {
+    LOG(ERROR) << __func__
+               << ": failed to post task to message loop for thread " << *thread
+               << ", from " << from_here.ToString();
+    expected_time_next_task_us_ = 0;
+    task_.Reset();
+    return false;
+  }
+  message_loop_thread_ = thread;
+  period_ = delay;
+  is_periodic_ = is_periodic;
+  return true;
+}
+
+// This runs on user thread
+void Timer::Cancel() {
+  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
+  CancelHelper(false);
+}
+
+// This runs on user thread
+void Timer::CancelAndWait() {
+  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
+  CancelHelper(true);
+}
+
+// This runs on user thread
+void Timer::CancelHelper(bool is_synchronous) {
+  if (message_loop_thread_ == nullptr) {
+    return;
+  }
+  std::promise<void> promise;
+  auto future = promise.get_future();
+  if (message_loop_thread_->GetThreadId() ==
+      base::PlatformThread::CurrentId()) {
+    CancelClosure(std::move(promise));
+    return;
+  }
+  message_loop_thread_->DoInThread(
+      FROM_HERE, base::BindOnce(&Timer::CancelClosure, base::Unretained(this),
+                                std::move(promise)));
+  if (is_synchronous) {
+    future.wait();
+  }
+}
+
+// This runs on message loop thread
+void Timer::CancelClosure(std::promise<void> promise) {
+  message_loop_thread_ = nullptr;
+  task_.Reset();
+  period_ = base::TimeDelta();
+  is_periodic_ = false;
+  expected_time_next_task_us_ = 0;
+  promise.set_value();
+}
+
+// This runs in user thread
+bool Timer::IsScheduled() const {
+  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
+  return message_loop_thread_ != nullptr && message_loop_thread_->IsRunning();
+}
+
+// This runs in message loop thread
+void Timer::RunTask() {
+  if (message_loop_thread_ == nullptr || !message_loop_thread_->IsRunning()) {
+    LOG(ERROR) << __func__
+               << ": message_loop_thread_ is null or is not running";
+    return;
+  }
+  if (is_periodic_) {
+    int64_t period_us = period_.InMicroseconds();
+    expected_time_next_task_us_ += period_us;
+    uint64_t time_now_us = time_get_os_boottime_us();
+    int64_t remaining_time_us = expected_time_next_task_us_ - time_now_us;
+    if (remaining_time_us < 0) {
+      // if remaining_time_us is negative, schedule the task to the nearest
+      // multiple of period
+      remaining_time_us =
+          (remaining_time_us % period_us + period_us) % period_us;
+    }
+    message_loop_thread_->DoInThreadDelayed(
+        FROM_HERE, task_wrapper_,
+        base::TimeDelta::FromMicroseconds(remaining_time_us));
+  }
+  uint64_t time_before_task_us = time_get_os_boottime_us();
+  task_.Run();
+  uint64_t time_after_task_us = time_get_os_boottime_us();
+  int64_t task_time_us =
+      static_cast<int64_t>(time_after_task_us - time_before_task_us);
+  if (is_periodic_ && task_time_us > period_.InMicroseconds()) {
+    LOG(ERROR) << __func__ << ": Periodic task execution took " << task_time_us
+               << " microseconds, longer than interval "
+               << period_.InMicroseconds() << " microseconds";
+  }
+  if (!is_periodic_) {
+    message_loop_thread_ = nullptr;
+    task_.Reset();
+    period_ = base::TimeDelta();
+    expected_time_next_task_us_ = 0;
+  }
+}
+
+}  // namespace common
+
+}  // namespace bluetooth
diff --git a/common/timer.h b/common/timer.h
new file mode 100644 (file)
index 0000000..e8532a5
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <base/bind.h>
+#include <base/cancelable_callback.h>
+#include <base/tracked_objects.h>
+#include <future>
+
+namespace bluetooth {
+
+namespace common {
+
+class MessageLoopThread;
+
+/**
+ * An alarm clock that posts a delayed task to a specified MessageLoopThread
+ * once or periodically.
+ *
+ * Warning: MessageLoopThread must be running when any task is scheduled or
+ * being executed
+ */
+class Timer final {
+ public:
+  Timer()
+      : task_wrapper_(base::Bind(&Timer::RunTask, base::Unretained(this))),
+        is_periodic_(false),
+        expected_time_next_task_us_(0) {}
+  ~Timer();
+  Timer(const Timer&) = delete;
+  Timer& operator=(const Timer&) = delete;
+
+  /**
+   * Schedule a delayed task to the MessageLoopThread. Only one task can be
+   * scheduled at a time. If another task is scheduled, it will cancel the
+   * previous task synchronously and schedule the new task; this blocks until
+   * the previous task is cancelled.
+   *
+   * @param thread thread to run the task
+   * @param from_here location where this task is originated
+   * @param task task created through base::Bind()
+   * @param delay delay for the task to be executed
+   * @return true iff task is scheduled successfully
+   */
+  bool Schedule(const base::WeakPtr<MessageLoopThread>& thread,
+                const tracked_objects::Location& from_here, base::Closure task,
+                base::TimeDelta delay);
+
+  /**
+   * Schedule a delayed periodic task to the MessageLoopThread. Only one task
+   * can be scheduled at a time. If another task is scheduled, it will cancel
+   * the previous task synchronously and schedule the new periodic task; this
+   * blocks until the previous task is cancelled.
+   *
+   * @param thread thread to run the task
+   * @param from_here location where this task is originated
+   * @param task task created through base::Bind()
+   * @param period period for the task to be executed
+   * @return true iff task is scheduled successfully
+   */
+  bool SchedulePeriodic(const base::WeakPtr<MessageLoopThread>& thread,
+                        const tracked_objects::Location& from_here,
+                        base::Closure task, base::TimeDelta period);
+
+  /**
+   * Post an event which cancels the current task asynchronously
+   */
+  void Cancel();
+
+  /**
+   * Post an event which cancels the current task and wait for the cancellation
+   * to be completed
+   */
+  void CancelAndWait();
+
+  /**
+   * Returns true when there is a pending task scheduled on a running thread,
+   * otherwise false.
+   */
+  bool IsScheduled() const;
+
+ private:
+  base::WeakPtr<MessageLoopThread> message_loop_thread_;
+  const base::Closure task_wrapper_;
+  base::Closure task_;
+  base::TimeDelta period_;
+  bool is_periodic_;
+  uint64_t expected_time_next_task_us_;  // Using clock boot time in time_util.h
+  mutable std::recursive_mutex api_mutex_;
+  bool ScheduleTaskHelper(const base::WeakPtr<MessageLoopThread>& thread,
+                          const tracked_objects::Location& from_here,
+                          base::Closure task, base::TimeDelta delay,
+                          bool is_periodic);
+  void CancelHelper(bool is_synchronous);
+  void CancelClosure(std::promise<void> promise);
+
+  /**
+   * Wraps a task. It posts another same task if the scheduled task is periodic.
+   */
+  void RunTask();
+};
+
+}  // namespace common
+
+}  // namespace bluetooth
diff --git a/common/timer_unittest.cc b/common/timer_unittest.cc
new file mode 100644 (file)
index 0000000..c6feea9
--- /dev/null
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <base/bind.h>
+#include <base/logging.h>
+#include <gtest/gtest.h>
+#include <future>
+
+#include "message_loop_thread.h"
+#include "timer.h"
+
+using bluetooth::common::MessageLoopThread;
+using bluetooth::common::Timer;
+
+// Allowed error between the expected and actual delay for DoInThreadDelayed().
+constexpr uint32_t delay_error_ms = 3;
+
+/**
+ * Unit tests to verify Task Scheduler.
+ */
+class TimerTest : public ::testing::Test {
+ public:
+  void ShouldNotHappen() { FAIL() << "Should not happen"; }
+
+  void IncreaseTaskCounter(int scheduled_tasks, std::promise<void>* promise) {
+    counter_++;
+    if (counter_ == scheduled_tasks) {
+      promise->set_value();
+    }
+  }
+
+  void GetName(std::string* name, std::promise<void>* promise) {
+    char my_name[256];
+    pthread_getname_np(pthread_self(), my_name, sizeof(my_name));
+    name->append(my_name);
+    promise->set_value();
+  }
+
+  void SleepAndGetName(std::string* name, std::promise<void>* name_promise,
+                       int sleep_ms) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
+    GetName(name, name_promise);
+  }
+
+  void VerifyDelayTimeAndSleep(std::chrono::steady_clock::time_point start_time,
+                               int interval_ms, int scheduled_tasks,
+                               int task_length_ms,
+                               std::promise<void>* promise) {
+    auto end_time = std::chrono::steady_clock::now();
+    auto actual_delay = std::chrono::duration_cast<std::chrono::milliseconds>(
+        end_time - start_time);
+    counter_++;
+    int64_t scheduled_delay_ms = interval_ms * counter_;
+    if (counter_ >= scheduled_tasks) {
+      promise->set_value();
+    }
+    ASSERT_NEAR(scheduled_delay_ms, actual_delay.count(), delay_error_ms);
+    std::this_thread::sleep_for(std::chrono::milliseconds(task_length_ms));
+  }
+
+  void VerifyMultipleDelayedTasks(int scheduled_tasks, int task_length_ms,
+                                  int interval_between_tasks_ms) {
+    std::string name = "test_thread";
+    MessageLoopThread message_loop_thread(name);
+    message_loop_thread.StartUp();
+    message_loop_thread.EnableRealTimeScheduling();
+    auto future = promise_->get_future();
+    auto start_time = std::chrono::steady_clock::now();
+    timer_->SchedulePeriodic(
+        message_loop_thread.GetWeakPtr(), FROM_HERE,
+        base::Bind(&TimerTest::VerifyDelayTimeAndSleep, base::Unretained(this),
+                   start_time, interval_between_tasks_ms, scheduled_tasks,
+                   task_length_ms, promise_),
+        base::TimeDelta::FromMilliseconds(interval_between_tasks_ms));
+    future.get();
+    timer_->CancelAndWait();
+  }
+
+  void CancelTimerAndWait() { timer_->CancelAndWait(); }
+
+ protected:
+  void SetUp() override {
+    ::testing::Test::SetUp();
+    counter_ = 0;
+    timer_ = new Timer();
+    promise_ = new std::promise<void>();
+  }
+
+  void TearDown() override {
+    if (promise_ != nullptr) {
+      delete promise_;
+      promise_ = nullptr;
+    }
+    if (timer_ != nullptr) {
+      delete timer_;
+      timer_ = nullptr;
+    }
+  }
+
+  int counter_;
+  Timer* timer_;
+  std::promise<void>* promise_;
+};
+
+TEST_F(TimerTest, initial_is_not_scheduled) {
+  ASSERT_FALSE(timer_->IsScheduled());
+}
+
+TEST_F(TimerTest, schedule_task) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  auto future = promise_->get_future();
+  std::string my_name;
+  uint32_t delay_ms = 5;
+
+  timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE,
+                   base::Bind(&TimerTest::GetName, base::Unretained(this),
+                              &my_name, promise_),
+                   base::TimeDelta::FromMilliseconds(delay_ms));
+  EXPECT_TRUE(timer_->IsScheduled());
+  future.get();
+  ASSERT_EQ(name, my_name);
+  EXPECT_FALSE(timer_->IsScheduled());
+}
+
+TEST_F(TimerTest, cancel_without_scheduling) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+
+  EXPECT_FALSE(timer_->IsScheduled());
+  timer_->CancelAndWait();
+  EXPECT_FALSE(timer_->IsScheduled());
+}
+
+TEST_F(TimerTest, cancel_in_callback_no_deadlock) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  uint32_t delay_ms = 5;
+
+  timer_->Schedule(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::CancelTimerAndWait, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+}
+
+TEST_F(TimerTest, periodic_run) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  auto future = promise_->get_future();
+  uint32_t delay_ms = 5;
+  int num_tasks = 200;
+
+  timer_->SchedulePeriodic(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::IncreaseTaskCounter, base::Unretained(this),
+                 num_tasks, promise_),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  future.get();
+  ASSERT_EQ(counter_, num_tasks);
+  timer_->CancelAndWait();
+}
+
+TEST_F(TimerTest, schedule_periodic_task_zero_interval) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  uint32_t interval_ms = 0;
+
+  ASSERT_FALSE(timer_->SchedulePeriodic(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(interval_ms)));
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_error_ms));
+}
+
+// Verify that deleting the timer without cancelling it will cancel the task
+TEST_F(TimerTest, periodic_delete_without_cancel) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  uint32_t delay_ms = 5;
+  timer_->SchedulePeriodic(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  delete timer_;
+  timer_ = nullptr;
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_error_ms));
+}
+
+TEST_F(TimerTest, cancel_single_task) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  uint32_t delay_ms = 5;
+  uint32_t time_cancellation_ms = 3;
+  timer_->SchedulePeriodic(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  std::this_thread::sleep_for(std::chrono::milliseconds(time_cancellation_ms));
+  timer_->Cancel();
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_error_ms));
+}
+
+TEST_F(TimerTest, cancel_periodic_task) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  uint32_t delay_ms = 5;
+  uint32_t time_cancellation_ms = 3;
+  timer_->SchedulePeriodic(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  std::this_thread::sleep_for(std::chrono::milliseconds(time_cancellation_ms));
+  timer_->CancelAndWait();
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_error_ms));
+}
+
+// Verify that if a task is being executed, then cancelling it is no-op
+TEST_F(TimerTest, cancel_current_task_no_effect) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  auto future = promise_->get_future();
+  std::string my_name;
+  uint32_t delay_ms = 5;
+
+  timer_->Schedule(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::SleepAndGetName, base::Unretained(this), &my_name,
+                 promise_, delay_ms),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  EXPECT_TRUE(timer_->IsScheduled());
+  std::this_thread::sleep_for(
+      std::chrono::milliseconds(delay_ms + delay_error_ms));
+  timer_->CancelAndWait();
+  future.get();
+  ASSERT_EQ(name, my_name);
+  EXPECT_FALSE(timer_->IsScheduled());
+}
+
+// Schedule 10 short periodic tasks with interval 1 ms between each; verify the
+// functionality
+TEST_F(TimerTest, schedule_multiple_delayed_tasks) {
+  VerifyMultipleDelayedTasks(10, 0, 1);
+}
+
+// Schedule 10 periodic tasks with interval 2 ms between each and each takes 1
+// ms; verify the functionality
+TEST_F(TimerTest, schedule_multiple_delayed_slow_tasks) {
+  VerifyMultipleDelayedTasks(10, 1, 2);
+}
+
+// Schedule 100 periodic tasks with interval 20 ms between each and each takes
+// 10 ms; verify the functionality
+TEST_F(TimerTest, schedule_multiple_delayed_slow_tasks_stress) {
+  VerifyMultipleDelayedTasks(100, 10, 20);
+}
+
+// Verify that when MessageLoopThread is shutdown, the pending task will be
+// cancelled
+TEST_F(TimerTest, message_loop_thread_down_cancel_pending_task) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  std::string my_name;
+  uint32_t delay_ms = 5;
+
+  timer_->Schedule(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  EXPECT_TRUE(timer_->IsScheduled());
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms - 3));
+  message_loop_thread.ShutDown();
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+}
+
+// Verify that when MessageLoopThread is shutdown, the pending periodic task
+// will be cancelled
+TEST_F(TimerTest, message_loop_thread_down_cancel_pending_periodic_task) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  uint32_t delay_ms = 5;
+
+  timer_->Schedule(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  EXPECT_TRUE(timer_->IsScheduled());
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms - 2));
+  message_loop_thread.ShutDown();
+  timer_->CancelAndWait();
+  EXPECT_FALSE(timer_->IsScheduled());
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+}
+
+TEST_F(TimerTest, schedule_task_cancel_previous_task) {
+  std::string name = "test_thread";
+  MessageLoopThread message_loop_thread(name);
+  message_loop_thread.StartUp();
+  std::string my_name;
+  auto future = promise_->get_future();
+  uint32_t delay_ms = 5;
+
+  timer_->SchedulePeriodic(
+      message_loop_thread.GetWeakPtr(), FROM_HERE,
+      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
+      base::TimeDelta::FromMilliseconds(delay_ms));
+  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms - 2));
+  timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE,
+                   base::Bind(&TimerTest::GetName, base::Unretained(this),
+                              &my_name, promise_),
+                   base::TimeDelta::FromMilliseconds(delay_ms));
+  future.wait();
+  ASSERT_EQ(name, my_name);
+}
index da63388..af4b012 100755 (executable)
@@ -9,6 +9,7 @@
 
 known_benchmarks=(
   bluetooth_benchmark_thread_performance
+  bluetooth_benchmark_timer_performance
 )
 
 usage() {