OSDN Git Service

Merge "Run code coverage only on host target"
[android-x86/system-bt.git] / vendor_libs / test_vendor_lib / model / setup / async_manager.cc
1 /*
2  * Copyright 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #define LOG_TAG "async_manager"
18
19 #include "async_manager.h"
20
21 #include "osi/include/log.h"
22
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <thread>
28 #include <vector>
29 #include "fcntl.h"
30 #include "sys/select.h"
31 #include "unistd.h"
32
33 namespace test_vendor_lib {
34 // Implementation of AsyncManager is divided between two classes, three if
35 // AsyncManager itself is taken into account, but its only responsability
36 // besides being a proxy for the other two classes is to provide a global
37 // synchronization mechanism for callbacks and client code to use.
38
39 // The watching of file descriptors is done through AsyncFdWatcher. Several
40 // objects of this class may coexist simultaneosly as they share no state.
41 // After construction of this objects nothing happens beyond some very simple
42 // member initialization. When the first FD is set up for watching the object
43 // starts a new thread which watches the given (and later provided) FDs using
44 // select() inside a loop. A special FD (a pipe) is also watched which is
45 // used to notify the thread of internal changes on the object state (like
46 // the addition of new FDs to watch on). Every access to internal state is
47 // synchronized using a single internal mutex. The thread is only stopped on
48 // destruction of the object, by modifying a flag, which is the only member
49 // variable accessed without acquiring the lock (because the notification to
50 // the thread is done later by writing to a pipe which means the thread will
51 // be notified regardless of what phase of the loop it is in that moment)
52
53 // The scheduling of asynchronous tasks, periodic or not, is handled by the
54 // AsyncTaskManager class. Like the one for FDs, this class shares no internal
55 // state between different instances so it is safe to use several objects of
56 // this class, also nothing interesting happens upon construction, but only
57 // after a Task has been scheduled and access to internal state is synchronized
58 // using a single internal mutex. When the first task is scheduled a thread
59 // is started which monitors a queue of tasks. The queue is peeked to see
60 // when the next task should be carried out and then the thread performs a
61 // (absolute) timed wait on a condition variable. The wait ends because of a
62 // time out or a notify on the cond var, the former means a task is due
63 // for execution while the later means there has been a change in internal
64 // state, like a task has been scheduled/canceled or the flag to stop has
65 // been set. Setting and querying the stop flag or modifying the task queue
66 // and subsequent notification on the cond var is done atomically (e.g while
67 // holding the lock on the internal mutex) to ensure that the thread never
68 // misses the notification, since notifying a cond var is not persistent as
69 // writing on a pipe (if not done this way, the thread could query the
70 // stopping flag and be put aside by the OS scheduler right after, then the
71 // 'stop thread' procedure could run, setting the flag, notifying a cond
72 // var that no one is waiting on and joining the thread, the thread then
73 // resumes execution believing that it needs to continue and waits on the
74 // cond var possibly forever if there are no tasks scheduled, efectively
75 // causing a deadlock).
76
77 // This number also states the maximum number of scheduled tasks we can handle
78 // at a given time
79 static const uint16_t kMaxTaskId = -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/
80 static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
81   return (id == kMaxTaskId) ? 1 : id + 1;
82 }
83 // The buffer is only 10 bytes because the expected number of bytes
84 // written on this socket is 1. It is possible that the thread is notified
85 // more than once but highly unlikely, so a buffer of size 10 seems enough
86 // and the reads are performed inside a while just in case it isn't. From
87 // the thread routine's point of view it is the same to have been notified
88 // just once or 100 times so it just tries to consume the entire buffer.
89 // In the cases where an interrupt would cause read to return without
90 // having read everything that was available a new iteration of the thread
91 // loop will bring execution to this point almost immediately, so there is
92 // no need to treat that case.
93 static const int kNotificationBufferSize = 10;
94
95 // Async File Descriptor Watcher Implementation:
96 class AsyncManager::AsyncFdWatcher {
97  public:
98   int WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
99     // add file descriptor and callback
100     {
101       std::unique_lock<std::mutex> guard(internal_mutex_);
102       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
103     }
104
105     // start the thread if not started yet
106     int started = tryStartThread();
107     if (started != 0) {
108       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
109       return started;
110     }
111
112     // notify the thread so that it knows of the new FD
113     notifyThread();
114
115     return 0;
116   }
117
118   void StopWatchingFileDescriptor(int file_descriptor) {
119     std::unique_lock<std::mutex> guard(internal_mutex_);
120     watched_shared_fds_.erase(file_descriptor);
121   }
122
123   AsyncFdWatcher() = default;
124
125   ~AsyncFdWatcher() = default;
126
127   int stopThread() {
128     if (!std::atomic_exchange(&running_, false)) {
129       return 0;  // if not running already
130     }
131
132     notifyThread();
133
134     if (std::this_thread::get_id() != thread_.get_id()) {
135       thread_.join();
136     } else {
137       LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the reading thread itself", __func__);
138     }
139
140     {
141       std::unique_lock<std::mutex> guard(internal_mutex_);
142       watched_shared_fds_.clear();
143     }
144
145     return 0;
146   }
147
148  private:
149   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
150   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
151
152   // Make sure to call this with at least one file descriptor ready to be
153   // watched upon or the thread routine will return immediately
154   int tryStartThread() {
155     if (std::atomic_exchange(&running_, true)) {
156       return 0;  // if already running
157     }
158     // set up the communication channel
159     int pipe_fds[2];
160     if (pipe2(pipe_fds, O_NONBLOCK)) {
161       LOG_ERROR(LOG_TAG,
162                 "%s:Unable to establish a communication channel to the reading "
163                 "thread",
164                 __func__);
165       return -1;
166     }
167     notification_listen_fd_ = pipe_fds[0];
168     notification_write_fd_ = pipe_fds[1];
169
170     thread_ = std::thread([this]() { ThreadRoutine(); });
171     if (!thread_.joinable()) {
172       LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
173       return -1;
174     }
175     return 0;
176   }
177
178   int notifyThread() {
179     char buffer = '0';
180     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
181       LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread", __func__);
182       return -1;
183     }
184     return 0;
185   }
186
187   int setUpFileDescriptorSet(fd_set& read_fds) {
188     // add comm channel to the set
189     FD_SET(notification_listen_fd_, &read_fds);
190     int nfds = notification_listen_fd_;
191
192     // add watched FDs to the set
193     {
194       std::unique_lock<std::mutex> guard(internal_mutex_);
195       for (auto& fdp : watched_shared_fds_) {
196         FD_SET(fdp.first, &read_fds);
197         nfds = std::max(fdp.first, nfds);
198       }
199     }
200     return nfds;
201   }
202
203   // check the comm channel and read everything there
204   bool consumeThreadNotifications(fd_set& read_fds) {
205     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
206       char buffer[kNotificationBufferSize];
207       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer, kNotificationBufferSize)) ==
208              kNotificationBufferSize) {
209       }
210       return true;
211     }
212     return false;
213   }
214
215   // check all file descriptors and call callbacks if necesary
216   void runAppropriateCallbacks(fd_set& read_fds) {
217     // not a good idea to call a callback while holding the FD lock,
218     // nor to release the lock while traversing the map
219     std::vector<decltype(watched_shared_fds_)::value_type> fds;
220     {
221       std::unique_lock<std::mutex> guard(internal_mutex_);
222       for (auto& fdc : watched_shared_fds_) {
223         if (FD_ISSET(fdc.first, &read_fds)) {
224           fds.push_back(fdc);
225         }
226       }
227     }
228     for (auto& p : fds) {
229       p.second(p.first);
230     }
231   }
232
233   void ThreadRoutine() {
234     while (running_) {
235       fd_set read_fds;
236       FD_ZERO(&read_fds);
237       int nfds = setUpFileDescriptorSet(read_fds);
238
239       // wait until there is data available to read on some FD
240       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
241       if (retval <= 0) {  // there was some error or a timeout
242         LOG_ERROR(LOG_TAG,
243                   "%s: There was an error while waiting for data on the file "
244                   "descriptors: %s",
245                   __func__, strerror(errno));
246         continue;
247       }
248
249       consumeThreadNotifications(read_fds);
250
251       // Do not read if there was a call to stop running
252       if (!running_) {
253         break;
254       }
255
256       runAppropriateCallbacks(read_fds);
257     }
258   }
259
260   std::atomic_bool running_{false};
261   std::thread thread_;
262   std::mutex internal_mutex_;
263
264   std::map<int, ReadCallback> watched_shared_fds_;
265
266   // A pair of FD to send information to the reading thread
267   int notification_listen_fd_;
268   int notification_write_fd_;
269 };
270
271 // Async task manager implementation
272 class AsyncManager::AsyncTaskManager {
273  public:
274   AsyncTaskId ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
275     return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, callback));
276   }
277
278   AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
279                                     const TaskCallback& callback) {
280     return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, period, callback));
281   }
282
283   bool CancelAsyncTask(AsyncTaskId async_task_id) {
284     // remove task from queue (and task id asociation) while holding lock
285     std::unique_lock<std::mutex> guard(internal_mutex_);
286     if (tasks_by_id.count(async_task_id) == 0) {
287       return false;
288     }
289     task_queue_.erase(tasks_by_id[async_task_id]);
290     tasks_by_id.erase(async_task_id);
291     return true;
292   }
293
294   AsyncTaskManager() = default;
295
296   ~AsyncTaskManager() = default;
297
298   int stopThread() {
299     {
300       std::unique_lock<std::mutex> guard(internal_mutex_);
301       tasks_by_id.clear();
302       task_queue_.clear();
303       if (!running_) {
304         return 0;
305       }
306       running_ = false;
307       // notify the thread
308       internal_cond_var_.notify_one();
309     }  // release the lock before joining a thread that is likely waiting for it
310     if (std::this_thread::get_id() != thread_.get_id()) {
311       thread_.join();
312     } else {
313       LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the task thread itself", __func__);
314     }
315     return 0;
316   }
317
318  private:
319   // Holds the data for each task
320   class Task {
321    public:
322     Task(std::chrono::steady_clock::time_point time, std::chrono::milliseconds period, const TaskCallback& callback)
323         : time(time), periodic(true), period(period), callback(callback), task_id(kInvalidTaskId) {}
324     Task(std::chrono::steady_clock::time_point time, const TaskCallback& callback)
325         : time(time), periodic(false), callback(callback), task_id(kInvalidTaskId) {}
326
327     // Operators needed to be in a collection
328     bool operator<(const Task& another) const {
329       return std::make_pair(time, task_id) < std::make_pair(another.time, another.task_id);
330     }
331
332     bool isPeriodic() const {
333       return periodic;
334     }
335
336     // These fields should no longer be public if the class ever becomes
337     // public or gets more complex
338     std::chrono::steady_clock::time_point time;
339     bool periodic;
340     std::chrono::milliseconds period;
341     TaskCallback callback;
342     AsyncTaskId task_id;
343   };
344
345   // A comparator class to put shared pointers to tasks in an ordered set
346   struct task_p_comparator {
347     bool operator()(const std::shared_ptr<Task>& t1, const std::shared_ptr<Task>& t2) const {
348       return *t1 < *t2;
349     }
350   };
351
352   AsyncTaskManager(const AsyncTaskManager&) = delete;
353   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
354
355   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
356     AsyncTaskId task_id = kInvalidTaskId;
357     {
358       std::unique_lock<std::mutex> guard(internal_mutex_);
359       // no more room for new tasks, we need a larger type for IDs
360       if (tasks_by_id.size() == kMaxTaskId)  // TODO potentially type unsafe
361         return kInvalidTaskId;
362       do {
363         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
364       } while (isTaskIdInUse(lastTaskId_));
365       task->task_id = lastTaskId_;
366       // add task to the queue and map
367       tasks_by_id[lastTaskId_] = task;
368       task_queue_.insert(task);
369       task_id = lastTaskId_;
370     }
371     // start thread if necessary
372     int started = tryStartThread();
373     if (started != 0) {
374       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
375       return kInvalidTaskId;
376     }
377     // notify the thread so that it knows of the new task
378     internal_cond_var_.notify_one();
379     // return task id
380     return task_id;
381   }
382
383   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
384     return tasks_by_id.count(task_id) != 0;
385   }
386
387   int tryStartThread() {
388     // need the lock because of the running flag and the cond var
389     std::unique_lock<std::mutex> guard(internal_mutex_);
390     // check that the thread is not yet running
391     if (running_) {
392       return 0;
393     }
394     // start the thread
395     running_ = true;
396     thread_ = std::thread([this]() { ThreadRoutine(); });
397     if (!thread_.joinable()) {
398       LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
399       return -1;
400     }
401     return 0;
402   }
403
404   void ThreadRoutine() {
405     while (1) {
406       TaskCallback callback;
407       bool run_it = false;
408       {
409         std::unique_lock<std::mutex> guard(internal_mutex_);
410         if (!task_queue_.empty()) {
411           std::shared_ptr<Task> task_p = *(task_queue_.begin());
412           if (task_p->time < std::chrono::steady_clock::now()) {
413             run_it = true;
414             callback = task_p->callback;
415             task_queue_.erase(task_p);  // need to remove and add again if
416                                         // periodic to update order
417             if (task_p->isPeriodic()) {
418               task_p->time += task_p->period;
419               task_queue_.insert(task_p);
420             } else {
421               tasks_by_id.erase(task_p->task_id);
422             }
423           }
424         }
425       }
426       if (run_it) {
427         callback();
428       }
429       {
430         std::unique_lock<std::mutex> guard(internal_mutex_);
431         // wait on condition variable with timeout just in time for next task if
432         // any
433         if (task_queue_.size() > 0) {
434           internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
435         } else {
436           internal_cond_var_.wait(guard);
437         }
438         // check for termination right after being notified (and maybe before?)
439         if (!running_) break;
440       }
441     }
442   }
443
444   bool running_ = false;
445   std::thread thread_;
446   std::mutex internal_mutex_;
447   std::condition_variable internal_cond_var_;
448
449   AsyncTaskId lastTaskId_ = kInvalidTaskId;
450   std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
451   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
452 };
453
454 // Async Manager Implementation:
455 AsyncManager::AsyncManager() : fdWatcher_p_(new AsyncFdWatcher()), taskManager_p_(new AsyncTaskManager()) {}
456
457 AsyncManager::~AsyncManager() {
458   // Make sure the threads are stopped before destroying the object.
459   // The threads need to be stopped here and not in each internal class'
460   // destructor because unique_ptr's reset() first assigns nullptr to the
461   // pointer and only then calls the destructor, so any callback running
462   // on these threads would dereference a null pointer if they called a member
463   // function of this class.
464   fdWatcher_p_->stopThread();
465   taskManager_p_->stopThread();
466 }
467
468 int AsyncManager::WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
469   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, on_read_fd_ready_callback);
470 }
471
472 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
473   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
474 }
475
476 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
477   return taskManager_p_->ExecAsync(delay, callback);
478 }
479
480 AsyncTaskId AsyncManager::ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
481                                                 const TaskCallback& callback) {
482   return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
483 }
484
485 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
486   return taskManager_p_->CancelAsyncTask(async_task_id);
487 }
488
489 void AsyncManager::Synchronize(const CriticalCallback& critical) {
490   std::unique_lock<std::mutex> guard(synchronization_mutex_);
491   critical();
492 }
493 }  // namespace test_vendor_lib