From 48294531bf6e5a5db94766ae52a83454154cb3a1 Mon Sep 17 00:00:00 2001 From: Ajay Panicker Date: Fri, 28 Apr 2017 15:26:38 -0700 Subject: [PATCH] Move hci_thread to a message loop and prevent thread from spinning Before this patch, if there was a message on the command queue and there were no command credits, the thread reactor would spin trying to process the message on the command queue and would continue until a credit was received. This led to a bug where upon switching users, hci_thread would spin and try to use 100% of the CPU. This is fixed by moving over to a message loop and queue system. The message loop processes all the messages. If there aren't enough command credits, command messages are deferred to the command queue and popped off whenever more credits are aquired. The deferred queue has priority to credits over recently posted messages. Bug: 37733903 Test: Swap users with the real time scheduling patch applied, and general Bluetooth usage. TestTracker: 86249 Change-Id: Ib775e47f6d4810d3d7d8af5b3ba84adc4ada3da5 (cherry picked from commit ffee0ee8a102101ded1d3fa398f20c4215e0c441) --- hci/src/hci_layer.cc | 131 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 77 insertions(+), 54 deletions(-) diff --git a/hci/src/hci_layer.cc b/hci/src/hci_layer.cc index e3223acde..cb6cd2922 100644 --- a/hci/src/hci_layer.cc +++ b/hci/src/hci_layer.cc @@ -20,7 +20,12 @@ #include "hci_layer.h" +#include #include +#include +#include +#include + #include #include #include @@ -80,13 +85,15 @@ static const packet_fragmenter_t* packet_fragmenter; static future_t* startup_future; static thread_t* thread; // We own this +static base::MessageLoop* message_loop_ = nullptr; +static base::RunLoop* run_loop_ = nullptr; static alarm_t* startup_timer; // Outbound-related static int command_credits = 1; -static fixed_queue_t* command_queue; -static fixed_queue_t* packet_queue; +static std::mutex command_credits_mutex; +static std::queue command_queue; // Inbound-related static alarm_t* command_response_timer; @@ -102,8 +109,10 @@ static waiting_command_t* get_waiting_command(command_opcode_t opcode); static void event_finish_startup(void* context); static void startup_timer_expired(void* context); -static void event_command_ready(fixed_queue_t* queue, void* context); -static void event_packet_ready(fixed_queue_t* queue, void* context); +static void enqueue_command(waiting_command_t* wait_entry); +static void event_command_ready(waiting_command_t* wait_entry); +static void enqueue_packet(void* packet); +static void event_packet_ready(void* packet); static void command_timed_out(void* context); static void update_command_response_timer(void); @@ -117,7 +126,8 @@ static const packet_fragmenter_callbacks_t packet_fragmenter_callbacks = { transmit_fragment, dispatch_reassembled, fragmenter_transmit_finished}; void initialization_complete() { - thread_post(thread, event_finish_startup, NULL); + message_loop_->task_runner()->PostTask( + FROM_HERE, base::Bind(&event_finish_startup, nullptr)); } void hci_event_received(BT_HDR* packet) { @@ -143,6 +153,21 @@ void sco_data_received(BT_HDR* packet) { static future_t* hci_module_shut_down(); +void message_loop_run(UNUSED_ATTR void* context) { + message_loop_ = new base::MessageLoop(); + run_loop_ = new base::RunLoop(); + + message_loop_->task_runner()->PostTask(FROM_HERE, + base::Bind(&hci_initialize)); + run_loop_->Run(); + + delete message_loop_; + message_loop_ = nullptr; + + delete run_loop_; + run_loop_ = nullptr; +} + static future_t* hci_module_start_up(void) { LOG_INFO(LOG_TAG, "%s", __func__); @@ -174,18 +199,6 @@ static future_t* hci_module_start_up(void) { goto error; } - command_queue = fixed_queue_new(SIZE_MAX); - if (!command_queue) { - LOG_ERROR(LOG_TAG, "%s unable to create pending command queue.", __func__); - goto error; - } - - packet_queue = fixed_queue_new(SIZE_MAX); - if (!packet_queue) { - LOG_ERROR(LOG_TAG, "%s unable to create pending packet queue.", __func__); - goto error; - } - thread = thread_new("hci_thread"); if (!thread) { LOG_ERROR(LOG_TAG, "%s unable to create thread.", __func__); @@ -211,12 +224,7 @@ static future_t* hci_module_start_up(void) { packet_fragmenter->init(&packet_fragmenter_callbacks); - fixed_queue_register_dequeue(command_queue, thread_get_reactor(thread), - event_command_ready, NULL); - fixed_queue_register_dequeue(packet_queue, thread_get_reactor(thread), - event_packet_ready, NULL); - - hci_initialize(); + thread_post(thread, message_loop_run, NULL); LOG_DEBUG(LOG_TAG, "%s starting async portion", __func__); return local_startup_future; @@ -238,6 +246,8 @@ static future_t* hci_module_shut_down() { startup_timer = NULL; } + message_loop_->task_runner()->PostTask(FROM_HERE, run_loop_->QuitClosure()); + // Stop the thread to prevent Send() calls. if (thread) { thread_stop(thread); @@ -247,10 +257,6 @@ static future_t* hci_module_shut_down() { // Close HCI to prevent callbacks. hci_close(); - fixed_queue_free(command_queue, osi_free); - command_queue = NULL; - fixed_queue_free(packet_queue, buffer_allocator->free); - packet_queue = NULL; { std::lock_guard lock(commands_pending_response_mutex); list_free(commands_pending_response); @@ -294,7 +300,7 @@ static void transmit_command(BT_HDR* command, // in case the upper layer didn't already command->event = MSG_STACK_TO_HC_HCI_CMD; - fixed_queue_enqueue(command_queue, wait_entry); + enqueue_command(wait_entry); } static future_t* transmit_command_futured(BT_HDR* command) { @@ -311,7 +317,7 @@ static future_t* transmit_command_futured(BT_HDR* command) { // in case the upper layer didn't already command->event = MSG_STACK_TO_HC_HCI_CMD; - fixed_queue_enqueue(command_queue, wait_entry); + enqueue_command(wait_entry); return future; } @@ -323,7 +329,7 @@ static void transmit_downward(data_dispatcher_type_t type, void* data) { "%s legacy transmit of command. Use transmit_command instead.", __func__); } else { - fixed_queue_enqueue(packet_queue, data); + enqueue_packet(data); } } @@ -346,34 +352,38 @@ static void startup_timer_expired(UNUSED_ATTR void* context) { } // Command/packet transmitting functions +static void enqueue_command(waiting_command_t* wait_entry) { + base::Closure callback = base::Bind(&event_command_ready, wait_entry); -static void event_command_ready(fixed_queue_t* queue, - UNUSED_ATTR void* context) { + std::lock_guard lock(command_credits_mutex); if (command_credits > 0) { - waiting_command_t* wait_entry = - reinterpret_cast(fixed_queue_dequeue(queue)); + message_loop_->task_runner()->PostTask(FROM_HERE, std::move(callback)); command_credits--; + } else { + command_queue.push(std::move(callback)); + } +} - // Move it to the list of commands awaiting response - { - std::lock_guard lock( - commands_pending_response_mutex); - wait_entry->timestamp = std::chrono::steady_clock::now(); - list_append(commands_pending_response, wait_entry); +static void event_command_ready(waiting_command_t* wait_entry) { + /// Move it to the list of commands awaiting response + std::lock_guard lock(commands_pending_response_mutex); + wait_entry->timestamp = std::chrono::steady_clock::now(); + list_append(commands_pending_response, wait_entry); - // Send it off - packet_fragmenter->fragment_and_dispatch(wait_entry->command); + // Send it off + packet_fragmenter->fragment_and_dispatch(wait_entry->command); - update_command_response_timer(); - } - } + update_command_response_timer(); } -static void event_packet_ready(fixed_queue_t* queue, - UNUSED_ATTR void* context) { - // The queue may be the command queue or the packet queue, we don't care - BT_HDR* packet = (BT_HDR*)fixed_queue_dequeue(queue); +static void enqueue_packet(void* packet) { + message_loop_->task_runner()->PostTask( + FROM_HERE, base::Bind(&event_packet_ready, packet)); +} +static void event_packet_ready(void* pkt) { + // The queue may be the command queue or the packet queue, we don't care + BT_HDR* packet = (BT_HDR*)pkt; packet_fragmenter->fragment_and_dispatch(packet); } @@ -428,6 +438,17 @@ static void command_timed_out(UNUSED_ATTR void* context) { } // Event/packet receiving functions +void process_command_credits(int credits) { + std::lock_guard lock(command_credits_mutex); + + command_credits = credits; + while (command_credits > 0 && command_queue.size() > 0) { + message_loop_->task_runner()->PostTask(FROM_HERE, + std::move(command_queue.front())); + command_queue.pop(); + command_credits--; + } +} // Returns true if the event was intercepted and should not proceed to // higher layers. Also inspects an incoming event for interesting @@ -436,19 +457,20 @@ static bool filter_incoming_event(BT_HDR* packet) { waiting_command_t* wait_entry = NULL; uint8_t* stream = packet->data; uint8_t event_code; + int credits = 0; command_opcode_t opcode; STREAM_TO_UINT8(event_code, stream); STREAM_SKIP_UINT8(stream); // Skip the parameter total length field if (event_code == HCI_COMMAND_COMPLETE_EVT) { - STREAM_TO_UINT8(command_credits, stream); + STREAM_TO_UINT8(credits, stream); STREAM_TO_UINT16(opcode, stream); + process_command_credits(credits); + wait_entry = get_waiting_command(opcode); if (!wait_entry) { - // TODO: Currently command_credits aren't parsed at all; here or in higher - // layers... if (opcode != HCI_COMMAND_NONE) { LOG_WARN(LOG_TAG, "%s command complete event with no matching command (opcode: " @@ -468,12 +490,13 @@ static bool filter_incoming_event(BT_HDR* packet) { } else if (event_code == HCI_COMMAND_STATUS_EVT) { uint8_t status; STREAM_TO_UINT8(status, stream); - STREAM_TO_UINT8(command_credits, stream); + STREAM_TO_UINT8(credits, stream); STREAM_TO_UINT16(opcode, stream); + process_command_credits(credits); + // If a command generates a command status event, it won't be getting a // command complete event - wait_entry = get_waiting_command(opcode); if (!wait_entry) { LOG_WARN( -- 2.11.0