OSDN Git Service

Enable tracking of image availability
authorPuneet Lall <puneetl@google.com>
Thu, 22 Jan 2015 01:08:05 +0000 (17:08 -0800)
committerPuneet Lall <puneetl@google.com>
Fri, 30 Jan 2015 21:30:41 +0000 (13:30 -0800)
To allow updating the UI whenever image reader space is exhausted, we
must be able to track whether or not non-zsl images can be allocated at
any given time.  This CL implements this for the general case of both
single images as well as for finite bursts.

Bug: 18934542

Change-Id: Ia8c6e03f631cf47e4385cb8da1e3f6d74e7901a2

17 files changed:
src/com/android/camera/async/ConcurrentState.java
src/com/android/camera/async/FilteredCallback.java [new file with mode: 0644]
src/com/android/camera/async/ForwardingObservable.java
src/com/android/camera/async/Observable.java
src/com/android/camera/async/ObservableCombiner.java [new file with mode: 0644]
src/com/android/camera/async/Observables.java
src/com/android/camera/async/Updatables.java [moved from src/com/android/camera/async/NoOp.java with 69% similarity]
src/com/android/camera/one/v2/sharedimagereader/SharedImageReaderFactory.java
src/com/android/camera/one/v2/sharedimagereader/ZslSharedImageReaderFactory.java
src/com/android/camera/one/v2/sharedimagereader/ringbuffer/TicketPoolPrioritizer.java
src/com/android/camera/one/v2/sharedimagereader/ticketpool/FiniteTicketPool.java
src/com/android/camera/one/v2/sharedimagereader/ticketpool/ReservableTicketPool.java
src/com/android/camera/one/v2/sharedimagereader/ticketpool/TicketPool.java
src/com/android/camera/one/v2/sharedimagereader/ticketpool/TicketProvider.java
src/com/android/camera/one/v2/stats/PreviewFpsListener.java
src/com/android/camera/settings/SettingObserver.java
src/com/android/camera/util/Callback.java

index 73aa731..f56aa38 100644 (file)
 
 package com.android.camera.async;
 
-import java.util.ArrayList;
+import com.android.camera.util.Callback;
+
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
-import com.android.camera.util.Callback;
-
+import javax.annotation.CheckReturnValue;
 import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
 
 /**
  * Generic asynchronous state wrapper which supports two methods of interaction:
  * polling for the latest value and listening for updates.
- * <p>
- * Note that this class only supports polling and using listeners. If
- * synchronous consumption of state changes is required, see
- * {@link FutureResult} or {@link BufferQueue} and its implementations.
- * </p>
  */
+@ParametersAreNonnullByDefault
 public class ConcurrentState<T> implements Updatable<T>, Observable<T> {
 
     private static class ExecutorListenerPair<T> {
@@ -51,6 +47,7 @@ public class ConcurrentState<T> implements Updatable<T>, Observable<T> {
          */
         public void run(final T t) {
             mExecutor.execute(new Runnable() {
+                @Override
                 public void run() {
                     mListener.onCallback(t);
                 }
@@ -59,7 +56,7 @@ public class ConcurrentState<T> implements Updatable<T>, Observable<T> {
     }
 
     private final Object mLock;
-    private final Set<ExecutorListenerPair<T>> mListeners;
+    private final Set<ExecutorListenerPair<? super T>> mListeners;
     private T mValue;
 
     public ConcurrentState(T initialValue) {
@@ -72,23 +69,25 @@ public class ConcurrentState<T> implements Updatable<T>, Observable<T> {
      * Updates the state to the latest value, notifying all listeners.
      */
     @Override
-    public void update(@Nonnull T newValue) {
-        List<ExecutorListenerPair<T>> listeners = new ArrayList<>();
+    public void update(T newValue) {
         synchronized (mLock) {
             mValue = newValue;
-            // Copy listeners out here so we can iterate over the list outside
-            // the critical section.
-            listeners.addAll(mListeners);
-        }
-        for (ExecutorListenerPair<T> pair : listeners) {
-            pair.run(newValue);
+            // Invoke executors.execute within mLock to guarantee that
+            // callbacks are serialized into their respective executor in
+            // the proper order.
+            for (ExecutorListenerPair<? super T> pair : mListeners) {
+                pair.run(newValue);
+            }
         }
     }
 
+    @CheckReturnValue
+    @Nonnull
     @Override
-    public SafeCloseable addCallback(Callback callback, Executor executor) {
+    public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
         synchronized (mLock) {
-            final ExecutorListenerPair<T> pair = new ExecutorListenerPair<>(executor, callback);
+            final ExecutorListenerPair<? super T> pair =
+                    new ExecutorListenerPair<>(executor, callback);
             mListeners.add(pair);
 
             return new SafeCloseable() {
@@ -107,6 +106,7 @@ public class ConcurrentState<T> implements Updatable<T>, Observable<T> {
      *
      * @return The latest state.
      */
+    @Nonnull
     @Override
     public T get() {
         synchronized (mLock) {
diff --git a/src/com/android/camera/async/FilteredCallback.java b/src/com/android/camera/async/FilteredCallback.java
new file mode 100644 (file)
index 0000000..382a79c
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2015 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.camera.async;
+
+import com.android.camera.util.Callback;
+import com.google.common.base.Objects;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a callback to filter out duplicate updates.
+ */
+public class FilteredCallback<T> implements Callback<T> {
+    private final Callback<? super T> mCallback;
+    private T mLatestValue;
+
+    public FilteredCallback(Callback<? super T> callback) {
+        mCallback = callback;
+        mLatestValue = null;
+    }
+
+    @Override
+    public void onCallback(@Nonnull T newValue) {
+        if (!Objects.equal(mLatestValue, newValue)) {
+            mLatestValue = newValue;
+            mCallback.onCallback(newValue);
+        }
+    }
+}
index f8cf55f..716a3cc 100644 (file)
@@ -20,6 +20,11 @@ import com.android.camera.util.Callback;
 
 import java.util.concurrent.Executor;
 
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
+
+@ParametersAreNonnullByDefault
 public abstract class ForwardingObservable<T> implements Observable<T> {
     private final Observable<T> mDelegate;
 
@@ -27,11 +32,14 @@ public abstract class ForwardingObservable<T> implements Observable<T> {
         mDelegate = delegate;
     }
 
+    @Nonnull
     @Override
+    @CheckReturnValue
     public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
         return mDelegate.addCallback(callback, executor);
     }
 
+    @Nonnull
     @Override
     public T get() {
         return mDelegate.get();
index e9b3df0..7d56da0 100644 (file)
@@ -21,9 +21,16 @@ import com.google.common.base.Supplier;
 
 import java.util.concurrent.Executor;
 
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.ThreadSafe;
+
 /**
  * An interface for thread-safe observable objects.
  */
+@ThreadSafe
+@ParametersAreNonnullByDefault
 public interface Observable<T> extends Supplier<T> {
     /**
      * Adds the given callback, returning a handle to be closed when the
@@ -31,8 +38,16 @@ public interface Observable<T> extends Supplier<T> {
      *
      * @param callback The callback to add.
      * @param executor The executor on which the callback will be invoked.
-     * @return A {@link SafeCloseable} handle to be closed when the callback
-     *         must be removed.
+     * @return A handle to be closed when the callback must be removed.
      */
+    @Nonnull
+    @CheckReturnValue
     public SafeCloseable addCallback(Callback<T> callback, Executor executor);
+
+    /**
+     * @return The current/latest value.
+     */
+    @Nonnull
+    @Override
+    public T get();
 }
diff --git a/src/com/android/camera/async/ObservableCombiner.java b/src/com/android/camera/async/ObservableCombiner.java
new file mode 100644 (file)
index 0000000..c7b94ea
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2015 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.camera.async;
+
+import com.android.camera.util.Callback;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Enables combining multiple {@link Observable}s together with a given
+ * function.
+ * <p>
+ * Callbacks added to the resulting observable are notified when any of the
+ * dependencies change.
+ */
+@ThreadSafe
+@ParametersAreNonnullByDefault
+final class ObservableCombiner<I, O> implements Observable<O> {
+    private final ImmutableList<Observable<I>> mInputs;
+    private final Function<List<I>, O> mFunction;
+
+    private final Object mLock;
+
+    @GuardedBy("mLock")
+    private final ConcurrentState<O> mListenerNotifier;
+
+    @GuardedBy("mLock")
+    private final List<SafeCloseable> mInputCallbackHandles;
+
+    @GuardedBy("mLock")
+    private int mNumRegisteredCallbacks;
+
+    /**
+     * The thread-safe callback to be registered with each input.
+     */
+    private final Updatable<I> mInputCallback = new Updatable<I>() {
+        public void update(I ignored) {
+            mListenerNotifier.update(get());
+        }
+    };
+
+    private ObservableCombiner(List<? extends Observable<I>> inputs,
+            Function<List<I>, O> function, O initialValue) {
+        mInputs = ImmutableList.copyOf(inputs);
+        mFunction = function;
+        mListenerNotifier = new ConcurrentState<>(initialValue);
+        mLock = new Object();
+        mInputCallbackHandles = new ArrayList<>();
+        mNumRegisteredCallbacks = 0;
+    }
+
+    /**
+     * Transforms a set of input observables with a function.
+     *
+     * @param inputs The input observables.
+     * @param function The function to apply to all of the inputs.
+     * @param <I> The type of all inputs values.
+     * @param <O> The type of the output values.
+     * @return An observable which will reflect the combination of all inputs
+     *         with the given function. Changes in the output value will result
+     *         in calls to any callbacks registered with the output.
+     */
+    static <I, O> Observable<O> transform(List<? extends Observable<I>> inputs,
+            Function<List<I>, O> function) {
+        // Compute the initial value.
+        ArrayList<I> deps = new ArrayList<>();
+        for (Observable<? extends I> input : inputs) {
+            deps.add(input.get());
+        }
+        O initialValue = function.apply(deps);
+
+        return new ObservableCombiner<>(inputs, function, initialValue);
+    }
+
+    @GuardedBy("mLock")
+    private void addCallbacksToInputs() {
+        for (Observable<I> observable : mInputs) {
+            final SafeCloseable callbackHandle =
+                    Observables.addThreadSafeCallback(observable, mInputCallback);
+
+            mInputCallbackHandles.add(callbackHandle);
+        }
+    }
+
+    @GuardedBy("mLock")
+    private void removeCallbacksFromInputs() {
+        for (SafeCloseable callbackHandle : mInputCallbackHandles) {
+            callbackHandle.close();
+        }
+    }
+
+    @Nonnull
+    @Override
+    @CheckReturnValue
+    public SafeCloseable addCallback(final Callback<O> callback, Executor executor) {
+        // When a callback is added to this, the "output", we must ensure that
+        // callbacks are registered with all of the inputs so that they can be
+        // forwarded properly.
+        // Instead of adding another callback to each input for each callback
+        // registered with the output, callbacks are registered when the first
+        // output callback is added, and removed when the last output callback
+        // is removed.
+
+        synchronized (mLock) {
+            if (mNumRegisteredCallbacks == 0) {
+                addCallbacksToInputs();
+            }
+            mNumRegisteredCallbacks++;
+        }
+
+        // Wrap the callback in a {@link FilteredCallback} to prevent many
+        // duplicate/cascading updates even if the output does not change.
+        final SafeCloseable resultingCallbackHandle = mListenerNotifier.addCallback(
+                new FilteredCallback<O>(callback), executor);
+
+        return new SafeCloseable() {
+            @Override
+            public void close() {
+                resultingCallbackHandle.close();
+
+                synchronized (mLock) {
+                    mNumRegisteredCallbacks--;
+                    if (mNumRegisteredCallbacks == 0) {
+                        removeCallbacksFromInputs();
+                    }
+                }
+            }
+        };
+    }
+
+    @Nonnull
+    @Override
+    public O get() {
+        ArrayList<I> deps = new ArrayList<>();
+        for (Observable<? extends I> dependency : mInputs) {
+            deps.add(dependency.get());
+        }
+        return mFunction.apply(deps);
+    }
+}
index 57f7594..900e02b 100644 (file)
@@ -20,16 +20,26 @@ import com.android.camera.util.Callback;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import java.util.List;
 import java.util.concurrent.Executor;
 
 import javax.annotation.CheckReturnValue;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
 
 /**
  * Helper methods for {@link Observable}.
  */
+@ParametersAreNonnullByDefault
 public class Observables {
+    private static final SafeCloseable NOOP_CALLBACK_HANDLE = new SafeCloseable() {
+        @Override
+        public void close() {
+            // Do Nothing.
+        }
+    };
+
     private Observables() {
     }
 
@@ -41,13 +51,17 @@ public class Observables {
     public static <F, T> Observable<T> transform(final Observable<F> input,
             final Function<F, T> function) {
         return new Observable<T>() {
+            @Nonnull
             @Override
             public T get() {
                 return function.apply(input.get());
             }
 
+            @CheckReturnValue
+            @Nonnull
             @Override
-            public SafeCloseable addCallback(final Callback<T> callback, Executor executor) {
+            public SafeCloseable addCallback(final Callback<T> callback,
+                    Executor executor) {
                 return input.addCallback(new Callback<F>() {
                     @Override
                     public void onCallback(F result) {
@@ -59,32 +73,40 @@ public class Observables {
     }
 
     /**
+     * Transforms a set of observables with a function.
+     *
+     * @return The transformed observable.
+     */
+    public static <F, T> Observable<T> transform(final List<? extends Observable<F>> input,
+            Function<List<F>, T> function) {
+        return ObservableCombiner.transform(input, function);
+    }
+
+    /**
      * @return An observable which has the given constant value.
      */
     @Nonnull
     public static <T> Observable<T> of(final @Nullable T constant) {
         return new Observable<T>() {
+            @Nonnull
             @Override
             public T get() {
                 return constant;
             }
 
+            @CheckReturnValue
+            @Nonnull
             @Override
             public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
-                return new SafeCloseable() {
-                    @Override
-                    public void close() {
-                    }
-                };
+                return NOOP_CALLBACK_HANDLE;
             }
         };
     }
 
     @Nonnull
     @CheckReturnValue
-    public static <T> SafeCloseable addThreadSafeCallback(
-            @Nonnull Observable<T> observable,
-            final @Nonnull Updatable<T> callback) {
+    public static <T> SafeCloseable addThreadSafeCallback(Observable<T> observable,
+            final Updatable<T> callback) {
         return observable.addCallback(new Callback<T>() {
             @Override
             public void onCallback(T result) {
similarity index 69%
rename from src/com/android/camera/async/NoOp.java
rename to src/com/android/camera/async/Updatables.java
index 60db1fe..5ac07d6 100644 (file)
@@ -19,19 +19,20 @@ package com.android.camera.async;
 import javax.annotation.Nonnull;
 
 /**
- * This provides empty, NoOp implementation of generic updateable
- * objects.
+ * This provides empty, NoOp implementation of generic updateable objects.
  */
-public final class NoOp<T> implements Updatable<T> {
-    private static final NoOp<Object> INSTANCE = new NoOp<>();
+public final class Updatables {
+    private static final Updatable NOOP = new Updatable() {
+        @Override
+        public void update(@Nonnull Object o) {
+            // Do nothing.
+        }
+    };
 
-    @SuppressWarnings("unchecked")
-    public static <T> Updatable<T> ofUpdateable() {
-        return (Updatable<T>) INSTANCE;
+    private Updatables() {
     }
 
-    private NoOp() { }
-
-    @Override
-    public void update(@Nonnull T t) { }
-}
\ No newline at end of file
+    public static <T> Updatable<T> getNoOp() {
+        return NOOP;
+    }
+}
index 99c4f0c..6b403ae 100644 (file)
@@ -18,6 +18,7 @@ package com.android.camera.one.v2.sharedimagereader;
 
 import com.android.camera.async.HandlerFactory;
 import com.android.camera.async.Lifetime;
+import com.android.camera.async.Observable;
 import com.android.camera.async.Updatable;
 import com.android.camera.one.v2.camera2proxy.ImageReaderProxy;
 import com.android.camera.one.v2.core.RequestBuilder;
@@ -35,12 +36,13 @@ import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketPool;
  * <p>
  * Add the OnImageAvailableListener to the image reader in a separate thread.
  * <p>
- * Use the ImageQueueCaptureStreamFactory to create image streams to add to
+ * Use the {@link ImageStreamFactory} to create image streams to add to
  * {@link RequestBuilder}s to interact with the camera and ImageReader.
  */
 public class SharedImageReaderFactory {
     private final Updatable<Long> mGlobalTimestampQueue;
     private final ImageStreamFactory mSharedImageReader;
+    private final Observable<Integer> mAvailableImageCount;
 
     /**
      * @param lifetime The lifetime of the SharedImageReader, and other
@@ -58,6 +60,7 @@ public class SharedImageReaderFactory {
         mGlobalTimestampQueue = imageDistributorFactory.provideGlobalTimestampCallback();
 
         TicketPool ticketPool = new FiniteTicketPool(imageReader.getMaxImages() - 2);
+        mAvailableImageCount = ticketPool.getAvailableTicketCount();
         mSharedImageReader = new ImageStreamFactory(
                 new Lifetime(lifetime), ticketPool, imageReader.getSurface(), imageDistributor);
     }
@@ -69,4 +72,8 @@ public class SharedImageReaderFactory {
     public ImageStreamFactory provideSharedImageReader() {
         return mSharedImageReader;
     }
+
+    public Observable<Integer> provideAvailableImageCount() {
+        return mAvailableImageCount;
+    }
 }
index 1c506e4..1dbff93 100644 (file)
@@ -22,6 +22,7 @@ import static com.android.camera.one.v2.core.ResponseListeners.forTimestamps;
 import com.android.camera.async.BufferQueue;
 import com.android.camera.async.HandlerFactory;
 import com.android.camera.async.Lifetime;
+import com.android.camera.async.Observable;
 import com.android.camera.async.Updatable;
 import com.android.camera.one.v2.camera2proxy.ImageProxy;
 import com.android.camera.one.v2.camera2proxy.ImageReaderProxy;
@@ -36,6 +37,9 @@ import com.android.camera.one.v2.sharedimagereader.ringbuffer.DynamicRingBufferF
 import com.android.camera.one.v2.sharedimagereader.ticketpool.FiniteTicketPool;
 import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketPool;
 
+import static com.android.camera.one.v2.core.ResponseListeners.forFinalMetadata;
+import static com.android.camera.one.v2.core.ResponseListeners.forTimestamps;
+
 /**
  * Like {@link SharedImageReaderFactory}, but provides a single
  * {@link ImageStream} with a dynamic capacity which changes depending on demand
@@ -46,6 +50,7 @@ public class ZslSharedImageReaderFactory {
     private final ImageStream mZslCaptureStream;
     private final MetadataPool mMetadataPool;
     private final RequestTemplate mRequestTemplate;
+    private final Observable<Integer> mAvailableImageCount;
 
     /**
      * @param lifetime The lifetime of the SharedImageReader, and other
@@ -84,6 +89,8 @@ public class ZslSharedImageReaderFactory {
                 new Lifetime(lifetime), ringBufferFactory.provideTicketPool(),
                 imageReader.getSurface(), imageDistributor);
 
+        mAvailableImageCount = ringBufferFactory.provideTicketPool().getAvailableTicketCount();
+
         mRequestTemplate = new RequestTemplate(rootRequestTemplate);
         mRequestTemplate.addStream(mZslCaptureStream);
         mRequestTemplate.addResponseListener(forTimestamps(globalTimestampQueue));
@@ -106,4 +113,8 @@ public class ZslSharedImageReaderFactory {
     public RequestBuilder.Factory provideRequestTemplate() {
         return mRequestTemplate;
     }
+
+    public Observable<Integer> provideAvailableImageCount() {
+        return mAvailableImageCount;
+    }
 }
index 628f1e6..8c9662c 100644 (file)
 
 package com.android.camera.one.v2.sharedimagereader.ringbuffer;
 
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.android.camera.async.Observable;
+import com.android.camera.async.Observables;
 import com.android.camera.one.v2.sharedimagereader.ticketpool.Ticket;
 import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketPool;
 import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketProvider;
-import com.google.common.base.Supplier;
+import com.google.common.base.Function;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
  * Splits a {@link TicketPool} into a high-priority lane and a low-priority,
@@ -45,6 +53,9 @@ class TicketPoolPrioritizer {
         private LowPriorityTicketPool() {
         }
 
+        @Nullable
+        @CheckReturnValue
+        @Override
         public Ticket tryAcquire() {
             if (mHighPriorityWaiters.get() > 0) {
                 return null;
@@ -55,6 +66,7 @@ class TicketPoolPrioritizer {
     }
 
     private class HighPriorityTicketPool implements TicketPool {
+        @Nonnull
         @Override
         public Collection<Ticket> acquire(int tickets) throws InterruptedException,
                 NoCapacityAvailableException {
@@ -74,21 +86,14 @@ class TicketPoolPrioritizer {
             return result;
         }
 
+        @Nonnull
         @Override
-        public boolean canAcquire(int numTickets) {
-            if (numTickets < 0) {
-                return false;
-            }
-            // The number of additional tickets which must be requested from the
-            // parent pool to fulfill the hypothetical request for numTickets
-            // tickets, after flushing low-priority tickets.
-            int numAdditionalTickets = numTickets - mReleasableLowPriorityTicketCount.get();
-            if (numAdditionalTickets <= 0) {
-                return true;
-            }
-            return mRootTicketPool.canAcquire(numAdditionalTickets);
+        public Observable<Integer> getAvailableTicketCount() {
+            return mAvailableHighPriorityTicketCount;
         }
 
+        @Nullable
+        @CheckReturnValue
         @Override
         public Ticket tryAcquire() {
             // This over-aggressively flushes all low-priority tickets every
@@ -102,20 +107,38 @@ class TicketPoolPrioritizer {
     }
 
     private final LowPriorityTicketReleaser mLowPriorityTicketReleaser;
-    private final Supplier<Integer> mReleasableLowPriorityTicketCount;
     private final TicketPool mRootTicketPool;
     private final AtomicInteger mHighPriorityWaiters;
     private final HighPriorityTicketPool mHighPriority;
     private final LowPriorityTicketPool mLowPriority;
 
+    private final Observable<Integer> mAvailableHighPriorityTicketCount;
+
     public TicketPoolPrioritizer(LowPriorityTicketReleaser lowPriorityTicketReleaser,
-            Supplier<Integer> releasableLowPriorityTicketCount, TicketPool rootTicketPool) {
+            Observable<Integer> releasableLowPriorityTicketCount, TicketPool rootTicketPool) {
         mLowPriorityTicketReleaser = lowPriorityTicketReleaser;
-        mReleasableLowPriorityTicketCount = releasableLowPriorityTicketCount;
         mRootTicketPool = rootTicketPool;
         mHighPriorityWaiters = new AtomicInteger(0);
         mHighPriority = new HighPriorityTicketPool();
         mLowPriority = new LowPriorityTicketPool();
+
+        final Observable<Integer> parentPoolTicketCount = rootTicketPool.getAvailableTicketCount();
+        Function<List<Integer>, Integer> sum = new Function<List<Integer>, Integer>() {
+            @Override
+            public Integer apply(@Nullable List<Integer> integers) {
+                int total = 0;
+                for (Integer i : integers) {
+                    total += i;
+                }
+                return total;
+            }
+        };
+
+        // The number of high-priority tickets available at any time is the sum
+        // of parentPoolTicketCount and releaseableLowPriorityTicketCount.
+        mAvailableHighPriorityTicketCount = Observables.transform(
+                Arrays.asList(parentPoolTicketCount, releasableLowPriorityTicketCount),
+                sum);
     }
 
     public TicketProvider getLowPriorityTicketProvider() {
index 2ea9873..d8ab53c 100644 (file)
 
 package com.android.camera.one.v2.sharedimagereader.ticketpool;
 
+import com.android.camera.async.ConcurrentState;
+import com.android.camera.async.Observable;
+import com.android.camera.async.SafeCloseable;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
-import com.android.camera.async.SafeCloseable;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * A ticket pool with a finite number of tickets.
  */
-public class FiniteTicketPool implements TicketPool, SafeCloseable {
+public final class FiniteTicketPool implements TicketPool, SafeCloseable {
     private class TicketImpl implements Ticket {
         private final AtomicBoolean mClosed;
 
@@ -39,91 +46,159 @@ public class FiniteTicketPool implements TicketPool, SafeCloseable {
         public void close() {
             boolean alreadyClosed = mClosed.getAndSet(true);
             if (!alreadyClosed) {
-                mTickets.release();
+                synchronized (mLock) {
+                    releaseTicket();
+                    updateAvailableTicketCount();
+                }
             }
         }
     }
 
+    private class Waiter {
+        private final int mTicketsRequested;
+        private final Condition mCondition;
+
+        private Waiter(int ticketsRequested, Condition condition) {
+            mTicketsRequested = ticketsRequested;
+            mCondition = condition;
+        }
+
+        public Condition getCondition() {
+            return mCondition;
+        }
+
+        public int getTicketsRequested() {
+            return mTicketsRequested;
+        }
+    }
+
     private final int mMaxCapacity;
-    private final Semaphore mTickets;
-    private final AtomicBoolean mClosed;
+    private final ReentrantLock mLock;
+    @GuardedBy("mLock")
+    private final LinkedList<Waiter> mWaiters;
+    private final ConcurrentState<Integer> mAvailableTicketCount;
+    @GuardedBy("mLock")
+    private int mTickets;
+    @GuardedBy("mLock")
+    private boolean mClosed;
 
     public FiniteTicketPool(int capacity) {
         mMaxCapacity = capacity;
-        mTickets = new Semaphore(capacity);
-        mClosed = new AtomicBoolean(false);
+        mLock = new ReentrantLock(true);
+        mTickets = capacity;
+        mWaiters = new LinkedList<>();
+        mClosed = false;
+        mAvailableTicketCount = new ConcurrentState<>(capacity);
+    }
+
+    @GuardedBy("mLock")
+    private void releaseTicket() {
+        mLock.lock();
+        try {
+            mTickets++;
+
+            // Wake up waiters in order, so long as their requested number of
+            // tickets can be satisfied.
+            int ticketsRemaining = mTickets;
+            Waiter nextWaiter = mWaiters.peekFirst();
+            while (nextWaiter != null && nextWaiter.getTicketsRequested() <= ticketsRemaining) {
+                ticketsRemaining -= nextWaiter.getTicketsRequested();
+                nextWaiter.getCondition().signal();
+
+                mWaiters.removeFirst();
+                nextWaiter = mWaiters.peekFirst();
+            }
+        } finally {
+            mLock.unlock();
+        }
     }
 
+    @Nonnull
     @Override
     public Collection<Ticket> acquire(int tickets) throws InterruptedException,
             NoCapacityAvailableException {
-        if (tickets > mMaxCapacity || tickets < 0) {
-            throw new NoCapacityAvailableException();
-        }
-        mTickets.acquire(tickets);
-        if (mClosed.get()) {
-            // If the pool was closed while we were waiting to acquire the
-            // tickets, release them (they may be fake) and throw because no
-            // capacity is available.
-            mTickets.release(tickets);
-            throw new NoCapacityAvailableException();
+        mLock.lock();
+        try {
+            if (tickets > mMaxCapacity || tickets < 0) {
+                throw new NoCapacityAvailableException();
+            }
+            Waiter thisWaiter = new Waiter(tickets, mLock.newCondition());
+            mWaiters.addLast(thisWaiter);
+            updateAvailableTicketCount();
+            try {
+                while (mTickets < tickets && !mClosed) {
+                    thisWaiter.getCondition().await();
+                }
+                if (mClosed) {
+                    throw new NoCapacityAvailableException();
+                }
+
+                mTickets -= tickets;
+
+                updateAvailableTicketCount();
+
+                List<Ticket> ticketList = new ArrayList<>();
+                for (int i = 0; i < tickets; i++) {
+                    ticketList.add(new TicketImpl());
+                }
+                return ticketList;
+            } finally {
+                mWaiters.remove(thisWaiter);
+                updateAvailableTicketCount();
+            }
+        } finally {
+            mLock.unlock();
         }
-        List<Ticket> ticketList = new ArrayList<Ticket>();
-        for (int i = 0; i < tickets; i++) {
-            ticketList.add(new TicketImpl());
+    }
+
+    @GuardedBy("mLock")
+    private void updateAvailableTicketCount() {
+        if (mClosed || !mWaiters.isEmpty()) {
+            mAvailableTicketCount.update(0);
+        } else {
+            mAvailableTicketCount.update(mTickets);
         }
-        return ticketList;
     }
 
+    @Nonnull
     @Override
-    public boolean canAcquire(int tickets) {
-        if (tickets < 0) {
-            return false;
-        }
-        if (tickets == 0) {
-            return true;
-        }
-        return !mClosed.get() &&
-                !mTickets.hasQueuedThreads() &&
-                mTickets.availablePermits() >= tickets;
+    public Observable<Integer> getAvailableTicketCount() {
+        return mAvailableTicketCount;
     }
 
     @Override
     public Ticket tryAcquire() {
-        if (mTickets.tryAcquire()) {
-            // Release the ticket immediately if...
-            // 1. The pool is closed, and tryAcquire may have received a fake,
-            // "poisson pill" ticket, which must be released in order to enable
-            // other calls to acquire() to not block.
-            // 2. Or, mTickets has threads queued on it because of blocked calls
-            // to {@link #acquire}. It would not be fair to acquire this ticket
-            // if there are pending requests for it already.
-            boolean releaseTicketImmediately = mClosed.get() || mTickets.hasQueuedThreads();
-            if (releaseTicketImmediately) {
-                mTickets.release();
-                return null;
-            } else {
+        mLock.lock();
+        try {
+            if (!mClosed && mWaiters.isEmpty() && mTickets >= 1) {
+                mTickets--;
+                updateAvailableTicketCount();
                 return new TicketImpl();
+            } else {
+                return null;
             }
-        } else {
-            return null;
+        } finally {
+            mLock.unlock();
         }
     }
 
     @Override
     public void close() {
-        if (mClosed.getAndSet(true)) {
-            // If already closed, just return.
-            return;
-        }
+        mLock.lock();
+        try {
+            if (mClosed) {
+                return;
+            }
 
-        // Threads may be waiting in acquire() for tickets to be available, so
-        // wake them up by adding fake, "poisson pill" tickets.
-        // Adding mMaxCapacity permits to the semaphore is sufficient to
-        // guarantee that any/all
-        // waiting threads wake up and detect that the pool is closed (so long
-        // as all waiting
-        // threads release the fake tickets they acquired while waking up).
-        mTickets.release(mMaxCapacity);
+            mClosed = true;
+
+            for (Waiter waiter : mWaiters) {
+                waiter.getCondition().signal();
+            }
+
+            updateAvailableTicketCount();
+        } finally {
+            mLock.unlock();
+        }
     }
 }
index 76932b1..850dad0 100644 (file)
 
 package com.android.camera.one.v2.sharedimagereader.ticketpool;
 
+import com.android.camera.async.ConcurrentState;
+import com.android.camera.async.Observable;
+import com.android.camera.async.SafeCloseable;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -24,7 +28,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.android.camera.async.SafeCloseable;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * A TicketPool which reserves a capacity of tickets ahead of time from its
@@ -39,15 +47,24 @@ import com.android.camera.async.SafeCloseable;
  * as soon as possible, which may depend on consumers of this ticket pool
  * closing tickets which had previously been acquired.
  */
+@ParametersAreNonnullByDefault
 public class ReservableTicketPool implements TicketPool, SafeCloseable {
     private static class Waiter {
-        public final Condition mDoneCondition;
-        public final int mRequestedTicketCount;
+        private final Condition mDoneCondition;
+        private final int mRequestedTicketCount;
 
         private Waiter(Condition doneCondition, int requestedTicketCount) {
             mDoneCondition = doneCondition;
             mRequestedTicketCount = requestedTicketCount;
         }
+
+        public Condition getDoneCondition() {
+            return mDoneCondition;
+        }
+
+        public int getRequestedTicketCount() {
+            return mRequestedTicketCount;
+        }
     }
 
     /**
@@ -79,6 +96,7 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
                 releaseToParent = (mParentTickets.size() == mCapacity);
                 if (!releaseToParent) {
                     mParentTickets.add(mParentTicket);
+                    updateCurrentTicketCount();
                     releaseWaitersOnTicketAvailability();
                 }
             } finally {
@@ -104,18 +122,26 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
     private final ReentrantLock mLock;
     /**
      * A Queue containing the number of tickets requested by each thread
-     * currently blocked on {@link #mTicketWaiters} in {@link #acquire}.
+     * currently blocked in {@link #acquire}.
      */
+    @GuardedBy("mLock")
     private final ArrayDeque<Waiter> mTicketWaiters;
     /**
      * Tickets from mParentPool which have not been given to clients via
      * {@link #acquire}.
      */
+    @GuardedBy("mLock")
     private final ArrayDeque<Ticket> mParentTickets;
     /**
+     * Maintains an observable count of the number of tickets which are readily
+     * available at any time.
+     */
+    private final ConcurrentState<Integer> mAvailableTicketCount;
+    /**
      * The total number of tickets available and outstanding (acquired but not
      * closed).
      */
+    @GuardedBy("mLock")
     private int mCapacity;
 
     public ReservableTicketPool(TicketPool parentPool) {
@@ -124,8 +150,24 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
         mTicketWaiters = new ArrayDeque<>();
         mParentTickets = new ArrayDeque<>();
         mCapacity = 0;
+        mAvailableTicketCount = new ConcurrentState<>(0);
+    }
+
+    @GuardedBy("mLock")
+    private void updateCurrentTicketCount() {
+        mLock.lock();
+        try {
+            if (mTicketWaiters.size() != 0) {
+                mAvailableTicketCount.update(0);
+            } else {
+                mAvailableTicketCount.update(mParentTickets.size());
+            }
+        } finally {
+            mLock.unlock();
+        }
     }
 
+    @Nonnull
     @Override
     public Collection<Ticket> acquire(int tickets) throws InterruptedException,
             NoCapacityAvailableException {
@@ -138,30 +180,10 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
         return wrappedTicketList;
     }
 
+    @Nonnull
     @Override
-    public boolean canAcquire(int tickets) {
-        if (tickets == 0) {
-            return true;
-        }
-        if (tickets < 0) {
-            return false;
-        }
-        boolean canAcquire = false;
-        mLock.lock();
-        try {
-            // Return true if no threads are waiting to acquire tickets and
-            // there are enough tickets in the pool to satisfy such a request.
-            if (mTicketWaiters.size() == 0 &&
-                    mParentTickets.size() >= tickets) {
-                canAcquire = true;
-            } else {
-                canAcquire = false;
-            }
-        } finally {
-            mLock.unlock();
-        }
-
-        return canAcquire;
+    public Observable<Integer> getAvailableTicketCount() {
+        return mAvailableTicketCount;
     }
 
     @Override
@@ -173,6 +195,7 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
                 return null;
             }
             parentTicket = mParentTickets.remove();
+            updateCurrentTicketCount();
         } finally {
             mLock.unlock();
         }
@@ -203,6 +226,8 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
         } finally {
             mLock.unlock();
         }
+
+        updateCurrentTicketCount();
     }
 
     /**
@@ -241,6 +266,8 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
         for (Ticket ticket : parentTicketsToRelease) {
             ticket.close();
         }
+
+        updateCurrentTicketCount();
     }
 
     /**
@@ -269,7 +296,8 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
     private Collection<Ticket> acquireParentTickets(int tickets) throws InterruptedException,
             NoCapacityAvailableException {
         // The list of tickets from mTicketList to acquire.
-        // Try to acquire these immediately, if there are no threads already waiting for tickets.
+        // Try to acquire these immediately, if there are no threads already
+        // waiting for tickets.
         List<Ticket> acquiredParentTickets = null;
         mLock.lock();
         try {
@@ -278,14 +306,16 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
             }
             Waiter thisWaiter = new Waiter(mLock.newCondition(), tickets);
             mTicketWaiters.add(thisWaiter);
+            updateCurrentTicketCount();
             try {
                 while (acquiredParentTickets == null) {
-                    thisWaiter.mDoneCondition.await();
+                    thisWaiter.getDoneCondition().await();
                     acquiredParentTickets = tryAcquireAtomically(tickets);
                 }
             } finally {
                 mTicketWaiters.remove(thisWaiter);
             }
+            updateCurrentTicketCount();
         } finally {
             mLock.unlock();
         }
@@ -300,6 +330,8 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
      * @return The tickets acquired from the parent ticket pool, or null if they
      *         could not be acquired.
      */
+    @Nullable
+    @CheckReturnValue
     private List<Ticket> tryAcquireAtomically(int tickets) throws NoCapacityAvailableException {
         List<Ticket> acquiredParentTickets = new ArrayList<>();
         mLock.lock();
@@ -311,6 +343,7 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
                 for (int i = 0; i < tickets; i++) {
                     acquiredParentTickets.add(mParentTickets.remove());
                 }
+                updateCurrentTicketCount();
                 return acquiredParentTickets;
             }
         } finally {
@@ -327,9 +360,9 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
             int numTicketsReadilyAvailable = mParentTickets.size();
             while (mTicketWaiters.size() > 0) {
                 Waiter nextWaiter = mTicketWaiters.peek();
-                if (nextWaiter.mRequestedTicketCount <= numTicketsReadilyAvailable) {
-                    numTicketsReadilyAvailable -= nextWaiter.mRequestedTicketCount;
-                    nextWaiter.mDoneCondition.signal();
+                if (nextWaiter.getRequestedTicketCount() <= numTicketsReadilyAvailable) {
+                    numTicketsReadilyAvailable -= nextWaiter.getRequestedTicketCount();
+                    nextWaiter.getDoneCondition().signal();
                     mTicketWaiters.poll();
                 } else {
                     return;
@@ -345,14 +378,14 @@ public class ReservableTicketPool implements TicketPool, SafeCloseable {
         try {
             // Release all waiters requesting more tickets than the current
             // capacity
-            List<Waiter> toRemove = new ArrayList<Waiter>();
+            List<Waiter> toRemove = new ArrayList<>();
             for (Waiter waiter : mTicketWaiters) {
-                if (waiter.mRequestedTicketCount > mCapacity) {
+                if (waiter.getRequestedTicketCount() > mCapacity) {
                     toRemove.add(waiter);
                 }
             }
             for (Waiter waiter : toRemove) {
-                waiter.mDoneCondition.signal();
+                waiter.getDoneCondition().signal();
             }
         } finally {
             mLock.unlock();
index 2cbcac4..46d1231 100644 (file)
 
 package com.android.camera.one.v2.sharedimagereader.ticketpool;
 
+import com.android.camera.async.Observable;
+
 import java.util.Collection;
 
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 /**
  * Stores a collection of {@link Ticket}s. Tickets may be acquired from the
  * pool. When closed, tickets return themselves to the pool.
@@ -32,31 +38,32 @@ public interface TicketPool extends TicketProvider {
     }
 
     /**
-     * Acquires and returns the specified number of tickets.
+     * Acquires and returns the specified number of tickets. The caller owns all
+     * returned tickets and is responsible for eventually closing them.
      * <p>
-     * Note: Implementations MUST be fair!
-     * </p>
+     * Implementations must be fair w.r.t. other calls to acquire.
      */
+    @Nonnull
     public Collection<Ticket> acquire(int tickets) throws InterruptedException,
             NoCapacityAvailableException;
 
     /**
-     * Provides a hint as to whether or not the specified number of tickets can
-     * be acquired immediately, or will probably block.
-     *
-     * @return True if a subsequent call to {@link #acquire} will probably not
-     *         block.
+     * @return The number of tickets readily-available for immediate
+     *         acquisition, as an observable object.
      */
-    public boolean canAcquire(int tickets);
+    @Nonnull
+    public Observable<Integer> getAvailableTicketCount();
 
     /**
-     * Attempts to acquire and return a ticket.
+     * Attempts to acquire and return a ticket. The caller owns the resulting
+     * ticket (if not null) and is responsible for eventually closing it.
      * <p>
-     * Note: Implementations MUST be fair w.r.t. {@link #acquire}.
-     * </p>
+     * Implementations must be fair w.r.t. {@link #acquire}.
      *
      * @return The acquired ticket, or null if no ticket is readily available.
      */
     @Override
+    @Nullable
+    @CheckReturnValue
     public Ticket tryAcquire();
 }
index 1992dfa..37ebf5f 100644 (file)
 
 package com.android.camera.one.v2.sharedimagereader.ticketpool;
 
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nullable;
+
 public interface TicketProvider {
     /**
      * Attempts to acquire and return a ticket.
      *
      * @return The acquired ticket, or null if no ticket is readily available.
      */
+    @Nullable
+    @CheckReturnValue
     public Ticket tryAcquire();
 }
index c52d5cf..4e3c5c9 100644 (file)
@@ -16,7 +16,7 @@
 
 package com.android.camera.one.v2.stats;
 
-import com.android.camera.async.NoOp;
+import com.android.camera.async.Updatables;
 import com.android.camera.async.Updatable;
 import com.android.camera.debug.Log;
 import com.android.camera.debug.Log.Tag;
@@ -38,7 +38,7 @@ public class PreviewFpsListener extends ResponseListener {
     private double mFpsValue = 30.0f;
 
     public PreviewFpsListener() {
-        this(NoOp.<Float>ofUpdateable());
+        this(Updatables.<Float>getNoOp());
     }
 
     public PreviewFpsListener(Updatable<Float> fpsListener) {
index a7d9a30..a38f3af 100644 (file)
 
 package com.android.camera.settings;
 
+import com.android.camera.async.ExecutorCallback;
 import com.android.camera.async.FilteredUpdatable;
 import com.android.camera.async.Observable;
-import com.android.camera.async.ExecutorCallback;
 import com.android.camera.async.SafeCloseable;
 import com.android.camera.async.Updatable;
 import com.android.camera.util.Callback;
 
 import java.util.concurrent.Executor;
 
-import javax.annotation.Nullable;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 
 /**
@@ -35,9 +36,9 @@ import javax.annotation.concurrent.ThreadSafe;
 @ThreadSafe
 public final class SettingObserver<T> implements Observable<T> {
     private class Listener implements SettingsManager.OnSettingChangedListener, SafeCloseable {
-        private final Updatable<T> mCallback;
+        private final Updatable<? super T> mCallback;
 
-        private Listener(Updatable<T> callback) {
+        private Listener(Updatable<? super T> callback) {
             mCallback = callback;
         }
 
@@ -85,15 +86,17 @@ public final class SettingObserver<T> implements Observable<T> {
                 Boolean.class);
     }
 
+    @CheckReturnValue
+    @Nonnull
     @Override
-    public SafeCloseable addCallback(Callback<T> callback, Executor executor) {
+    public SafeCloseable addCallback(@Nonnull Callback<T> callback, @Nonnull Executor executor) {
         final Listener listener =
                 new Listener(new FilteredUpdatable<>(new ExecutorCallback<>(callback, executor)));
         mSettingsManager.addListener(listener);
         return listener;
     }
 
-    @Nullable
+    @Nonnull
     @Override
     @SuppressWarnings("unchecked")
     public T get() {
index 429789e..601a689 100644 (file)
 
 package com.android.camera.util;
 
+import javax.annotation.Nonnull;
+
 /**
  * Simple callback.
  * TODO: Move to async.
  */
 public interface Callback<T> {
-    public void onCallback(T result);
+    public void onCallback(@Nonnull T result);
 }