Makes it easy to schedule a bunch of work to happen in parallel.
Change-Id: Id9c0e52fc8b6d78d2b9ed4c2ee47abce0a01775c
// Get pid for the current thread.
extern pid_t androidGetTid();
+#ifdef HAVE_ANDROID_OS
// Change the scheduling group of a particular thread. The group
// should be one of the ANDROID_TGROUP constants. Returns BAD_VALUE if
// grp is out of range, else another non-zero value with errno set if
// scheduling groups are disabled. Returns INVALID_OPERATION if unexpected error.
// Thread ID zero means current thread.
extern int androidGetThreadSchedulingGroup(pid_t tid);
+#endif
#ifdef __cplusplus
} // extern "C"
--- /dev/null
+/*]
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _LIBS_UTILS_WORK_QUEUE_H
+#define _LIBS_UTILS_WORK_QUEUE_H
+
+#include <utils/Errors.h>
+#include <utils/Vector.h>
+#include <utils/threads.h>
+
+namespace android {
+
+/*
+ * A threaded work queue.
+ *
+ * This class is designed to make it easy to run a bunch of isolated work
+ * units in parallel, using up to the specified number of threads.
+ * To use it, write a loop to post work units to the work queue, then synchronize
+ * on the queue at the end.
+ */
+class WorkQueue {
+public:
+ class WorkUnit {
+ public:
+ WorkUnit() { }
+ virtual ~WorkUnit() { }
+
+ /*
+ * Runs the work unit.
+ * If the result is 'true' then the work queue continues scheduling work as usual.
+ * If the result is 'false' then the work queue is canceled.
+ */
+ virtual bool run() = 0;
+ };
+
+ /* Creates a work queue with the specified maximum number of work threads. */
+ WorkQueue(size_t maxThreads, bool canCallJava = true);
+
+ /* Destroys the work queue.
+ * Cancels pending work and waits for all remaining threads to complete.
+ */
+ ~WorkQueue();
+
+ /* Posts a work unit to run later.
+ * If the work queue has been canceled or is already finished, returns INVALID_OPERATION
+ * and does not take ownership of the work unit (caller must destroy it itself).
+ * Otherwise, returns OK and takes ownership of the work unit (the work queue will
+ * destroy it automatically).
+ *
+ * For flow control, this method blocks when the size of the pending work queue is more
+ * 'backlog' times the number of threads. This condition reduces the rate of entry into
+ * the pending work queue and prevents it from growing much more rapidly than the
+ * work threads can actually handle.
+ *
+ * If 'backlog' is 0, then no throttle is applied.
+ */
+ status_t schedule(WorkUnit* workUnit, size_t backlog = 2);
+
+ /* Cancels all pending work.
+ * If the work queue is already finished, returns INVALID_OPERATION.
+ * If the work queue is already canceled, returns OK and does nothing else.
+ * Otherwise, returns OK, discards all pending work units and prevents additional
+ * work units from being scheduled.
+ *
+ * Call finish() after cancel() to wait for all remaining work to complete.
+ */
+ status_t cancel();
+
+ /* Waits for all work to complete.
+ * If the work queue is already finished, returns INVALID_OPERATION.
+ * Otherwise, waits for all work to complete and returns OK.
+ */
+ status_t finish();
+
+private:
+ class WorkThread : public Thread {
+ public:
+ WorkThread(WorkQueue* workQueue, bool canCallJava);
+ virtual ~WorkThread();
+
+ private:
+ virtual bool threadLoop();
+
+ WorkQueue* const mWorkQueue;
+ };
+
+ status_t cancelLocked();
+ bool threadLoop(); // called from each work thread
+
+ const size_t mMaxThreads;
+ const bool mCanCallJava;
+
+ Mutex mLock;
+ Condition mWorkChangedCondition;
+ Condition mWorkDequeuedCondition;
+
+ bool mCanceled;
+ bool mFinished;
+ size_t mIdleThreads;
+ Vector<sp<WorkThread> > mWorkThreads;
+ Vector<WorkUnit*> mWorkUnits;
+};
+
+}; // namespace android
+
+#endif // _LIBS_UTILS_WORK_QUEUE_H
Tokenizer.cpp \
Unicode.cpp \
VectorImpl.cpp \
+ WorkQueue.cpp \
misc.cpp
host_commonCflags := -DLIBUTILS_NATIVE=1 $(TOOL_CFLAGS)
#endif
}
+#ifdef HAVE_ANDROID_OS
int androidSetThreadSchedulingGroup(pid_t tid, int grp)
{
if (grp > ANDROID_TGROUP_MAX || grp < 0) {
return ret;
}
+#endif
namespace android {
--- /dev/null
+/*
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// #define LOG_NDEBUG 0
+#define LOG_TAG "WorkQueue"
+
+#include <utils/Log.h>
+#include <utils/WorkQueue.h>
+
+namespace android {
+
+// --- WorkQueue ---
+
+WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) :
+ mMaxThreads(maxThreads), mCanCallJava(canCallJava),
+ mCanceled(false), mFinished(false), mIdleThreads(0) {
+}
+
+WorkQueue::~WorkQueue() {
+ if (!cancel()) {
+ finish();
+ }
+}
+
+status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) {
+ AutoMutex _l(mLock);
+
+ if (mFinished || mCanceled) {
+ return INVALID_OPERATION;
+ }
+
+ if (mWorkThreads.size() < mMaxThreads
+ && mIdleThreads < mWorkUnits.size() + 1) {
+ sp<WorkThread> workThread = new WorkThread(this, mCanCallJava);
+ status_t status = workThread->run("WorkQueue::WorkThread");
+ if (status) {
+ return status;
+ }
+ mWorkThreads.add(workThread);
+ mIdleThreads += 1;
+ } else if (backlog) {
+ while (mWorkUnits.size() >= mMaxThreads * backlog) {
+ mWorkDequeuedCondition.wait(mLock);
+ if (mFinished || mCanceled) {
+ return INVALID_OPERATION;
+ }
+ }
+ }
+
+ mWorkUnits.add(workUnit);
+ mWorkChangedCondition.broadcast();
+ return OK;
+}
+
+status_t WorkQueue::cancel() {
+ AutoMutex _l(mLock);
+
+ return cancelLocked();
+}
+
+status_t WorkQueue::cancelLocked() {
+ if (mFinished) {
+ return INVALID_OPERATION;
+ }
+
+ if (!mCanceled) {
+ mCanceled = true;
+
+ size_t count = mWorkUnits.size();
+ for (size_t i = 0; i < count; i++) {
+ delete mWorkUnits.itemAt(i);
+ }
+ mWorkUnits.clear();
+ mWorkChangedCondition.broadcast();
+ mWorkDequeuedCondition.broadcast();
+ }
+ return OK;
+}
+
+status_t WorkQueue::finish() {
+ { // acquire lock
+ AutoMutex _l(mLock);
+
+ if (mFinished) {
+ return INVALID_OPERATION;
+ }
+
+ mFinished = true;
+ mWorkChangedCondition.broadcast();
+ } // release lock
+
+ // It is not possible for the list of work threads to change once the mFinished
+ // flag has been set, so we can access mWorkThreads outside of the lock here.
+ size_t count = mWorkThreads.size();
+ for (size_t i = 0; i < count; i++) {
+ mWorkThreads.itemAt(i)->join();
+ }
+ mWorkThreads.clear();
+ return OK;
+}
+
+bool WorkQueue::threadLoop() {
+ WorkUnit* workUnit;
+ { // acquire lock
+ AutoMutex _l(mLock);
+
+ for (;;) {
+ if (mCanceled) {
+ return false;
+ }
+
+ if (!mWorkUnits.isEmpty()) {
+ workUnit = mWorkUnits.itemAt(0);
+ mWorkUnits.removeAt(0);
+ mIdleThreads -= 1;
+ mWorkDequeuedCondition.broadcast();
+ break;
+ }
+
+ if (mFinished) {
+ return false;
+ }
+
+ mWorkChangedCondition.wait(mLock);
+ }
+ } // release lock
+
+ bool shouldContinue = workUnit->run();
+ delete workUnit;
+
+ { // acquire lock
+ AutoMutex _l(mLock);
+
+ mIdleThreads += 1;
+
+ if (!shouldContinue) {
+ cancelLocked();
+ return false;
+ }
+ } // release lock
+
+ return true;
+}
+
+// --- WorkQueue::WorkThread ---
+
+WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) :
+ Thread(canCallJava), mWorkQueue(workQueue) {
+}
+
+WorkQueue::WorkThread::~WorkThread() {
+}
+
+bool WorkQueue::WorkThread::threadLoop() {
+ return mWorkQueue->threadLoop();
+}
+
+}; // namespace android