2 * Copyright (C) 2010 The Android Open Source Project
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
// Debug-logging switch: the WORKER_DEBUG define below is commented out.
23 //#define WORKER_DEBUG
// DBG forwards all of its arguments to LOGD.
// NOTE(review): any preprocessor guard tying this definition to WORKER_DEBUG
// is not visible in this excerpt -- confirm against the full file.
26 #define DBG(...) LOGD(__VA_ARGS__)
// Thread entry routine; Run() passes this function to pthread_create with
// the WorkerThread object as its argument.
//
// param: the WorkerThread instance, received as an untyped pointer.
//
// state_ is stored as STATE_RUNNING before Worker() is invoked with the saved
// workerParam_, and as STATE_STOPPED once Worker() has returned.
34 void * WorkerThread::Work(void *param) {
35 WorkerThread *t = (WorkerThread *)param;
36 android_atomic_acquire_store(STATE_RUNNING, &t->state_);
37 void * v = t->Worker(t->workerParam_);
38 android_atomic_acquire_store(STATE_STOPPED, &t->state_);
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `v` is returned -- confirm.
// Reports whether state_, read with android_atomic_acquire_load, currently
// equals STATE_RUNNING. Entry and exit are traced through DBG.
42 bool WorkerThread::isRunning() {
43 DBG("WorkerThread::isRunning E");
44 bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING;
45 DBG("WorkerThread::isRunning X ret_value=%d", ret_value);
// NOTE(review): the return statement is not visible in this excerpt;
// presumably ret_value is returned -- confirm.
// Constructor: puts the object in STATE_INITIALIZED and initializes mutex_
// and cond_, passing NULL as the attribute argument for both.
// No thread is created here; that happens in Run().
49 WorkerThread::WorkerThread() {
50 DBG("WorkerThread::WorkerThread E");
51 state_ = STATE_INITIALIZED;
52 pthread_mutex_init(&mutex_, NULL);
53 pthread_cond_init(&cond_, NULL);
54 DBG("WorkerThread::WorkerThread X");
// Destructor: destroys mutex_.
// NOTE(review): the constructor also initializes cond_, but no matching
// pthread_cond_destroy is visible here. At least one line of this function is
// missing from the excerpt -- confirm whether cond_ is released and whether
// the thread is stopped before the mutex is destroyed.
57 WorkerThread::~WorkerThread() {
58 DBG("WorkerThread::~WorkerThread E");
60 pthread_mutex_destroy(&mutex_);
61 DBG("WorkerThread::~WorkerThread X");
64 // Return true if changed from STATE_RUNNING to STATE_STOPPING
// The transition is attempted with a single compare-and-swap on state_
// (expected STATE_RUNNING, new value STATE_STOPPING). ret_value is true when
// android_atomic_acquire_cas yields 0.
65 bool WorkerThread::BeginStopping() {
66 DBG("WorkerThread::BeginStopping E");
67 bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0);
68 DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value);
// NOTE(review): the return statement is not visible in this excerpt;
// presumably ret_value is returned -- confirm.
72 // Wait until state is not STATE_STOPPING
// Signals cond_ once, then loops for as long as state_ reads STATE_STOPPING.
// NOTE(review): in the visible lines cond_ is signalled without mutex_ being
// held, so a worker that has not yet reached its wait could miss the signal
// -- confirm. The loop body is also not visible in this excerpt.
73 void WorkerThread::WaitUntilStopped() {
74 DBG("WorkerThread::WaitUntilStopped E");
75 pthread_cond_signal(&cond_);
76 while(android_atomic_release_load(&state_) == STATE_STOPPING) {
79 DBG("WorkerThread::WaitUntilStopped X");
// Requests that the thread stop. BeginStopping() is called first and the
// guarded branch runs only when it reports that the state changed.
// NOTE(review): the body of the `if` is not visible in this excerpt;
// presumably it waits for the worker to finish -- confirm.
82 void WorkerThread::Stop() {
83 DBG("WorkerThread::Stop E");
84 if (BeginStopping()) {
87 DBG("WorkerThread::Stop X");
// Starts the worker thread.
//
// workerParam: opaque pointer saved in workerParam_; Work() later passes it
//              to Worker().
//
// Steps visible here: initialize attr_, mark it PTHREAD_CREATE_DETACHED,
// create the thread with Work as its entry routine and `this` as its
// argument, then loop until state_ is no longer STATE_INITIALIZED.
//
// NOTE(review): the declaration of `ret`, the checks that guard each LOGE,
// the error and success return values, and the body of the final wait loop
// are not visible in this excerpt.
// NOTE(review): the error messages are prefixed "RIL_Init X:" although this
// function is WorkerThread::Run -- looks like a copy/paste leftover; confirm.
// NOTE(review): no pthread_attr_destroy for attr_ is visible -- confirm.
90 int WorkerThread::Run(void *workerParam) {
91 DBG("WorkerThread::Run E workerParam=%p", workerParam);
95 workerParam_ = workerParam;
97 ret = pthread_attr_init(&attr_);
99 LOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret));
102 ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
104 LOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s",
108 ret = pthread_create(&tid_, &attr_,
109 (void * (*)(void *))&WorkerThread::Work, this);
111 LOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret));
115 // Wait until worker is running
116 while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) {
120 DBG("WorkerThread::Run X workerParam=%p", workerParam);
// WorkerThread subclass whose Worker() services a WorkerQueue.
// WorkerQueue is declared a friend; elsewhere in this file it reaches into
// this object's mutex_ and cond_ through wqt_.
125 class WorkerQueueThread : public WorkerThread {
127 friend class WorkerQueue;
// Constructor and destructor: their bodies are not visible in this excerpt.
130 WorkerQueueThread() {
133 virtual ~WorkerQueueThread() {
// Thread body.
//
// param: the WorkerQueue to service, received as an untyped pointer.
//
// While isRunning() holds, the loop locks mutex_ and waits until q_ has an
// entry. With both queues empty it blocks on cond_. Otherwise it handles
// every delayed_q_ record whose time is at or before `now`, and if q_ is
// still empty it does a timed wait aimed at the earliest remaining delayed
// record.
// NOTE(review): several statements are missing from this excerpt (the push
// of moved records onto q_, the timeval/timespec declarations, the removal
// of the front record from q_, the call that processes it, and the return).
137 void * Worker(void *param) {
138 DBG("WorkerQueueThread::Worker E");
139 WorkerQueue *wq = (WorkerQueue *)param;
141 // Do the work until we're told to stop
142 while (isRunning()) {
143 pthread_mutex_lock(&mutex_);
144 while (isRunning() && wq->q_.size() == 0) {
145 if (wq->delayed_q_.size() == 0) {
146 // Both queues are empty so wait
147 pthread_cond_wait(&cond_, &mutex_);
149 // delayed_q_ is not empty, move any
150 // timed out records to q_.
151 int64_t now = android::elapsedRealtime();
152 while((wq->delayed_q_.size() != 0) &&
153 ((wq->delayed_q_.top()->time - now) <= 0)) {
154 struct WorkerQueue::Record *r = wq->delayed_q_.top();
155 DBG("WorkerQueueThread::Worker move p=%p time=%lldms",
157 wq->delayed_q_.pop();
161 if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) {
162 // We need to do a timed wait
165 struct WorkerQueue::Record *r = wq->delayed_q_.top();
166 int64_t delay_ms = r->time - now;
167 DBG("WorkerQueueThread::Worker wait"
168 " p=%p time=%lldms delay_ms=%lldms",
169 r->p, r->time, delay_ms);
// The absolute deadline is the current gettimeofday() value plus delay_ms.
// NOTE(review): tv_nsec is computed as (tv_usec + (delay_ms % 1000) * 1000)
// * 1000 with no carry into tv_sec visible, so it can reach or exceed
// 1,000,000,000 when the microsecond sum passes one second -- confirm.
// NOTE(review): delay_ms is derived from android::elapsedRealtime() while
// the deadline is built from gettimeofday() -- two different clocks; confirm
// this is intended.
170 gettimeofday(&tv, NULL);
171 ts.tv_sec = tv.tv_sec + (delay_ms / 1000);
172 ts.tv_nsec = (tv.tv_usec +
173 ((delay_ms % 1000) * 1000)) * 1000;
174 pthread_cond_timedwait(&cond_, &mutex_, &ts);
// Take the record at the front of q_, hand it back to the free list, and
// unlock mutex_.
// NOTE(review): the condition guarding this path and the use of the record's
// payload are not visible here.
179 struct WorkerQueue::Record *r = wq->q_.front();
182 wq->release_record(r);
183 pthread_mutex_unlock(&mutex_);
// Second unlock: presumably the alternative branch taken when there is
// nothing to process -- the branch structure is not visible; confirm.
186 pthread_mutex_unlock(&mutex_);
189 DBG("WorkerQueueThread::Worker X");
// Constructor: allocates the WorkerQueueThread that will service this queue
// and stores it in wqt_. The thread is not started here; see Run().
194 WorkerQueue::WorkerQueue() {
195 DBG("WorkerQueue::WorkerQueue E");
196 wqt_ = new WorkerQueueThread();
197 DBG("WorkerQueue::WorkerQueue X");
// Destructor: with wqt_->mutex_ held, empties free_list_ and walks
// delayed_q_, logging each record it takes out.
// NOTE(review): the declaration of `r`, the statements that free each
// record, the pop from delayed_q_, any handling of q_, and the release of
// wqt_ are not visible in this excerpt -- confirm against the full file.
200 WorkerQueue::~WorkerQueue() {
201 DBG("WorkerQueue::~WorkerQueue E");
205 pthread_mutex_lock(&wqt_->mutex_);
206 while(free_list_.size() != 0) {
207 r = free_list_.front();
208 free_list_.pop_front();
209 DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r);
212 while(delayed_q_.size() != 0) {
213 r = delayed_q_.top();
215 DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r);
218 pthread_mutex_unlock(&wqt_->mutex_);
221 DBG("WorkerQueue::~WorkerQueue X");
// Starts the servicing thread by calling wqt_->Run(), passing this queue as
// the worker parameter. Returns whatever that call returns.
224 int WorkerQueue::Run() {
225 return wqt_->Run(this);
// Stops the queue.
// NOTE(review): the body is not visible in this excerpt; presumably it
// forwards to wqt_->Stop() -- confirm.
228 void WorkerQueue::Stop() {
233 * Obtain a record from free_list if it is not empty, fill in the record with provided
234 * information: *p and delay_in_ms
// p:           opaque payload pointer for the record.
// delay_in_ms: when non-zero, the record's time is set to
//              android::elapsedRealtime() + delay_in_ms.
//
// When free_list_ is empty the "new r" log line shows a record is created
// instead of reused; otherwise the front of free_list_ is taken and popped.
// NOTE(review): the declaration and allocation of `r`, the assignment of p,
// the handling of a zero delay, and the return are not visible here.
// NOTE(review): no locking is visible in this function; the visible callers
// (Add, AddDelayed) lock wqt_->mutex_ before calling it.
236 struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) {
238 if (free_list_.size() == 0) {
240 DBG("WorkerQueue::obtain_record new r=%p", r);
242 r = free_list_.front();
243 DBG("WorkerQueue::obtain_record reuse r=%p", r);
244 free_list_.pop_front();
247 if (delay_in_ms != 0) {
248 r->time = android::elapsedRealtime() + delay_in_ms;
256 * release a record and insert into the front of the free_list
// r: the record to recycle; it is pushed onto the front of free_list_ so a
//    later obtain_record() call can reuse it.
// NOTE(review): no locking is visible in this function; the visible caller
// (WorkerQueueThread::Worker) invokes it before unlocking its mutex_.
258 void WorkerQueue::release_record(struct Record *r) {
259 DBG("WorkerQueue::release_record r=%p", r);
260 free_list_.push_front(r);
264 * Add a record to processing queue q_
// p: opaque payload pointer for the new record.
//
// With wqt_->mutex_ held, a record is obtained for p with a delay of 0.
// wqt_->cond_ is signalled only when q_ then holds exactly one entry.
// The mutex is unlocked before the exit trace.
// NOTE(review): the statement that puts the record on q_ is not visible in
// this excerpt.
266 void WorkerQueue::Add(void *p) {
267 DBG("WorkerQueue::Add E:");
268 pthread_mutex_lock(&wqt_->mutex_);
269 struct Record *r = obtain_record(p, 0);
271 if (q_.size() == 1) {
272 pthread_cond_signal(&wqt_->cond_);
274 pthread_mutex_unlock(&wqt_->mutex_);
275 DBG("WorkerQueue::Add X:");
// Queues p for processing after a delay.
//
// p:           opaque payload pointer for the new record.
// delay_in_ms: requested delay. Values <= 0 take a separate branch whose
//              body is not visible in this excerpt.
//
// On the delayed path wqt_->mutex_ is locked and a record is obtained with
// the given delay. wqt_->cond_ is signalled only when q_ is empty and the
// new record is the one at delayed_q_.top(). The mutex is then unlocked.
// NOTE(review): the statement that inserts the record into delayed_q_ is not
// visible in this excerpt.
278 void WorkerQueue::AddDelayed(void *p, int delay_in_ms) {
279 DBG("WorkerQueue::AddDelayed E:");
280 if (delay_in_ms <= 0) {
283 pthread_mutex_lock(&wqt_->mutex_);
284 struct Record *r = obtain_record(p, delay_in_ms);
287 int64_t now = android::elapsedRealtime();
288 DBG("WorkerQueue::AddDelayed"
289 " p=%p delay_in_ms=%d now=%lldms top->p=%p"
290 " top->time=%lldms diff=%lldms",
291 p, delay_in_ms, now, delayed_q_.top()->p,
292 delayed_q_.top()->time, delayed_q_.top()->time - now);
294 if ((q_.size() == 0) && (delayed_q_.top() == r)) {
295 // q_ is empty and the new record is at delayed_q_.top
296 // so we signal the waiting thread so it can readjust
298 DBG("WorkerQueue::AddDelayed signal");
299 pthread_cond_signal(&wqt_->cond_);
301 pthread_mutex_unlock(&wqt_->mutex_);
303 DBG("WorkerQueue::AddDelayed X:");
// WorkerQueue subclass used by the test code: its Process() just logs the
// payload pointer it receives.
307 class TestWorkerQueue : public WorkerQueue {
// p: payload pointer of the record being processed.
308 virtual void Process(void *p) {
309 LOGD("TestWorkerQueue::Process: EX p=%p", p);
// WorkerThread subclass used by the test code to feed a WorkerQueue.
313 class TesterThread : public WorkerThread {
// param: the WorkerQueue to exercise, received as an untyped pointer.
//
// Queues delayed entries whose payload pointer value equals the requested
// delay in milliseconds (1000, 100, 2000), then loops while isRunning()
// holds, logging the iteration number.
// NOTE(review): other statements between these calls, the loop body after
// the log line, and the return are not visible in this excerpt.
315 void * Worker(void *param)
317 LOGD("TesterThread::Worker E param=%p", param);
318 WorkerQueue *wq = (WorkerQueue *)param;
321 wq->AddDelayed((void *)1000, 1000);
326 wq->AddDelayed((void *)100, 100);
327 wq->AddDelayed((void *)2000, 2000);
329 for (int i = 1; isRunning(); i++) {
330 LOGD("TesterThread: looping %d", i);
338 LOGD("TesterThread::Worker X param=%p", param);
// Body of the worker test routine (named "testWorker" in its log messages).
// NOTE(review): the function header is not visible in this excerpt.
//
// Visible flow: create a TesterThread, create a TestWorkerQueue and start it
// with Run(), and on STATUS_OK create a second TesterThread. The log lines
// then trace that tester running, stopping and stopped, and the queue
// stopped.
// NOTE(review): the statements that delete the first tester, start and stop
// the second one, stop the queue, and free the objects are not visible here.
345 LOGD("testWorker E: ********");
347 // Test we can create a thread and delete it
348 TesterThread *tester = new TesterThread();
351 TestWorkerQueue *wq = new TestWorkerQueue();
352 if (wq->Run() == STATUS_OK) {
353 LOGD("testWorker WorkerQueue %p running", wq);
355 // Test we can run a thread, stop it then delete it
356 tester = new TesterThread();
358 LOGD("testWorker tester %p running", tester);
360 LOGD("testWorker tester %p stopping", tester);
362 LOGD("testWorker tester %p stopped", tester);
364 LOGD("testWorker wq %p stopped", wq);
366 LOGD("testWorker X: ********\n");