OSDN Git Service

L2cap scheduler: Implement high priority channel
authorHansong Zhang <hsz@google.com>
Fri, 15 Jan 2021 01:55:26 +0000 (17:55 -0800)
committerHansong Zhang <hsz@google.com>
Fri, 15 Jan 2021 19:42:35 +0000 (11:42 -0800)
Add a high priority queue for these channels in scheduler.

Introduce MockQueue to help test with BidiQueueEnd callbacks.

Test: cert/run
Test: bluetooth_test_gd
Tag: #gd-refactor
Bug: 141555841
Change-Id: I3b60002f0d2ac2fb4d3307717dce128ccb06247c

gd/l2cap/internal/scheduler.h
gd/l2cap/internal/scheduler_fifo.cc
gd/l2cap/internal/scheduler_fifo.h
gd/l2cap/internal/scheduler_fifo_test.cc
gd/os/mock_queue.h [new file with mode: 0644]

index 416c972..dce2ec6 100644 (file)
@@ -54,6 +54,12 @@ class Scheduler {
    */
   virtual void OnPacketsReady(Cid cid, int number_packets) {}
 
+  /**
+   * Let the scheduler send the specified cid first.
+   * Used by A2dp software encoding.
+   */
+  virtual void SetChannelTxPriority(Cid cid, bool high_priority) {}
+
   virtual ~Scheduler() = default;
 };
 
index c70c401..62d94e6 100644 (file)
@@ -43,10 +43,20 @@ void Fifo::OnPacketsReady(Cid cid, int number_packets) {
   if (number_packets == 0) {
     return;
   }
-  next_to_dequeue_and_num_packets.push(std::make_pair(cid, number_packets));
+  int priority = high_priority_cids_.count(cid) != 0;
+  next_to_dequeue_and_num_packets.push(std::make_pair(cid, number_packets), priority);
   try_register_link_queue_enqueue();
 }
 
+// Invoked within L2CAP Handler context
+void Fifo::SetChannelTxPriority(Cid cid, bool high_priority) {
+  if (high_priority) {
+    high_priority_cids_.emplace(cid);
+  } else {
+    high_priority_cids_.erase(cid);
+  }
+}
+
 // Invoked from some external Queue Reactable context
 std::unique_ptr<Fifo::UpperDequeue> Fifo::link_queue_enqueue_callback() {
   ASSERT(!next_to_dequeue_and_num_packets.empty());
index 61128e9..6e77d9f 100644 (file)
 #include <atomic>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 
 #include "common/bidi_queue.h"
 #include "common/bind.h"
+#include "common/multi_priority_queue.h"
 #include "l2cap/cid.h"
 #include "l2cap/internal/channel_impl.h"
 #include "l2cap/internal/scheduler.h"
@@ -39,12 +41,15 @@ class Fifo : public Scheduler {
   Fifo(DataPipelineManager* data_pipeline_manager, LowerQueueUpEnd* link_queue_up_end, os::Handler* handler);
   ~Fifo();
   void OnPacketsReady(Cid cid, int number_packets) override;
+  void SetChannelTxPriority(Cid cid, bool high_priority) override;
 
  private:
   DataPipelineManager* data_pipeline_manager_;
   LowerQueueUpEnd* link_queue_up_end_;
   os::Handler* handler_;
-  std::queue<std::pair<Cid, int>> next_to_dequeue_and_num_packets;
+  using ChannelAndNumPackets = std::pair<Cid, int>;
+  common::MultiPriorityQueue<ChannelAndNumPackets, 2> next_to_dequeue_and_num_packets;
+  std::unordered_set<Cid> high_priority_cids_;
   std::atomic_bool link_queue_enqueue_registered_ = false;
 
   void try_register_link_queue_enqueue();
index cafd3b6..b45fdbf 100644 (file)
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
-#include <future>
 
 #include "l2cap/internal/channel_impl_mock.h"
 #include "l2cap/internal/data_controller_mock.h"
 #include "l2cap/internal/data_pipeline_manager_mock.h"
 #include "os/handler.h"
-#include "os/queue.h"
+#include "os/mock_queue.h"
 #include "os/thread.h"
 #include "packet/raw_builder.h"
 
@@ -50,67 +49,93 @@ PacketView<kLittleEndian> GetPacketView(std::unique_ptr<packet::BasePacketBuilde
   return packet::PacketView<packet::kLittleEndian>(bytes);
 }
 
-void sync_handler(os::Handler* handler) {
-  std::promise<void> promise;
-  auto future = promise.get_future();
-  handler->Post(common::BindOnce(&std::promise<void>::set_value, common::Unretained(&promise)));
-  auto status = future.wait_for(std::chrono::milliseconds(300));
-  EXPECT_EQ(status, std::future_status::ready);
-}
-
 class MyDataController : public testing::MockDataController {
  public:
   std::unique_ptr<BasePacketBuilder> GetNextPacket() override {
-    return std::move(next_packet);
+    auto next = std::move(next_packets.front());
+    next_packets.pop();
+    return next;
   }
 
-  std::unique_ptr<BasePacketBuilder> next_packet;
+  std::queue<std::unique_ptr<BasePacketBuilder>> next_packets;
 };
 
 class L2capSchedulerFifoTest : public ::testing::Test {
  protected:
   void SetUp() override {
     thread_ = new os::Thread("test_thread", os::Thread::Priority::NORMAL);
-    user_handler_ = new os::Handler(thread_);
     queue_handler_ = new os::Handler(thread_);
-    mock_data_pipeline_manager_ = new testing::MockDataPipelineManager(queue_handler_, link_queue_.GetUpEnd());
-    fifo_ = new Fifo(mock_data_pipeline_manager_, link_queue_.GetUpEnd(), queue_handler_);
+    mock_data_pipeline_manager_ = new testing::MockDataPipelineManager(queue_handler_, &queue_end_);
+    fifo_ = new Fifo(mock_data_pipeline_manager_, &queue_end_, queue_handler_);
   }
 
   void TearDown() override {
     delete fifo_;
     delete mock_data_pipeline_manager_;
     queue_handler_->Clear();
-    user_handler_->Clear();
     delete queue_handler_;
-    delete user_handler_;
     delete thread_;
   }
 
   os::Thread* thread_ = nullptr;
-  os::Handler* user_handler_ = nullptr;
   os::Handler* queue_handler_ = nullptr;
-  common::BidiQueue<Scheduler::LowerDequeue, Scheduler::LowerEnqueue> link_queue_{10};
+  os::MockIQueueDequeue<Scheduler::LowerDequeue> dequeue_;
+  os::MockIQueueEnqueue<Scheduler::LowerEnqueue> enqueue_;
+  common::BidiQueueEnd<Scheduler::LowerEnqueue, Scheduler::LowerDequeue> queue_end_{&enqueue_, &dequeue_};
   testing::MockDataPipelineManager* mock_data_pipeline_manager_ = nullptr;
-  MyDataController data_controller_;
+  MyDataController data_controller_1_;
+  MyDataController data_controller_2_;
   Fifo* fifo_ = nullptr;
 };
 
 TEST_F(L2capSchedulerFifoTest, send_packet) {
   auto frame = BasicFrameBuilder::Create(1, CreateSdu({'a', 'b', 'c'}));
-  data_controller_.next_packet = std::move(frame);
-  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(_)).WillOnce(Return(&data_controller_));
+  data_controller_1_.next_packets.push(std::move(frame));
+  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(_)).WillOnce(Return(&data_controller_1_));
   EXPECT_CALL(*mock_data_pipeline_manager_, OnPacketSent(1));
   fifo_->OnPacketsReady(1, 1);
-  sync_handler(queue_handler_);
-  sync_handler(user_handler_);
-  auto packet = link_queue_.GetDownEnd()->TryDequeue();
+  enqueue_.run_enqueue();
+  auto&& packet = enqueue_.enqueued.front();
   auto packet_view = GetPacketView(std::move(packet));
   auto basic_frame_view = BasicFrameView::Create(packet_view);
   EXPECT_TRUE(basic_frame_view.IsValid());
   EXPECT_EQ(basic_frame_view.GetChannelId(), 1);
   auto payload = basic_frame_view.GetPayload();
   EXPECT_EQ(std::string(payload.begin(), payload.end()), "abc");
+  enqueue_.enqueued.pop();
+}
+
+TEST_F(L2capSchedulerFifoTest, prioritize_channel) {
+  auto frame = BasicFrameBuilder::Create(1, CreateSdu({'a', 'b', 'c'}));
+  data_controller_1_.next_packets.push(std::move(frame));
+  frame = BasicFrameBuilder::Create(2, CreateSdu({'d', 'e', 'f'}));
+  data_controller_2_.next_packets.push(std::move(frame));
+
+  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(1)).WillRepeatedly(Return(&data_controller_1_));
+  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(2)).WillRepeatedly(Return(&data_controller_2_));
+  EXPECT_CALL(*mock_data_pipeline_manager_, OnPacketSent(1));
+  EXPECT_CALL(*mock_data_pipeline_manager_, OnPacketSent(2));
+  fifo_->SetChannelTxPriority(1, true);
+  fifo_->OnPacketsReady(2, 1);
+  fifo_->OnPacketsReady(1, 1);
+  enqueue_.run_enqueue(2);
+  auto packet1 = std::move(enqueue_.enqueued.front());
+  auto packet_view = GetPacketView(std::move(packet1));
+  auto basic_frame_view = BasicFrameView::Create(packet_view);
+  EXPECT_TRUE(basic_frame_view.IsValid());
+  EXPECT_EQ(basic_frame_view.GetChannelId(), 1);
+  auto payload = basic_frame_view.GetPayload();
+  EXPECT_EQ(std::string(payload.begin(), payload.end()), "abc");
+  enqueue_.enqueued.pop();
+
+  auto packet2 = std::move(enqueue_.enqueued.front());
+  packet_view = GetPacketView(std::move(packet2));
+  basic_frame_view = BasicFrameView::Create(packet_view);
+  EXPECT_TRUE(basic_frame_view.IsValid());
+  EXPECT_EQ(basic_frame_view.GetChannelId(), 2);
+  payload = basic_frame_view.GetPayload();
+  EXPECT_EQ(std::string(payload.begin(), payload.end()), "def");
+  enqueue_.enqueued.pop();
 }
 
 }  // namespace
diff --git a/gd/os/mock_queue.h b/gd/os/mock_queue.h
new file mode 100644 (file)
index 0000000..c024880
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <unistd.h>
+
+#include <functional>
+#include <mutex>
+#include <queue>
+
+#include "common/bidi_queue.h"
+#include "common/bind.h"
+#include "common/callback.h"
+#include "os/handler.h"
+#include "os/log.h"
+#include "os/queue.h"
+
+namespace bluetooth {
+namespace os {
+
+template <typename T>
+class MockIQueueEnqueue : public IQueueEnqueue<T> {
+ public:
+  using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
+
+  virtual void RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
+    ASSERT(registered_handler == nullptr);
+    registered_handler = handler;
+    registered_enqueue_callback = callback;
+  }
+
+  virtual void UnregisterEnqueue() {
+    ASSERT(registered_handler != nullptr);
+    registered_handler = nullptr;
+    registered_enqueue_callback = {};
+  }
+
+  void run_enqueue(unsigned times = 1) {
+    while (registered_handler != nullptr && times > 0) {
+      times--;
+      enqueued.push(registered_enqueue_callback.Run());
+    }
+  }
+
+  Handler* registered_handler = nullptr;
+  EnqueueCallback registered_enqueue_callback = {};
+  std::queue<std::unique_ptr<T>> enqueued = {};
+};
+
+template <typename T>
+class MockIQueueDequeue : public IQueueDequeue<T> {
+ public:
+  using DequeueCallback = common::Callback<void()>;
+
+  virtual void RegisterDequeue(Handler* handler, DequeueCallback callback) {
+    ASSERT(registered_handler == nullptr);
+    registered_handler = handler;
+    registered_dequeue_callback = callback;
+  }
+
+  virtual void UnregisterDequeue() {
+    ASSERT(registered_handler != nullptr);
+    registered_handler = nullptr;
+    registered_dequeue_callback = {};
+  }
+
+  virtual std::unique_ptr<T> TryDequeue() {
+    std::unique_ptr<T> front = std::move(enqueued.front());
+    enqueued.pop();
+    return front;
+  }
+
+  void run_dequeue(unsigned times = 1) {
+    while (registered_handler != nullptr && times > 0) {
+      times--;
+      registered_dequeue_callback.Run();
+    }
+  }
+
+  Handler* registered_handler = nullptr;
+  DequeueCallback registered_dequeue_callback = {};
+  std::queue<std::unique_ptr<T>> enqueued = {};
+};
+
+}  // namespace os
+}  // namespace bluetooth