package com.android.camera.async;
-import java.util.ArrayList;
+import com.android.camera.util.Callback;
+
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
-import com.android.camera.util.Callback;
-
+import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
/**
* Generic asynchronous state wrapper which supports two methods of interaction:
* polling for the latest value and listening for updates.
- * <p>
- * Note that this class only supports polling and using listeners. If
- * synchronous consumption of state changes is required, see
- * {@link FutureResult} or {@link BufferQueue} and its implementations.
- * </p>
*/
+@ParametersAreNonnullByDefault
public class ConcurrentState<T> implements Updatable<T>, Observable<T> {
private static class ExecutorListenerPair<T> {
*/
public void run(final T t) {
mExecutor.execute(new Runnable() {
+ @Override
public void run() {
mListener.onCallback(t);
}
}
private final Object mLock;
- private final Set<ExecutorListenerPair<T>> mListeners;
+ private final Set<ExecutorListenerPair<? super T>> mListeners;
private T mValue;
public ConcurrentState(T initialValue) {
* Updates the state to the latest value, notifying all listeners.
*/
@Override
- public void update(@Nonnull T newValue) {
- List<ExecutorListenerPair<T>> listeners = new ArrayList<>();
+ public void update(T newValue) {
synchronized (mLock) {
mValue = newValue;
- // Copy listeners out here so we can iterate over the list outside
- // the critical section.
- listeners.addAll(mListeners);
- }
- for (ExecutorListenerPair<T> pair : listeners) {
- pair.run(newValue);
+ // Invoke executors.execute within mLock to guarantee that
+ // callbacks are serialized into their respective executor in
+ // the proper order.
+ for (ExecutorListenerPair<? super T> pair : mListeners) {
+ pair.run(newValue);
+ }
}
}
+ @CheckReturnValue
+ @Nonnull
@Override
- public SafeCloseable addCallback(Callback callback, Executor executor) {
+ public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
synchronized (mLock) {
- final ExecutorListenerPair<T> pair = new ExecutorListenerPair<>(executor, callback);
+ final ExecutorListenerPair<? super T> pair =
+ new ExecutorListenerPair<>(executor, callback);
mListeners.add(pair);
return new SafeCloseable() {
*
* @return The latest state.
*/
+ @Nonnull
@Override
public T get() {
synchronized (mLock) {
--- /dev/null
+/*
+ * Copyright (C) 2015 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.camera.async;
+
+import com.android.camera.util.Callback;
+import com.google.common.base.Objects;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a callback to filter out duplicate updates.
+ */
+public class FilteredCallback<T> implements Callback<T> {
+ private final Callback<? super T> mCallback;
+ private T mLatestValue;
+
+ public FilteredCallback(Callback<? super T> callback) {
+ mCallback = callback;
+ mLatestValue = null;
+ }
+
+ @Override
+ public void onCallback(@Nonnull T newValue) {
+ if (!Objects.equal(mLatestValue, newValue)) {
+ mLatestValue = newValue;
+ mCallback.onCallback(newValue);
+ }
+ }
+}
import java.util.concurrent.Executor;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
+
+@ParametersAreNonnullByDefault
public abstract class ForwardingObservable<T> implements Observable<T> {
private final Observable<T> mDelegate;
mDelegate = delegate;
}
+ @Nonnull
@Override
+ @CheckReturnValue
public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
return mDelegate.addCallback(callback, executor);
}
+ @Nonnull
@Override
public T get() {
return mDelegate.get();
import java.util.concurrent.Executor;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.ThreadSafe;
+
/**
* An interface for thread-safe observable objects.
*/
+@ThreadSafe
+@ParametersAreNonnullByDefault
public interface Observable<T> extends Supplier<T> {
/**
* Adds the given callback, returning a handle to be closed when the
*
* @param callback The callback to add.
* @param executor The executor on which the callback will be invoked.
- * @return A {@link SafeCloseable} handle to be closed when the callback
- * must be removed.
+ * @return A handle to be closed when the callback must be removed.
*/
+ @Nonnull
+ @CheckReturnValue
public SafeCloseable addCallback(Callback<T> callback, Executor executor);
+
+ /**
+ * @return The current/latest value.
+ */
+ @Nonnull
+ @Override
+ public T get();
}
--- /dev/null
+/*
+ * Copyright (C) 2015 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.camera.async;
+
+import com.android.camera.util.Callback;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Enables combining multiple {@link Observable}s together with a given
+ * function.
+ * <p>
+ * Callbacks added to the resulting observable are notified when any of the
+ * dependencies change.
+ */
+@ThreadSafe
+@ParametersAreNonnullByDefault
+final class ObservableCombiner<I, O> implements Observable<O> {
+ private final ImmutableList<Observable<I>> mInputs;
+ private final Function<List<I>, O> mFunction;
+
+ private final Object mLock;
+
+ @GuardedBy("mLock")
+ private final ConcurrentState<O> mListenerNotifier;
+
+ @GuardedBy("mLock")
+ private final List<SafeCloseable> mInputCallbackHandles;
+
+ @GuardedBy("mLock")
+ private int mNumRegisteredCallbacks;
+
+ /**
+ * The thread-safe callback to be registered with each input.
+ */
+ private final Updatable<I> mInputCallback = new Updatable<I>() {
+ public void update(I ignored) {
+ mListenerNotifier.update(get());
+ }
+ };
+
+ private ObservableCombiner(List<? extends Observable<I>> inputs,
+ Function<List<I>, O> function, O initialValue) {
+ mInputs = ImmutableList.copyOf(inputs);
+ mFunction = function;
+ mListenerNotifier = new ConcurrentState<>(initialValue);
+ mLock = new Object();
+ mInputCallbackHandles = new ArrayList<>();
+ mNumRegisteredCallbacks = 0;
+ }
+
+ /**
+ * Transforms a set of input observables with a function.
+ *
+ * @param inputs The input observables.
+ * @param function The function to apply to all of the inputs.
+ * @param <I> The type of all inputs values.
+ * @param <O> The type of the output values.
+ * @return An observable which will reflect the combination of all inputs
+ * with the given function. Changes in the output value will result
+ * in calls to any callbacks registered with the output.
+ */
+ static <I, O> Observable<O> transform(List<? extends Observable<I>> inputs,
+ Function<List<I>, O> function) {
+ // Compute the initial value.
+ ArrayList<I> deps = new ArrayList<>();
+ for (Observable<? extends I> input : inputs) {
+ deps.add(input.get());
+ }
+ O initialValue = function.apply(deps);
+
+ return new ObservableCombiner<>(inputs, function, initialValue);
+ }
+
+ @GuardedBy("mLock")
+ private void addCallbacksToInputs() {
+ for (Observable<I> observable : mInputs) {
+ final SafeCloseable callbackHandle =
+ Observables.addThreadSafeCallback(observable, mInputCallback);
+
+ mInputCallbackHandles.add(callbackHandle);
+ }
+ }
+
+ @GuardedBy("mLock")
+ private void removeCallbacksFromInputs() {
+ for (SafeCloseable callbackHandle : mInputCallbackHandles) {
+ callbackHandle.close();
+ }
+ }
+
+ @Nonnull
+ @Override
+ @CheckReturnValue
+ public SafeCloseable addCallback(final Callback<O> callback, Executor executor) {
+ // When a callback is added to this, the "output", we must ensure that
+ // callbacks are registered with all of the inputs so that they can be
+ // forwarded properly.
+ // Instead of adding another callback to each input for each callback
+ // registered with the output, callbacks are registered when the first
+ // output callback is added, and removed when the last output callback
+ // is removed.
+
+ synchronized (mLock) {
+ if (mNumRegisteredCallbacks == 0) {
+ addCallbacksToInputs();
+ }
+ mNumRegisteredCallbacks++;
+ }
+
+ // Wrap the callback in a {@link FilteredCallback} to prevent many
+ // duplicate/cascading updates even if the output does not change.
+ final SafeCloseable resultingCallbackHandle = mListenerNotifier.addCallback(
+ new FilteredCallback<O>(callback), executor);
+
+ return new SafeCloseable() {
+ @Override
+ public void close() {
+ resultingCallbackHandle.close();
+
+ synchronized (mLock) {
+ mNumRegisteredCallbacks--;
+ if (mNumRegisteredCallbacks == 0) {
+ removeCallbacksFromInputs();
+ }
+ }
+ }
+ };
+ }
+
+ @Nonnull
+ @Override
+ public O get() {
+ ArrayList<I> deps = new ArrayList<>();
+ for (Observable<? extends I> dependency : mInputs) {
+ deps.add(dependency.get());
+ }
+ return mFunction.apply(deps);
+ }
+}
import com.google.common.base.Function;
import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
/**
* Helper methods for {@link Observable}.
*/
+@ParametersAreNonnullByDefault
public class Observables {
+ private static final SafeCloseable NOOP_CALLBACK_HANDLE = new SafeCloseable() {
+ @Override
+ public void close() {
+ // Do Nothing.
+ }
+ };
+
private Observables() {
}
public static <F, T> Observable<T> transform(final Observable<F> input,
final Function<F, T> function) {
return new Observable<T>() {
+ @Nonnull
@Override
public T get() {
return function.apply(input.get());
}
+ @CheckReturnValue
+ @Nonnull
@Override
- public SafeCloseable addCallback(final Callback<T> callback, Executor executor) {
+ public SafeCloseable addCallback(final Callback<T> callback,
+ Executor executor) {
return input.addCallback(new Callback<F>() {
@Override
public void onCallback(F result) {
}
/**
+ * Transforms a set of observables with a function.
+ *
+ * @return The transformed observable.
+ */
+ public static <F, T> Observable<T> transform(final List<? extends Observable<F>> input,
+ Function<List<F>, T> function) {
+ return ObservableCombiner.transform(input, function);
+ }
+
+ /**
* @return An observable which has the given constant value.
*/
@Nonnull
public static <T> Observable<T> of(final @Nullable T constant) {
return new Observable<T>() {
+ @Nonnull
@Override
public T get() {
return constant;
}
+ @CheckReturnValue
+ @Nonnull
@Override
public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
- return new SafeCloseable() {
- @Override
- public void close() {
- }
- };
+ return NOOP_CALLBACK_HANDLE;
}
};
}
@Nonnull
@CheckReturnValue
- public static <T> SafeCloseable addThreadSafeCallback(
- @Nonnull Observable<T> observable,
- final @Nonnull Updatable<T> callback) {
+ public static <T> SafeCloseable addThreadSafeCallback(Observable<T> observable,
+ final Updatable<T> callback) {
return observable.addCallback(new Callback<T>() {
@Override
public void onCallback(T result) {
import javax.annotation.Nonnull;
/**
- * This provides empty, NoOp implementation of generic updateable
- * objects.
+ * This provides empty, NoOp implementation of generic updateable objects.
*/
-public final class NoOp<T> implements Updatable<T> {
- private static final NoOp<Object> INSTANCE = new NoOp<>();
+public final class Updatables {
+ private static final Updatable NOOP = new Updatable() {
+ @Override
+ public void update(@Nonnull Object o) {
+ // Do nothing.
+ }
+ };
- @SuppressWarnings("unchecked")
- public static <T> Updatable<T> ofUpdateable() {
- return (Updatable<T>) INSTANCE;
+ private Updatables() {
}
- private NoOp() { }
-
- @Override
- public void update(@Nonnull T t) { }
-}
\ No newline at end of file
+ public static <T> Updatable<T> getNoOp() {
+ return NOOP;
+ }
+}
import com.android.camera.async.HandlerFactory;
import com.android.camera.async.Lifetime;
+import com.android.camera.async.Observable;
import com.android.camera.async.Updatable;
import com.android.camera.one.v2.camera2proxy.ImageReaderProxy;
import com.android.camera.one.v2.core.RequestBuilder;
* <p>
* Add the OnImageAvailableListener to the image reader in a separate thread.
* <p>
- * Use the ImageQueueCaptureStreamFactory to create image streams to add to
+ * Use the {@link ImageStreamFactory} to create image streams to add to
* {@link RequestBuilder}s to interact with the camera and ImageReader.
*/
public class SharedImageReaderFactory {
private final Updatable<Long> mGlobalTimestampQueue;
private final ImageStreamFactory mSharedImageReader;
+ private final Observable<Integer> mAvailableImageCount;
/**
* @param lifetime The lifetime of the SharedImageReader, and other
mGlobalTimestampQueue = imageDistributorFactory.provideGlobalTimestampCallback();
TicketPool ticketPool = new FiniteTicketPool(imageReader.getMaxImages() - 2);
+ mAvailableImageCount = ticketPool.getAvailableTicketCount();
mSharedImageReader = new ImageStreamFactory(
new Lifetime(lifetime), ticketPool, imageReader.getSurface(), imageDistributor);
}
public ImageStreamFactory provideSharedImageReader() {
return mSharedImageReader;
}
+
+ public Observable<Integer> provideAvailableImageCount() {
+ return mAvailableImageCount;
+ }
}
import com.android.camera.async.BufferQueue;
import com.android.camera.async.HandlerFactory;
import com.android.camera.async.Lifetime;
+import com.android.camera.async.Observable;
import com.android.camera.async.Updatable;
import com.android.camera.one.v2.camera2proxy.ImageProxy;
import com.android.camera.one.v2.camera2proxy.ImageReaderProxy;
import com.android.camera.one.v2.sharedimagereader.ticketpool.FiniteTicketPool;
import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketPool;
+import static com.android.camera.one.v2.core.ResponseListeners.forFinalMetadata;
+import static com.android.camera.one.v2.core.ResponseListeners.forTimestamps;
+
/**
* Like {@link SharedImageReaderFactory}, but provides a single
* {@link ImageStream} with a dynamic capacity which changes depending on demand
private final ImageStream mZslCaptureStream;
private final MetadataPool mMetadataPool;
private final RequestTemplate mRequestTemplate;
+ private final Observable<Integer> mAvailableImageCount;
/**
* @param lifetime The lifetime of the SharedImageReader, and other
new Lifetime(lifetime), ringBufferFactory.provideTicketPool(),
imageReader.getSurface(), imageDistributor);
+ mAvailableImageCount = ringBufferFactory.provideTicketPool().getAvailableTicketCount();
+
mRequestTemplate = new RequestTemplate(rootRequestTemplate);
mRequestTemplate.addStream(mZslCaptureStream);
mRequestTemplate.addResponseListener(forTimestamps(globalTimestampQueue));
public RequestBuilder.Factory provideRequestTemplate() {
return mRequestTemplate;
}
+
+ public Observable<Integer> provideAvailableImageCount() {
+ return mAvailableImageCount;
+ }
}
package com.android.camera.one.v2.sharedimagereader.ringbuffer;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.android.camera.async.Observable;
+import com.android.camera.async.Observables;
import com.android.camera.one.v2.sharedimagereader.ticketpool.Ticket;
import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketPool;
import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketProvider;
-import com.google.common.base.Supplier;
+import com.google.common.base.Function;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
/**
* Splits a {@link TicketPool} into a high-priority lane and a low-priority,
private LowPriorityTicketPool() {
}
+ @Nullable
+ @CheckReturnValue
+ @Override
public Ticket tryAcquire() {
if (mHighPriorityWaiters.get() > 0) {
return null;
}
private class HighPriorityTicketPool implements TicketPool {
+ @Nonnull
@Override
public Collection<Ticket> acquire(int tickets) throws InterruptedException,
NoCapacityAvailableException {
return result;
}
+ @Nonnull
@Override
- public boolean canAcquire(int numTickets) {
- if (numTickets < 0) {
- return false;
- }
- // The number of additional tickets which must be requested from the
- // parent pool to fulfill the hypothetical request for numTickets
- // tickets, after flushing low-priority tickets.
- int numAdditionalTickets = numTickets - mReleasableLowPriorityTicketCount.get();
- if (numAdditionalTickets <= 0) {
- return true;
- }
- return mRootTicketPool.canAcquire(numAdditionalTickets);
+ public Observable<Integer> getAvailableTicketCount() {
+ return mAvailableHighPriorityTicketCount;
}
+ @Nullable
+ @CheckReturnValue
@Override
public Ticket tryAcquire() {
// This over-aggressively flushes all low-priority tickets every
}
private final LowPriorityTicketReleaser mLowPriorityTicketReleaser;
- private final Supplier<Integer> mReleasableLowPriorityTicketCount;
private final TicketPool mRootTicketPool;
private final AtomicInteger mHighPriorityWaiters;
private final HighPriorityTicketPool mHighPriority;
private final LowPriorityTicketPool mLowPriority;
+ private final Observable<Integer> mAvailableHighPriorityTicketCount;
+
public TicketPoolPrioritizer(LowPriorityTicketReleaser lowPriorityTicketReleaser,
- Supplier<Integer> releasableLowPriorityTicketCount, TicketPool rootTicketPool) {
+ Observable<Integer> releasableLowPriorityTicketCount, TicketPool rootTicketPool) {
mLowPriorityTicketReleaser = lowPriorityTicketReleaser;
- mReleasableLowPriorityTicketCount = releasableLowPriorityTicketCount;
mRootTicketPool = rootTicketPool;
mHighPriorityWaiters = new AtomicInteger(0);
mHighPriority = new HighPriorityTicketPool();
mLowPriority = new LowPriorityTicketPool();
+
+ final Observable<Integer> parentPoolTicketCount = rootTicketPool.getAvailableTicketCount();
+ Function<List<Integer>, Integer> sum = new Function<List<Integer>, Integer>() {
+ @Override
+ public Integer apply(@Nullable List<Integer> integers) {
+ int total = 0;
+ for (Integer i : integers) {
+ total += i;
+ }
+ return total;
+ }
+ };
+
+ // The number of high-priority tickets available at any time is the sum
+ // of parentPoolTicketCount and releaseableLowPriorityTicketCount.
+ mAvailableHighPriorityTicketCount = Observables.transform(
+ Arrays.asList(parentPoolTicketCount, releasableLowPriorityTicketCount),
+ sum);
}
public TicketProvider getLowPriorityTicketProvider() {
package com.android.camera.one.v2.sharedimagereader.ticketpool;
+import com.android.camera.async.ConcurrentState;
+import com.android.camera.async.Observable;
+import com.android.camera.async.SafeCloseable;
+
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
-import com.android.camera.async.SafeCloseable;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
/**
* A ticket pool with a finite number of tickets.
*/
-public class FiniteTicketPool implements TicketPool, SafeCloseable {
+public final class FiniteTicketPool implements TicketPool, SafeCloseable {
private class TicketImpl implements Ticket {
private final AtomicBoolean mClosed;
public void close() {
boolean alreadyClosed = mClosed.getAndSet(true);
if (!alreadyClosed) {
- mTickets.release();
+ synchronized (mLock) {
+ releaseTicket();
+ updateAvailableTicketCount();
+ }
}
}
}
+ private class Waiter {
+ private final int mTicketsRequested;
+ private final Condition mCondition;
+
+ private Waiter(int ticketsRequested, Condition condition) {
+ mTicketsRequested = ticketsRequested;
+ mCondition = condition;
+ }
+
+ public Condition getCondition() {
+ return mCondition;
+ }
+
+ public int getTicketsRequested() {
+ return mTicketsRequested;
+ }
+ }
+
private final int mMaxCapacity;
- private final Semaphore mTickets;
- private final AtomicBoolean mClosed;
+ private final ReentrantLock mLock;
+ @GuardedBy("mLock")
+ private final LinkedList<Waiter> mWaiters;
+ private final ConcurrentState<Integer> mAvailableTicketCount;
+ @GuardedBy("mLock")
+ private int mTickets;
+ @GuardedBy("mLock")
+ private boolean mClosed;
public FiniteTicketPool(int capacity) {
mMaxCapacity = capacity;
- mTickets = new Semaphore(capacity);
- mClosed = new AtomicBoolean(false);
+ mLock = new ReentrantLock(true);
+ mTickets = capacity;
+ mWaiters = new LinkedList<>();
+ mClosed = false;
+ mAvailableTicketCount = new ConcurrentState<>(capacity);
+ }
+
+ @GuardedBy("mLock")
+ private void releaseTicket() {
+ mLock.lock();
+ try {
+ mTickets++;
+
+ // Wake up waiters in order, so long as their requested number of
+ // tickets can be satisfied.
+ int ticketsRemaining = mTickets;
+ Waiter nextWaiter = mWaiters.peekFirst();
+ while (nextWaiter != null && nextWaiter.getTicketsRequested() <= ticketsRemaining) {
+ ticketsRemaining -= nextWaiter.getTicketsRequested();
+ nextWaiter.getCondition().signal();
+
+ mWaiters.removeFirst();
+ nextWaiter = mWaiters.peekFirst();
+ }
+ } finally {
+ mLock.unlock();
+ }
}
+ @Nonnull
@Override
public Collection<Ticket> acquire(int tickets) throws InterruptedException,
NoCapacityAvailableException {
- if (tickets > mMaxCapacity || tickets < 0) {
- throw new NoCapacityAvailableException();
- }
- mTickets.acquire(tickets);
- if (mClosed.get()) {
- // If the pool was closed while we were waiting to acquire the
- // tickets, release them (they may be fake) and throw because no
- // capacity is available.
- mTickets.release(tickets);
- throw new NoCapacityAvailableException();
+ mLock.lock();
+ try {
+ if (tickets > mMaxCapacity || tickets < 0) {
+ throw new NoCapacityAvailableException();
+ }
+ Waiter thisWaiter = new Waiter(tickets, mLock.newCondition());
+ mWaiters.addLast(thisWaiter);
+ updateAvailableTicketCount();
+ try {
+ while (mTickets < tickets && !mClosed) {
+ thisWaiter.getCondition().await();
+ }
+ if (mClosed) {
+ throw new NoCapacityAvailableException();
+ }
+
+ mTickets -= tickets;
+
+ updateAvailableTicketCount();
+
+ List<Ticket> ticketList = new ArrayList<>();
+ for (int i = 0; i < tickets; i++) {
+ ticketList.add(new TicketImpl());
+ }
+ return ticketList;
+ } finally {
+ mWaiters.remove(thisWaiter);
+ updateAvailableTicketCount();
+ }
+ } finally {
+ mLock.unlock();
}
- List<Ticket> ticketList = new ArrayList<Ticket>();
- for (int i = 0; i < tickets; i++) {
- ticketList.add(new TicketImpl());
+ }
+
+ @GuardedBy("mLock")
+ private void updateAvailableTicketCount() {
+ if (mClosed || !mWaiters.isEmpty()) {
+ mAvailableTicketCount.update(0);
+ } else {
+ mAvailableTicketCount.update(mTickets);
}
- return ticketList;
}
+ @Nonnull
@Override
- public boolean canAcquire(int tickets) {
- if (tickets < 0) {
- return false;
- }
- if (tickets == 0) {
- return true;
- }
- return !mClosed.get() &&
- !mTickets.hasQueuedThreads() &&
- mTickets.availablePermits() >= tickets;
+ public Observable<Integer> getAvailableTicketCount() {
+ return mAvailableTicketCount;
}
@Override
public Ticket tryAcquire() {
- if (mTickets.tryAcquire()) {
- // Release the ticket immediately if...
- // 1. The pool is closed, and tryAcquire may have received a fake,
- // "poisson pill" ticket, which must be released in order to enable
- // other calls to acquire() to not block.
- // 2. Or, mTickets has threads queued on it because of blocked calls
- // to {@link #acquire}. It would not be fair to acquire this ticket
- // if there are pending requests for it already.
- boolean releaseTicketImmediately = mClosed.get() || mTickets.hasQueuedThreads();
- if (releaseTicketImmediately) {
- mTickets.release();
- return null;
- } else {
+ mLock.lock();
+ try {
+ if (!mClosed && mWaiters.isEmpty() && mTickets >= 1) {
+ mTickets--;
+ updateAvailableTicketCount();
return new TicketImpl();
+ } else {
+ return null;
}
- } else {
- return null;
+ } finally {
+ mLock.unlock();
}
}
@Override
public void close() {
- if (mClosed.getAndSet(true)) {
- // If already closed, just return.
- return;
- }
+ mLock.lock();
+ try {
+ if (mClosed) {
+ return;
+ }
- // Threads may be waiting in acquire() for tickets to be available, so
- // wake them up by adding fake, "poisson pill" tickets.
- // Adding mMaxCapacity permits to the semaphore is sufficient to
- // guarantee that any/all
- // waiting threads wake up and detect that the pool is closed (so long
- // as all waiting
- // threads release the fake tickets they acquired while waking up).
- mTickets.release(mMaxCapacity);
+ mClosed = true;
+
+ for (Waiter waiter : mWaiters) {
+ waiter.getCondition().signal();
+ }
+
+ updateAvailableTicketCount();
+ } finally {
+ mLock.unlock();
+ }
}
}
package com.android.camera.one.v2.sharedimagereader.ticketpool;
+import com.android.camera.async.ConcurrentState;
+import com.android.camera.async.Observable;
+import com.android.camera.async.SafeCloseable;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import com.android.camera.async.SafeCloseable;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.GuardedBy;
/**
* A TicketPool which reserves a capacity of tickets ahead of time from its
* as soon as possible, which may depend on consumers of this ticket pool
* closing tickets which had previously been acquired.
*/
+@ParametersAreNonnullByDefault
public class ReservableTicketPool implements TicketPool, SafeCloseable {
private static class Waiter {
- public final Condition mDoneCondition;
- public final int mRequestedTicketCount;
+ private final Condition mDoneCondition;
+ private final int mRequestedTicketCount;
private Waiter(Condition doneCondition, int requestedTicketCount) {
mDoneCondition = doneCondition;
mRequestedTicketCount = requestedTicketCount;
}
+
+ public Condition getDoneCondition() {
+ return mDoneCondition;
+ }
+
+ public int getRequestedTicketCount() {
+ return mRequestedTicketCount;
+ }
}
/**
releaseToParent = (mParentTickets.size() == mCapacity);
if (!releaseToParent) {
mParentTickets.add(mParentTicket);
+ updateCurrentTicketCount();
releaseWaitersOnTicketAvailability();
}
} finally {
private final ReentrantLock mLock;
/**
* A Queue containing the number of tickets requested by each thread
- * currently blocked on {@link #mTicketWaiters} in {@link #acquire}.
+ * currently blocked in {@link #acquire}.
*/
+ @GuardedBy("mLock")
private final ArrayDeque<Waiter> mTicketWaiters;
/**
* Tickets from mParentPool which have not been given to clients via
* {@link #acquire}.
*/
+ @GuardedBy("mLock")
private final ArrayDeque<Ticket> mParentTickets;
/**
+ * Maintains an observable count of the number of tickets which are readily
+ * available at any time.
+ */
+ private final ConcurrentState<Integer> mAvailableTicketCount;
+ /**
* The total number of tickets available and outstanding (acquired but not
* closed).
*/
+ @GuardedBy("mLock")
private int mCapacity;
public ReservableTicketPool(TicketPool parentPool) {
mTicketWaiters = new ArrayDeque<>();
mParentTickets = new ArrayDeque<>();
mCapacity = 0;
+ mAvailableTicketCount = new ConcurrentState<>(0);
+ }
+
+ @GuardedBy("mLock")
+ private void updateCurrentTicketCount() {
+ mLock.lock();
+ try {
+ if (mTicketWaiters.size() != 0) {
+ mAvailableTicketCount.update(0);
+ } else {
+ mAvailableTicketCount.update(mParentTickets.size());
+ }
+ } finally {
+ mLock.unlock();
+ }
}
+ @Nonnull
@Override
public Collection<Ticket> acquire(int tickets) throws InterruptedException,
NoCapacityAvailableException {
return wrappedTicketList;
}
+ @Nonnull
@Override
- public boolean canAcquire(int tickets) {
- if (tickets == 0) {
- return true;
- }
- if (tickets < 0) {
- return false;
- }
- boolean canAcquire = false;
- mLock.lock();
- try {
- // Return true if no threads are waiting to acquire tickets and
- // there are enough tickets in the pool to satisfy such a request.
- if (mTicketWaiters.size() == 0 &&
- mParentTickets.size() >= tickets) {
- canAcquire = true;
- } else {
- canAcquire = false;
- }
- } finally {
- mLock.unlock();
- }
-
- return canAcquire;
+ public Observable<Integer> getAvailableTicketCount() {
+ return mAvailableTicketCount;
}
@Override
return null;
}
parentTicket = mParentTickets.remove();
+ updateCurrentTicketCount();
} finally {
mLock.unlock();
}
} finally {
mLock.unlock();
}
+
+ updateCurrentTicketCount();
}
/**
for (Ticket ticket : parentTicketsToRelease) {
ticket.close();
}
+
+ updateCurrentTicketCount();
}
/**
private Collection<Ticket> acquireParentTickets(int tickets) throws InterruptedException,
NoCapacityAvailableException {
// The list of tickets from mTicketList to acquire.
- // Try to acquire these immediately, if there are no threads already waiting for tickets.
+ // Try to acquire these immediately, if there are no threads already
+ // waiting for tickets.
List<Ticket> acquiredParentTickets = null;
mLock.lock();
try {
}
Waiter thisWaiter = new Waiter(mLock.newCondition(), tickets);
mTicketWaiters.add(thisWaiter);
+ updateCurrentTicketCount();
try {
while (acquiredParentTickets == null) {
- thisWaiter.mDoneCondition.await();
+ thisWaiter.getDoneCondition().await();
acquiredParentTickets = tryAcquireAtomically(tickets);
}
} finally {
mTicketWaiters.remove(thisWaiter);
}
+ updateCurrentTicketCount();
} finally {
mLock.unlock();
}
* @return The tickets acquired from the parent ticket pool, or null if they
* could not be acquired.
*/
+ @Nullable
+ @CheckReturnValue
private List<Ticket> tryAcquireAtomically(int tickets) throws NoCapacityAvailableException {
List<Ticket> acquiredParentTickets = new ArrayList<>();
mLock.lock();
for (int i = 0; i < tickets; i++) {
acquiredParentTickets.add(mParentTickets.remove());
}
+ updateCurrentTicketCount();
return acquiredParentTickets;
}
} finally {
int numTicketsReadilyAvailable = mParentTickets.size();
while (mTicketWaiters.size() > 0) {
Waiter nextWaiter = mTicketWaiters.peek();
- if (nextWaiter.mRequestedTicketCount <= numTicketsReadilyAvailable) {
- numTicketsReadilyAvailable -= nextWaiter.mRequestedTicketCount;
- nextWaiter.mDoneCondition.signal();
+ if (nextWaiter.getRequestedTicketCount() <= numTicketsReadilyAvailable) {
+ numTicketsReadilyAvailable -= nextWaiter.getRequestedTicketCount();
+ nextWaiter.getDoneCondition().signal();
mTicketWaiters.poll();
} else {
return;
try {
// Release all waiters requesting more tickets than the current
// capacity
- List<Waiter> toRemove = new ArrayList<Waiter>();
+ List<Waiter> toRemove = new ArrayList<>();
for (Waiter waiter : mTicketWaiters) {
- if (waiter.mRequestedTicketCount > mCapacity) {
+ if (waiter.getRequestedTicketCount() > mCapacity) {
toRemove.add(waiter);
}
}
for (Waiter waiter : toRemove) {
- waiter.mDoneCondition.signal();
+ waiter.getDoneCondition().signal();
}
} finally {
mLock.unlock();
package com.android.camera.one.v2.sharedimagereader.ticketpool;
+import com.android.camera.async.Observable;
+
import java.util.Collection;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
/**
* Stores a collection of {@link Ticket}s. Tickets may be acquired from the
* pool. When closed, tickets return themselves to the pool.
}
/**
- * Acquires and returns the specified number of tickets.
+ * Acquires and returns the specified number of tickets. The caller owns all
+ * returned tickets and is responsible for eventually closing them.
* <p>
- * Note: Implementations MUST be fair!
- * </p>
+ * Implementations must be fair w.r.t. other calls to acquire.
*/
+ @Nonnull
public Collection<Ticket> acquire(int tickets) throws InterruptedException,
NoCapacityAvailableException;
/**
- * Provides a hint as to whether or not the specified number of tickets can
- * be acquired immediately, or will probably block.
- *
- * @return True if a subsequent call to {@link #acquire} will probably not
- * block.
+ * @return The number of tickets readily-available for immediate
+ * acquisition, as an observable object.
*/
- public boolean canAcquire(int tickets);
+ @Nonnull
+ public Observable<Integer> getAvailableTicketCount();
/**
- * Attempts to acquire and return a ticket.
+ * Attempts to acquire and return a ticket. The caller owns the resulting
+ * ticket (if not null) and is responsible for eventually closing it.
* <p>
- * Note: Implementations MUST be fair w.r.t. {@link #acquire}.
- * </p>
+ * Implementations must be fair w.r.t. {@link #acquire}.
*
* @return The acquired ticket, or null if no ticket is readily available.
*/
@Override
+ @Nullable
+ @CheckReturnValue
public Ticket tryAcquire();
}
package com.android.camera.one.v2.sharedimagereader.ticketpool;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nullable;
+
public interface TicketProvider {
/**
* Attempts to acquire and return a ticket.
*
* @return The acquired ticket, or null if no ticket is readily available.
*/
+ @Nullable
+ @CheckReturnValue
public Ticket tryAcquire();
}
package com.android.camera.one.v2.stats;
-import com.android.camera.async.NoOp;
+import com.android.camera.async.Updatables;
import com.android.camera.async.Updatable;
import com.android.camera.debug.Log;
import com.android.camera.debug.Log.Tag;
private double mFpsValue = 30.0f;
public PreviewFpsListener() {
- this(NoOp.<Float>ofUpdateable());
+ this(Updatables.<Float>getNoOp());
}
public PreviewFpsListener(Updatable<Float> fpsListener) {
package com.android.camera.settings;
+import com.android.camera.async.ExecutorCallback;
import com.android.camera.async.FilteredUpdatable;
import com.android.camera.async.Observable;
-import com.android.camera.async.ExecutorCallback;
import com.android.camera.async.SafeCloseable;
import com.android.camera.async.Updatable;
import com.android.camera.util.Callback;
import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
/**
@ThreadSafe
public final class SettingObserver<T> implements Observable<T> {
private class Listener implements SettingsManager.OnSettingChangedListener, SafeCloseable {
- private final Updatable<T> mCallback;
+ private final Updatable<? super T> mCallback;
- private Listener(Updatable<T> callback) {
+ private Listener(Updatable<? super T> callback) {
mCallback = callback;
}
Boolean.class);
}
+ @CheckReturnValue
+ @Nonnull
@Override
- public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
+ public SafeCloseable addCallback(@Nonnull Callback<T> callback, @Nonnull Executor executor) {
final Listener listener =
new Listener(new FilteredUpdatable<>(new ExecutorCallback<>(callback, executor)));
mSettingsManager.addListener(listener);
return listener;
}
- @Nullable
+ @Nonnull
@Override
@SuppressWarnings("unchecked")
public T get() {
package com.android.camera.util;
+import javax.annotation.Nonnull;
+
/**
* Simple callback.
* TODO: Move to async.
*/
public interface Callback<T> {
- public void onCallback(T result);
+ public void onCallback(@Nonnull T result);
}