#include "hci_layer.h"
+#include <base/bind.h>
#include <base/logging.h>
+#include <base/run_loop.h>
+#include <base/sequenced_task_runner.h>
+#include <base/threading/thread.h>
+
#include <signal.h>
#include <string.h>
#include <sys/types.h>
static future_t* startup_future;
static thread_t* thread; // We own this
+static base::MessageLoop* message_loop_ = nullptr;
+static base::RunLoop* run_loop_ = nullptr;
static alarm_t* startup_timer;
// Outbound-related
static int command_credits = 1;
-static fixed_queue_t* command_queue;
-static fixed_queue_t* packet_queue;
+static std::mutex command_credits_mutex;
+static std::queue<base::Closure> command_queue;
// Inbound-related
static alarm_t* command_response_timer;
static void event_finish_startup(void* context);
static void startup_timer_expired(void* context);
-static void event_command_ready(fixed_queue_t* queue, void* context);
-static void event_packet_ready(fixed_queue_t* queue, void* context);
+static void enqueue_command(waiting_command_t* wait_entry);
+static void event_command_ready(waiting_command_t* wait_entry);
+static void enqueue_packet(void* packet);
+static void event_packet_ready(void* packet);
static void command_timed_out(void* context);
static void update_command_response_timer(void);
transmit_fragment, dispatch_reassembled, fragmenter_transmit_finished};
void initialization_complete() {
- thread_post(thread, event_finish_startup, NULL);
+ message_loop_->task_runner()->PostTask(
+ FROM_HERE, base::Bind(&event_finish_startup, nullptr));
}
void hci_event_received(BT_HDR* packet) {
static future_t* hci_module_shut_down();
+void message_loop_run(UNUSED_ATTR void* context) {
+ message_loop_ = new base::MessageLoop();
+ run_loop_ = new base::RunLoop();
+
+ message_loop_->task_runner()->PostTask(FROM_HERE,
+ base::Bind(&hci_initialize));
+ run_loop_->Run();
+
+ delete message_loop_;
+ message_loop_ = nullptr;
+
+ delete run_loop_;
+ run_loop_ = nullptr;
+}
+
static future_t* hci_module_start_up(void) {
LOG_INFO(LOG_TAG, "%s", __func__);
goto error;
}
- command_queue = fixed_queue_new(SIZE_MAX);
- if (!command_queue) {
- LOG_ERROR(LOG_TAG, "%s unable to create pending command queue.", __func__);
- goto error;
- }
-
- packet_queue = fixed_queue_new(SIZE_MAX);
- if (!packet_queue) {
- LOG_ERROR(LOG_TAG, "%s unable to create pending packet queue.", __func__);
- goto error;
- }
-
thread = thread_new("hci_thread");
if (!thread) {
LOG_ERROR(LOG_TAG, "%s unable to create thread.", __func__);
packet_fragmenter->init(&packet_fragmenter_callbacks);
- fixed_queue_register_dequeue(command_queue, thread_get_reactor(thread),
- event_command_ready, NULL);
- fixed_queue_register_dequeue(packet_queue, thread_get_reactor(thread),
- event_packet_ready, NULL);
-
- hci_initialize();
+ thread_post(thread, message_loop_run, NULL);
LOG_DEBUG(LOG_TAG, "%s starting async portion", __func__);
return local_startup_future;
startup_timer = NULL;
}
+ message_loop_->task_runner()->PostTask(FROM_HERE, run_loop_->QuitClosure());
+
// Stop the thread to prevent Send() calls.
if (thread) {
thread_stop(thread);
// Close HCI to prevent callbacks.
hci_close();
- fixed_queue_free(command_queue, osi_free);
- command_queue = NULL;
- fixed_queue_free(packet_queue, buffer_allocator->free);
- packet_queue = NULL;
{
std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex);
list_free(commands_pending_response);
// in case the upper layer didn't already
command->event = MSG_STACK_TO_HC_HCI_CMD;
- fixed_queue_enqueue(command_queue, wait_entry);
+ enqueue_command(wait_entry);
}
static future_t* transmit_command_futured(BT_HDR* command) {
// in case the upper layer didn't already
command->event = MSG_STACK_TO_HC_HCI_CMD;
- fixed_queue_enqueue(command_queue, wait_entry);
+ enqueue_command(wait_entry);
return future;
}
"%s legacy transmit of command. Use transmit_command instead.",
__func__);
} else {
- fixed_queue_enqueue(packet_queue, data);
+ enqueue_packet(data);
}
}
}
// Command/packet transmitting functions
+static void enqueue_command(waiting_command_t* wait_entry) {
+ base::Closure callback = base::Bind(&event_command_ready, wait_entry);
-static void event_command_ready(fixed_queue_t* queue,
- UNUSED_ATTR void* context) {
+ std::lock_guard<std::mutex> lock(command_credits_mutex);
if (command_credits > 0) {
- waiting_command_t* wait_entry =
- reinterpret_cast<waiting_command_t*>(fixed_queue_dequeue(queue));
+ message_loop_->task_runner()->PostTask(FROM_HERE, std::move(callback));
command_credits--;
+ } else {
+ command_queue.push(std::move(callback));
+ }
+}
- // Move it to the list of commands awaiting response
- {
- std::lock_guard<std::recursive_mutex> lock(
- commands_pending_response_mutex);
- wait_entry->timestamp = std::chrono::steady_clock::now();
- list_append(commands_pending_response, wait_entry);
+static void event_command_ready(waiting_command_t* wait_entry) {
+ /// Move it to the list of commands awaiting response
+ std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex);
+ wait_entry->timestamp = std::chrono::steady_clock::now();
+ list_append(commands_pending_response, wait_entry);
- // Send it off
- packet_fragmenter->fragment_and_dispatch(wait_entry->command);
+ // Send it off
+ packet_fragmenter->fragment_and_dispatch(wait_entry->command);
- update_command_response_timer();
- }
- }
+ update_command_response_timer();
}
-static void event_packet_ready(fixed_queue_t* queue,
- UNUSED_ATTR void* context) {
- // The queue may be the command queue or the packet queue, we don't care
- BT_HDR* packet = (BT_HDR*)fixed_queue_dequeue(queue);
+static void enqueue_packet(void* packet) {
+ message_loop_->task_runner()->PostTask(
+ FROM_HERE, base::Bind(&event_packet_ready, packet));
+}
+static void event_packet_ready(void* pkt) {
+ // The queue may be the command queue or the packet queue, we don't care
+ BT_HDR* packet = (BT_HDR*)pkt;
packet_fragmenter->fragment_and_dispatch(packet);
}
}
// Event/packet receiving functions
+void process_command_credits(int credits) {
+ std::lock_guard<std::mutex> lock(command_credits_mutex);
+
+ command_credits = credits;
+ while (command_credits > 0 && command_queue.size() > 0) {
+ message_loop_->task_runner()->PostTask(FROM_HERE,
+ std::move(command_queue.front()));
+ command_queue.pop();
+ command_credits--;
+ }
+}
// Returns true if the event was intercepted and should not proceed to
// higher layers. Also inspects an incoming event for interesting
waiting_command_t* wait_entry = NULL;
uint8_t* stream = packet->data;
uint8_t event_code;
+ int credits = 0;
command_opcode_t opcode;
STREAM_TO_UINT8(event_code, stream);
STREAM_SKIP_UINT8(stream); // Skip the parameter total length field
if (event_code == HCI_COMMAND_COMPLETE_EVT) {
- STREAM_TO_UINT8(command_credits, stream);
+ STREAM_TO_UINT8(credits, stream);
STREAM_TO_UINT16(opcode, stream);
+ process_command_credits(credits);
+
wait_entry = get_waiting_command(opcode);
if (!wait_entry) {
- // TODO: Currently command_credits aren't parsed at all; here or in higher
- // layers...
if (opcode != HCI_COMMAND_NONE) {
LOG_WARN(LOG_TAG,
"%s command complete event with no matching command (opcode: "
} else if (event_code == HCI_COMMAND_STATUS_EVT) {
uint8_t status;
STREAM_TO_UINT8(status, stream);
- STREAM_TO_UINT8(command_credits, stream);
+ STREAM_TO_UINT8(credits, stream);
STREAM_TO_UINT16(opcode, stream);
+ process_command_credits(credits);
+
// If a command generates a command status event, it won't be getting a
// command complete event
-
wait_entry = get_waiting_command(opcode);
if (!wait_entry) {
LOG_WARN(