// Pass function is called by the last thread, the count will
// be decremented to zero and a Broadcast will be made on the
// condition variable, thus waking this up.
- if (count_ != 0) {
+ while (count_ != 0) {
condition_.Wait(self);
}
}
SetCountLocked(self, count_ + delta);
bool timed_out = false;
if (count_ != 0) {
- timed_out = condition_.TimedWait(self, timeout_ms, 0);
+ uint32_t timeout_ns = 0;
+ uint64_t abs_timeout = NanoTime() + MsToNs(timeout_ms);
+ for (;;) {
+ timed_out = condition_.TimedWait(self, timeout_ms, timeout_ns);
+ if (timed_out || count_ == 0) return timed_out;
+ // Compute time remaining on timeout.
+ uint64_t now = NanoTime();
+ int64_t time_left = abs_timeout - now;
+ if (time_left <= 0) return true;
+ timeout_ns = time_left % (1000*1000);
+ timeout_ms = time_left / (1000*1000);
+ }
}
return timed_out;
}
* limitations under the License.
*/
+// CAUTION: THIS IS NOT A FULLY GENERAL BARRIER API.
+
+// It may either be used as a "latch" or single-use barrier, or it may be reused under
+// very limited conditions, e.g. if only Pass(), but not Wait() is called. Unlike a standard
+// latch API, it is possible to initialize the latch to a count of zero, repeatedly call
+// Pass() or Wait(), and only then set the count using the Increment() method. Threads at
+// a Wait() are only awoken if the count reaches zero AFTER the decrement is applied.
+// This works because, also unlike most latch APIs, there is no way to Wait() without
+// decrementing the count, and thus nobody can spuriosly wake up on the initial zero.
+
#ifndef ART_RUNTIME_BARRIER_H_
#define ART_RUNTIME_BARRIER_H_
namespace art {
+// TODO: Maybe give this a better name.
class Barrier {
public:
explicit Barrier(int count);
virtual ~Barrier();
- // Pass through the barrier, decrements the count but does not block.
+ // Pass through the barrier, decrement the count but do not block.
void Pass(Thread* self);
// Wait on the barrier, decrement the count.
void Wait(Thread* self);
- // Set the count to a new value, if the value is 0 then everyone waiting on the condition
- // variable is resumed.
- void Init(Thread* self, int count);
+ // The following three calls are only safe if we somehow know that no other thread both
+ // - has been woken up, and
+ // - has not left the Wait() or Increment() call.
+ // If these calls are made in that situation, the offending thread is likely to go back
+ // to sleep, resulting in a deadlock.
// Increment the count by delta, wait on condition if count is non zero.
void Increment(Thread* self, int delta) LOCKS_EXCLUDED(lock_);
// true if time out occurred.
bool Increment(Thread* self, int delta, uint32_t timeout_ms) LOCKS_EXCLUDED(lock_);
+ // Set the count to a new value. This should only be used if there is no possibility that
+ // another thread is still in Wait(). See above.
+ void Init(Thread* self, int count);
+
private:
void SetCountLocked(Thread* self, int count) EXCLUSIVE_LOCKS_REQUIRED(lock_);
namespace art {
class CheckWaitTask : public Task {
public:
- CheckWaitTask(Barrier* barrier, AtomicInteger* count1, AtomicInteger* count2,
- AtomicInteger* count3)
+ CheckWaitTask(Barrier* barrier, AtomicInteger* count1, AtomicInteger* count2)
: barrier_(barrier),
count1_(count1),
- count2_(count2),
- count3_(count3) {}
+ count2_(count2) {}
void Run(Thread* self) {
- LOG(INFO) << "Before barrier 1 " << *self;
+ LOG(INFO) << "Before barrier" << *self;
++*count1_;
barrier_->Wait(self);
++*count2_;
- LOG(INFO) << "Before barrier 2 " << *self;
- barrier_->Wait(self);
- ++*count3_;
- LOG(INFO) << "After barrier 2 " << *self;
+ LOG(INFO) << "After barrier" << *self;
}
virtual void Finalize() {
Barrier* const barrier_;
AtomicInteger* const count1_;
AtomicInteger* const count2_;
- AtomicInteger* const count3_;
};
class BarrierTest : public CommonRuntimeTest {
TEST_F(BarrierTest, CheckWait) {
Thread* self = Thread::Current();
ThreadPool thread_pool("Barrier test thread pool", num_threads);
- Barrier barrier(0);
+ Barrier barrier(num_threads + 1); // One extra Wait() in main thread.
+ Barrier timeout_barrier(0); // Only used for sleeping on timeout.
AtomicInteger count1(0);
AtomicInteger count2(0);
- AtomicInteger count3(0);
for (int32_t i = 0; i < num_threads; ++i) {
- thread_pool.AddTask(self, new CheckWaitTask(&barrier, &count1, &count2, &count3));
+ thread_pool.AddTask(self, new CheckWaitTask(&barrier, &count1, &count2));
}
thread_pool.StartWorkers(self);
- barrier.Increment(self, num_threads);
- // At this point each thread should have passed through the barrier. The first count should be
- // equal to num_threads.
- EXPECT_EQ(num_threads, count1.LoadRelaxed());
- // Count 3 should still be zero since no thread should have gone past the second barrier.
- EXPECT_EQ(0, count3.LoadRelaxed());
- // Now lets tell the threads to pass again.
- barrier.Increment(self, num_threads);
- // Count 2 should be equal to num_threads since each thread must have passed the second barrier
- // at this point.
- EXPECT_EQ(num_threads, count2.LoadRelaxed());
+ while (count1.LoadRelaxed() != num_threads) {
+ timeout_barrier.Increment(self, 1, 100); // sleep 100 msecs
+ }
+ // Count 2 should still be zero since no thread should have gone past the barrier.
+ EXPECT_EQ(0, count2.LoadRelaxed());
+ // Perform one additional Wait(), allowing pool threads to proceed.
+ barrier.Wait(self);
// Wait for all the threads to finish.
thread_pool.Wait(self, true, false);
- // All three counts should be equal to num_threads now.
- EXPECT_EQ(count1.LoadRelaxed(), count2.LoadRelaxed());
- EXPECT_EQ(count2.LoadRelaxed(), count3.LoadRelaxed());
- EXPECT_EQ(num_threads, count3.LoadRelaxed());
+ // Both counts should be equal to num_threads now.
+ EXPECT_EQ(count1.LoadRelaxed(), num_threads);
+ EXPECT_EQ(count2.LoadRelaxed(), num_threads);
+ timeout_barrier.Init(self, 0); // Reset to zero for destruction.
}
class CheckPassTask : public Task {
// Sleep for the given number of nanoseconds, a bad way to handle contention.
void NanoSleep(uint64_t ns);
-// Initialize a timespec to either an absolute or relative time.
+// Initialize a timespec to either a relative time (ms,ns), or to the absolute
+// time corresponding to the indicated clock value plus the supplied offset.
void InitTimeSpec(bool absolute, int clock, int64_t ms, int32_t ns, timespec* ts);
// Splits a string using the given separator character into a vector of