OSDN Git Service

Move hci_thread to a message loop and prevent thread from spinning
authorAjay Panicker <apanicke@google.com>
Fri, 28 Apr 2017 22:26:38 +0000 (15:26 -0700)
committerAjay Panicker <apanicke@google.com>
Fri, 5 May 2017 22:36:56 +0000 (22:36 +0000)
Before this patch, if there was a message on the command queue and
there were no command credits, the thread reactor would spin trying
to process the message on the command queue and would continue until
a credit was received. This led to a bug where upon switching users,
hci_thread would spin and try to use 100% of the CPU. This is fixed
by moving over to a message loop and queue system. The message loop
processes all the messages. If there aren't enough command credits,
command messages are deferred to the command queue and popped off
whenever more credits are aquired. The deferred queue has priority
to credits over recently posted messages.

Bug: 37733903
Test: Swap users with the real time scheduling patch applied, and
      general Bluetooth usage.
      TestTracker: 86249
Change-Id: Ib775e47f6d4810d3d7d8af5b3ba84adc4ada3da5
(cherry picked from commit ffee0ee8a102101ded1d3fa398f20c4215e0c441)

hci/src/hci_layer.cc

index e3223ac..cb6cd29 100644 (file)
 
 #include "hci_layer.h"
 
+#include <base/bind.h>
 #include <base/logging.h>
+#include <base/run_loop.h>
+#include <base/sequenced_task_runner.h>
+#include <base/threading/thread.h>
+
 #include <signal.h>
 #include <string.h>
 #include <sys/types.h>
@@ -80,13 +85,15 @@ static const packet_fragmenter_t* packet_fragmenter;
 
 static future_t* startup_future;
 static thread_t* thread;  // We own this
+static base::MessageLoop* message_loop_ = nullptr;
+static base::RunLoop* run_loop_ = nullptr;
 
 static alarm_t* startup_timer;
 
 // Outbound-related
 static int command_credits = 1;
-static fixed_queue_t* command_queue;
-static fixed_queue_t* packet_queue;
+static std::mutex command_credits_mutex;
+static std::queue<base::Closure> command_queue;
 
 // Inbound-related
 static alarm_t* command_response_timer;
@@ -102,8 +109,10 @@ static waiting_command_t* get_waiting_command(command_opcode_t opcode);
 static void event_finish_startup(void* context);
 static void startup_timer_expired(void* context);
 
-static void event_command_ready(fixed_queue_t* queue, void* context);
-static void event_packet_ready(fixed_queue_t* queue, void* context);
+static void enqueue_command(waiting_command_t* wait_entry);
+static void event_command_ready(waiting_command_t* wait_entry);
+static void enqueue_packet(void* packet);
+static void event_packet_ready(void* packet);
 static void command_timed_out(void* context);
 
 static void update_command_response_timer(void);
@@ -117,7 +126,8 @@ static const packet_fragmenter_callbacks_t packet_fragmenter_callbacks = {
     transmit_fragment, dispatch_reassembled, fragmenter_transmit_finished};
 
 void initialization_complete() {
-  thread_post(thread, event_finish_startup, NULL);
+  message_loop_->task_runner()->PostTask(
+      FROM_HERE, base::Bind(&event_finish_startup, nullptr));
 }
 
 void hci_event_received(BT_HDR* packet) {
@@ -143,6 +153,21 @@ void sco_data_received(BT_HDR* packet) {
 
 static future_t* hci_module_shut_down();
 
+void message_loop_run(UNUSED_ATTR void* context) {
+  message_loop_ = new base::MessageLoop();
+  run_loop_ = new base::RunLoop();
+
+  message_loop_->task_runner()->PostTask(FROM_HERE,
+                                         base::Bind(&hci_initialize));
+  run_loop_->Run();
+
+  delete message_loop_;
+  message_loop_ = nullptr;
+
+  delete run_loop_;
+  run_loop_ = nullptr;
+}
+
 static future_t* hci_module_start_up(void) {
   LOG_INFO(LOG_TAG, "%s", __func__);
 
@@ -174,18 +199,6 @@ static future_t* hci_module_start_up(void) {
     goto error;
   }
 
-  command_queue = fixed_queue_new(SIZE_MAX);
-  if (!command_queue) {
-    LOG_ERROR(LOG_TAG, "%s unable to create pending command queue.", __func__);
-    goto error;
-  }
-
-  packet_queue = fixed_queue_new(SIZE_MAX);
-  if (!packet_queue) {
-    LOG_ERROR(LOG_TAG, "%s unable to create pending packet queue.", __func__);
-    goto error;
-  }
-
   thread = thread_new("hci_thread");
   if (!thread) {
     LOG_ERROR(LOG_TAG, "%s unable to create thread.", __func__);
@@ -211,12 +224,7 @@ static future_t* hci_module_start_up(void) {
 
   packet_fragmenter->init(&packet_fragmenter_callbacks);
 
-  fixed_queue_register_dequeue(command_queue, thread_get_reactor(thread),
-                               event_command_ready, NULL);
-  fixed_queue_register_dequeue(packet_queue, thread_get_reactor(thread),
-                               event_packet_ready, NULL);
-
-  hci_initialize();
+  thread_post(thread, message_loop_run, NULL);
 
   LOG_DEBUG(LOG_TAG, "%s starting async portion", __func__);
   return local_startup_future;
@@ -238,6 +246,8 @@ static future_t* hci_module_shut_down() {
     startup_timer = NULL;
   }
 
+  message_loop_->task_runner()->PostTask(FROM_HERE, run_loop_->QuitClosure());
+
   // Stop the thread to prevent Send() calls.
   if (thread) {
     thread_stop(thread);
@@ -247,10 +257,6 @@ static future_t* hci_module_shut_down() {
   // Close HCI to prevent callbacks.
   hci_close();
 
-  fixed_queue_free(command_queue, osi_free);
-  command_queue = NULL;
-  fixed_queue_free(packet_queue, buffer_allocator->free);
-  packet_queue = NULL;
   {
     std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex);
     list_free(commands_pending_response);
@@ -294,7 +300,7 @@ static void transmit_command(BT_HDR* command,
   // in case the upper layer didn't already
   command->event = MSG_STACK_TO_HC_HCI_CMD;
 
-  fixed_queue_enqueue(command_queue, wait_entry);
+  enqueue_command(wait_entry);
 }
 
 static future_t* transmit_command_futured(BT_HDR* command) {
@@ -311,7 +317,7 @@ static future_t* transmit_command_futured(BT_HDR* command) {
   // in case the upper layer didn't already
   command->event = MSG_STACK_TO_HC_HCI_CMD;
 
-  fixed_queue_enqueue(command_queue, wait_entry);
+  enqueue_command(wait_entry);
   return future;
 }
 
@@ -323,7 +329,7 @@ static void transmit_downward(data_dispatcher_type_t type, void* data) {
              "%s legacy transmit of command. Use transmit_command instead.",
              __func__);
   } else {
-    fixed_queue_enqueue(packet_queue, data);
+    enqueue_packet(data);
   }
 }
 
@@ -346,34 +352,38 @@ static void startup_timer_expired(UNUSED_ATTR void* context) {
 }
 
 // Command/packet transmitting functions
+static void enqueue_command(waiting_command_t* wait_entry) {
+  base::Closure callback = base::Bind(&event_command_ready, wait_entry);
 
-static void event_command_ready(fixed_queue_t* queue,
-                                UNUSED_ATTR void* context) {
+  std::lock_guard<std::mutex> lock(command_credits_mutex);
   if (command_credits > 0) {
-    waiting_command_t* wait_entry =
-        reinterpret_cast<waiting_command_t*>(fixed_queue_dequeue(queue));
+    message_loop_->task_runner()->PostTask(FROM_HERE, std::move(callback));
     command_credits--;
+  } else {
+    command_queue.push(std::move(callback));
+  }
+}
 
-    // Move it to the list of commands awaiting response
-    {
-      std::lock_guard<std::recursive_mutex> lock(
-          commands_pending_response_mutex);
-      wait_entry->timestamp = std::chrono::steady_clock::now();
-      list_append(commands_pending_response, wait_entry);
+static void event_command_ready(waiting_command_t* wait_entry) {
+  /// Move it to the list of commands awaiting response
+  std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex);
+  wait_entry->timestamp = std::chrono::steady_clock::now();
+  list_append(commands_pending_response, wait_entry);
 
-      // Send it off
-      packet_fragmenter->fragment_and_dispatch(wait_entry->command);
+  // Send it off
+  packet_fragmenter->fragment_and_dispatch(wait_entry->command);
 
-      update_command_response_timer();
-    }
-  }
+  update_command_response_timer();
 }
 
-static void event_packet_ready(fixed_queue_t* queue,
-                               UNUSED_ATTR void* context) {
-  // The queue may be the command queue or the packet queue, we don't care
-  BT_HDR* packet = (BT_HDR*)fixed_queue_dequeue(queue);
+static void enqueue_packet(void* packet) {
+  message_loop_->task_runner()->PostTask(
+      FROM_HERE, base::Bind(&event_packet_ready, packet));
+}
 
+static void event_packet_ready(void* pkt) {
+  // The queue may be the command queue or the packet queue, we don't care
+  BT_HDR* packet = (BT_HDR*)pkt;
   packet_fragmenter->fragment_and_dispatch(packet);
 }
 
@@ -428,6 +438,17 @@ static void command_timed_out(UNUSED_ATTR void* context) {
 }
 
 // Event/packet receiving functions
+void process_command_credits(int credits) {
+  std::lock_guard<std::mutex> lock(command_credits_mutex);
+
+  command_credits = credits;
+  while (command_credits > 0 && command_queue.size() > 0) {
+    message_loop_->task_runner()->PostTask(FROM_HERE,
+                                           std::move(command_queue.front()));
+    command_queue.pop();
+    command_credits--;
+  }
+}
 
 // Returns true if the event was intercepted and should not proceed to
 // higher layers. Also inspects an incoming event for interesting
@@ -436,19 +457,20 @@ static bool filter_incoming_event(BT_HDR* packet) {
   waiting_command_t* wait_entry = NULL;
   uint8_t* stream = packet->data;
   uint8_t event_code;
+  int credits = 0;
   command_opcode_t opcode;
 
   STREAM_TO_UINT8(event_code, stream);
   STREAM_SKIP_UINT8(stream);  // Skip the parameter total length field
 
   if (event_code == HCI_COMMAND_COMPLETE_EVT) {
-    STREAM_TO_UINT8(command_credits, stream);
+    STREAM_TO_UINT8(credits, stream);
     STREAM_TO_UINT16(opcode, stream);
 
+    process_command_credits(credits);
+
     wait_entry = get_waiting_command(opcode);
     if (!wait_entry) {
-      // TODO: Currently command_credits aren't parsed at all; here or in higher
-      // layers...
       if (opcode != HCI_COMMAND_NONE) {
         LOG_WARN(LOG_TAG,
                  "%s command complete event with no matching command (opcode: "
@@ -468,12 +490,13 @@ static bool filter_incoming_event(BT_HDR* packet) {
   } else if (event_code == HCI_COMMAND_STATUS_EVT) {
     uint8_t status;
     STREAM_TO_UINT8(status, stream);
-    STREAM_TO_UINT8(command_credits, stream);
+    STREAM_TO_UINT8(credits, stream);
     STREAM_TO_UINT16(opcode, stream);
 
+    process_command_credits(credits);
+
     // If a command generates a command status event, it won't be getting a
     // command complete event
-
     wait_entry = get_waiting_command(opcode);
     if (!wait_entry) {
       LOG_WARN(