// Create and register a single-shot alarm on given thread
explicit Alarm(Thread* thread);
- // Create and register a single-shot alarm with a given reactor
- explicit Alarm(Reactor* reactor);
-
// Unregister this alarm from the thread and release resource
~Alarm();
private:
Closure task_;
- Reactor* reactor_;
+ Thread* thread_;
int fd_ = 0;
Reactor::Reactable* token_;
mutable std::mutex mutex_;
// Create and register a handler on given thread
explicit Handler(Thread* thread);
- // Create and register a handler with a given reactor
- explicit Handler(Reactor* reactor);
-
// Unregister this handler from the thread and release resource. Unhandled events will be discarded and not executed.
~Handler();
private:
std::queue<Closure> tasks_;
- Reactor* reactor_;
+ Thread* thread_;
int fd_;
Reactor::Reactable* reactable_;
mutable std::mutex mutex_;
namespace bluetooth {
namespace os {
-Alarm::Alarm(Reactor* reactor) : reactor_(reactor), fd_(timerfd_create(ALARM_CLOCK, 0)) {
+Alarm::Alarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) {
ASSERT_LOG(fd_ != -1, "cannot create timerfd: %s", strerror(errno));
- token_ = reactor_->Register(fd_, [this] { on_fire(); }, nullptr);
+ token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr);
}
-Alarm::Alarm(Thread* thread) : Alarm(thread->GetReactor()) {}
-
Alarm::~Alarm() {
- reactor_->Unregister(token_);
+ thread_->GetReactor()->Unregister(token_);
int close_status;
RUN_NO_INTR(close_status = close(fd_));
namespace bluetooth {
namespace os {
-Handler::Handler(Reactor* reactor) : reactor_(reactor), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) {
+Handler::Handler(Thread* thread) : thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) {
ASSERT(fd_ != -1);
- reactable_ = reactor_->Register(fd_, [this] { this->handle_next_event(); }, nullptr);
+ reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr);
}
-Handler::Handler(Thread* thread) : Handler(thread->GetReactor()) {}
-
Handler::~Handler() {
- reactor_->Unregister(reactable_);
+ thread_->GetReactor()->Unregister(reactable_);
reactable_ = nullptr;
int close_status;
ASSERT(enqueue_.handler_ == nullptr);
ASSERT(enqueue_.reactable_ == nullptr);
enqueue_.handler_ = handler;
- enqueue_.reactable_ = enqueue_.handler_->reactor_->Register(
+ enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
enqueue_.reactive_semaphore_.GetFd(), [this, callback] { EnqueueCallbackInternal(callback); }, nullptr);
}
void Queue<T>::UnregisterEnqueue() {
std::lock_guard<std::mutex> lock(mutex_);
ASSERT(enqueue_.reactable_ != nullptr);
- enqueue_.handler_->reactor_->Unregister(enqueue_.reactable_);
+ enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_);
enqueue_.reactable_ = nullptr;
enqueue_.handler_ = nullptr;
}
ASSERT(dequeue_.handler_ == nullptr);
ASSERT(dequeue_.reactable_ == nullptr);
dequeue_.handler_ = handler;
- dequeue_.reactable_ =
- dequeue_.handler_->reactor_->Register(dequeue_.reactive_semaphore_.GetFd(), [callback] { callback(); }, nullptr);
+ dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(dequeue_.reactive_semaphore_.GetFd(),
+ [callback] { callback(); }, nullptr);
}
template <typename T>
void Queue<T>::UnregisterDequeue() {
std::lock_guard<std::mutex> lock(mutex_);
ASSERT(dequeue_.reactable_ != nullptr);
- dequeue_.handler_->reactor_->Unregister(dequeue_.reactable_);
+ dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_);
dequeue_.reactable_ = nullptr;
dequeue_.handler_ = nullptr;
}
Handler* dequeue_handler_;
};
-class QueueTestSingleThread : public ::testing::Test {
- protected:
- void SetUp() override {
- reactor_ = new Reactor();
- enqueue_handler_ = new Handler(reactor_);
- dequeue_handler_ = new Handler(reactor_);
- }
- void TearDown() override {
- delete enqueue_handler_;
- delete dequeue_handler_;
- delete reactor_;
- enqueue_handler_ = nullptr;
- dequeue_handler_ = nullptr;
- reactor_ = nullptr;
- }
- Reactor* reactor_;
- Handler* enqueue_handler_;
- Handler* dequeue_handler_;
-};
-
class TestEnqueueEnd {
public:
explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
: count(0), handler_(handler), queue_(queue), delay_(0) {}
+ ~TestEnqueueEnd() {}
+
void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
promise_map_ = promise_map;
handler_->Post([this] { queue_->RegisterEnqueue(handler_, [this] { return EnqueueCallbackForTest(); }); });
explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
: count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
+ ~TestDequeueEnd() {}
+
void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
promise_map_ = promise_map;
handler_->Post([this] { queue_->RegisterDequeue(handler_, [this] { DequeueCallbackForTest(); }); });
});
future.wait();
}
-
-TEST_F(QueueTestSingleThread, no_unregister_enqueue_death_test) {
- Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
-
- queue->RegisterEnqueue(enqueue_handler_,
- []() { return std::make_unique<std::string>("A string to fill the queue"); });
-
- EXPECT_DEATH(delete queue, "nqueue");
-}
-
-TEST_F(QueueTestSingleThread, no_unregister_dequeue_death_test) {
- Queue<std::string>* queue = new Queue<std::string>(kQueueSize);
-
- queue->RegisterDequeue(dequeue_handler_, []() {});
-
- EXPECT_DEATH(delete queue, "equeue");
-}
} // namespace
} // namespace os
} // namespace bluetooth
namespace bluetooth {
namespace os {
-RepeatingAlarm::RepeatingAlarm(Reactor* reactor) : reactor_(reactor), fd_(timerfd_create(ALARM_CLOCK, 0)) {
+RepeatingAlarm::RepeatingAlarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) {
ASSERT(fd_ != -1);
- token_ = reactor_->Register(fd_, [this] { on_fire(); }, nullptr);
+ token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr);
}
-RepeatingAlarm::RepeatingAlarm(Thread* thread) : RepeatingAlarm(thread->GetReactor()) {}
-
RepeatingAlarm::~RepeatingAlarm() {
- reactor_->Unregister(token_);
+ thread_->GetReactor()->Unregister(token_);
int close_status;
RUN_NO_INTR(close_status = close(fd_));
// Create and register a repeating alarm on given thread
explicit RepeatingAlarm(Thread* thread);
- // Create and register a repeating alarm on the given reactor
- explicit RepeatingAlarm(Reactor* reactor);
-
// Unregister this alarm from the thread and release resource
~RepeatingAlarm();
private:
Closure task_;
- Reactor* reactor_;
+ Thread* thread_;
int fd_ = 0;
Reactor::Reactable* token_;
mutable std::mutex mutex_;