OSDN Git Service

DO NOT MERGE IP Connectivity metrics: add connect() statistics
authorHugo Benichi <hugobenichi@google.com>
Fri, 25 Nov 2016 02:24:22 +0000 (11:24 +0900)
committerHugo Benichi <hugobenichi@google.com>
Thu, 15 Dec 2016 13:56:36 +0000 (22:56 +0900)
This patch adds a ConnectStats class to aggregate connect() statistics
gathered in NetdEventListenerService. ConnectStats is uploaded once a
day by IpConnectivityMetrics.

Test: $ runtest frameworks-net + new unit test
Bug: 32198976

(cherry picked from commit 4f71a15973f7048076ed10f63d708f020e1a21e0)

Change-Id: I1450c126b90b628d9f1c73ddf054b4742f5e84c5

core/java/com/android/internal/util/TokenBucket.java
services/core/java/com/android/server/connectivity/ConnectStats.java [new file with mode: 0644]
services/core/java/com/android/server/connectivity/IpConnectivityEventBuilder.java
services/core/java/com/android/server/connectivity/IpConnectivityMetrics.java
services/core/java/com/android/server/connectivity/NetdEventListenerService.java
services/tests/servicestests/src/com/android/server/connectivity/IpConnectivityEventBuilderTest.java
services/tests/servicestests/src/com/android/server/connectivity/IpConnectivityMetricsTest.java
services/tests/servicestests/src/com/android/server/connectivity/NetdEventListenerServiceTest.java

index effb82b..a163ceb 100644 (file)
@@ -33,6 +33,8 @@ import static com.android.internal.util.Preconditions.checkArgumentPositive;
  * The available amount of tokens is computed lazily when the bucket state is inspected.
  * Therefore it is purely synchronous and does not involve any asynchronous activity.
  * It is not synchronized in any way and not a thread-safe object.
+ *
+ * {@hide}
  */
 public class TokenBucket {
 
diff --git a/services/core/java/com/android/server/connectivity/ConnectStats.java b/services/core/java/com/android/server/connectivity/ConnectStats.java
new file mode 100644 (file)
index 0000000..d6de815
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2016 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.connectivity;
+
+import android.system.OsConstants;
+import android.util.IntArray;
+import android.util.SparseIntArray;
+import com.android.internal.util.TokenBucket;
+import com.android.server.connectivity.metrics.IpConnectivityLogClass.ConnectStatistics;
+import com.android.server.connectivity.metrics.IpConnectivityLogClass.Pair;
+
+/**
+ * A class that aggregates connect() statistics and helps build
+ * IpConnectivityLogClass.ConnectStatistics instances.
+ *
+ * {@hide}
+ */
+public class ConnectStats {
+    private final static int EALREADY     = OsConstants.EALREADY;
+    private final static int EINPROGRESS  = OsConstants.EINPROGRESS;
+
+    /** How many events resulted in a given errno. */
+    private final SparseIntArray mErrnos = new SparseIntArray();
+    /** Latencies of blocking connects. TODO: add non-blocking connects latencies. */
+    private final IntArray mLatencies = new IntArray();
+    /** TokenBucket for rate limiting latency recording. */
+    private final TokenBucket mLatencyTb;
+    /** Maximum number of latency values recorded. */
+    private final int mMaxLatencyRecords;
+    /** Total count of successful connects. */
+    private int mConnectCount = 0;
+    /** Total count of successful connects with IPv6 socket address. */
+    private int mIpv6ConnectCount = 0;
+
+    public ConnectStats(TokenBucket tb, int maxLatencyRecords) {
+        mLatencyTb = tb;
+        mMaxLatencyRecords = maxLatencyRecords;
+    }
+
+    public ConnectStatistics toProto() {
+        ConnectStatistics stats = new ConnectStatistics();
+        stats.connectCount = mConnectCount;
+        stats.ipv6AddrCount = mIpv6ConnectCount;
+        stats.latenciesMs = mLatencies.toArray();
+        stats.errnosCounters = toPairArrays(mErrnos);
+        return stats;
+    }
+
+    public void addEvent(int errno, int latencyMs, String ipAddr) {
+        if (isSuccess(errno)) {
+            countConnect(ipAddr);
+            countLatency(errno, latencyMs);
+        } else {
+            countError(errno);
+        }
+    }
+
+    private void countConnect(String ipAddr) {
+        mConnectCount++;
+        if (isIPv6(ipAddr)) mIpv6ConnectCount++;
+    }
+
+    private void countLatency(int errno, int ms) {
+        if (isNonBlocking(errno)) {
+            // Ignore connect() on non-blocking sockets
+            return;
+        }
+        if (!mLatencyTb.get()) {
+            // Rate limited
+            return;
+        }
+        if (mLatencies.size() >= mMaxLatencyRecords) {
+            // Hard limit the total number of latency measurements.
+            return;
+        }
+        mLatencies.add(ms);
+    }
+
+    private void countError(int errno) {
+        final int newcount = mErrnos.get(errno, 0) + 1;
+        mErrnos.put(errno, newcount);
+    }
+
+    private static boolean isSuccess(int errno) {
+        return (errno == 0) || isNonBlocking(errno);
+    }
+
+    private static boolean isNonBlocking(int errno) {
+        // On non-blocking TCP sockets, connect() immediately returns EINPROGRESS.
+        // On non-blocking TCP sockets that are connecting, connect() immediately returns EALREADY.
+        return (errno == EINPROGRESS) || (errno == EALREADY);
+    }
+
+    private static boolean isIPv6(String ipAddr) {
+        return ipAddr.contains(":");
+    }
+
+    private static Pair[] toPairArrays(SparseIntArray counts) {
+        final int s = counts.size();
+        Pair[] pairs = new Pair[s];
+        for (int i = 0; i < s; i++) {
+            Pair p = new Pair();
+            p.key = counts.keyAt(i);
+            p.value = counts.valueAt(i);
+            pairs[i] = p;
+        }
+        return pairs;
+    }
+}
index f1d01e0..2a2d1ab 100644 (file)
@@ -43,10 +43,10 @@ final public class IpConnectivityEventBuilder {
     private IpConnectivityEventBuilder() {
     }
 
-    public static byte[] serialize(int dropped, List<ConnectivityMetricsEvent> events)
+    public static byte[] serialize(int dropped, List<IpConnectivityEvent> events)
             throws IOException {
         final IpConnectivityLog log = new IpConnectivityLog();
-        log.events = toProto(events);
+        log.events = events.toArray(new IpConnectivityEvent[events.size()]);
         log.droppedEvents = dropped;
         if ((log.events.length > 0) || (dropped > 0)) {
             // Only write version number if log has some information at all.
@@ -55,7 +55,7 @@ final public class IpConnectivityEventBuilder {
         return IpConnectivityLog.toByteArray(log);
     }
 
-    public static IpConnectivityEvent[] toProto(List<ConnectivityMetricsEvent> eventsIn) {
+    public static List<IpConnectivityEvent> toProto(List<ConnectivityMetricsEvent> eventsIn) {
         final ArrayList<IpConnectivityEvent> eventsOut = new ArrayList<>(eventsIn.size());
         for (ConnectivityMetricsEvent in : eventsIn) {
             final IpConnectivityEvent out = toProto(in);
@@ -64,7 +64,7 @@ final public class IpConnectivityEventBuilder {
             }
             eventsOut.add(out);
         }
-        return eventsOut.toArray(new IpConnectivityEvent[eventsOut.size()]);
+        return eventsOut;
     }
 
     public static IpConnectivityEvent toProto(ConnectivityMetricsEvent ev) {
index f485f56..76c3528 100644 (file)
@@ -36,14 +36,14 @@ import com.android.internal.annotations.GuardedBy;
 import com.android.internal.annotations.VisibleForTesting;
 import com.android.internal.util.TokenBucket;
 import com.android.server.SystemService;
+import com.android.server.connectivity.metrics.IpConnectivityLogClass.IpConnectivityEvent;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.function.ToIntFunction;
 
-import static com.android.server.connectivity.metrics.IpConnectivityLogClass.IpConnectivityEvent;
-
 /** {@hide} */
 final public class IpConnectivityMetrics extends SystemService {
     private static final String TAG = IpConnectivityMetrics.class.getSimpleName();
@@ -63,6 +63,8 @@ final public class IpConnectivityMetrics extends SystemService {
     // Maximum size of the event buffer.
     private static final int MAXIMUM_BUFFER_SIZE = DEFAULT_BUFFER_SIZE * 10;
 
+    private static final int MAXIMUM_CONNECT_LATENCY_RECORDS = 20000;
+
     private static final int ERROR_RATE_LIMITED = -1;
 
     // Lock ensuring that concurrent manipulations of the event buffer are correct.
@@ -160,9 +162,15 @@ final public class IpConnectivityMetrics extends SystemService {
             initBuffer();
         }
 
+        final List<IpConnectivityEvent> protoEvents = IpConnectivityEventBuilder.toProto(events);
+
+        if (mNetdListener != null) {
+            mNetdListener.flushStatistics(protoEvents);
+        }
+
         final byte[] data;
         try {
-            data = IpConnectivityEventBuilder.serialize(dropped, events);
+            data = IpConnectivityEventBuilder.serialize(dropped, protoEvents);
         } catch (IOException e) {
             Log.e(TAG, "could not serialize events", e);
             return "";
index 348f64c..3f056a5 100644 (file)
@@ -19,25 +19,27 @@ package com.android.server.connectivity;
 import android.content.Context;
 import android.net.ConnectivityManager;
 import android.net.ConnectivityManager.NetworkCallback;
-import android.net.Network;
 import android.net.INetdEventCallback;
+import android.net.Network;
 import android.net.NetworkRequest;
 import android.net.metrics.DnsEvent;
 import android.net.metrics.INetdEventListener;
 import android.net.metrics.IpConnectivityLog;
 import android.os.RemoteException;
+import android.text.format.DateUtils;
 import android.util.Log;
-
 import com.android.internal.annotations.GuardedBy;
 import com.android.internal.annotations.VisibleForTesting;
 import com.android.internal.util.IndentingPrintWriter;
-
+import com.android.internal.util.TokenBucket;
+import com.android.server.connectivity.metrics.IpConnectivityLogClass.ConnectStatistics;
+import com.android.server.connectivity.metrics.IpConnectivityLogClass.IpConnectivityEvent;
 import java.io.PrintWriter;
 import java.util.Arrays;
+import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-
 /**
  * Implementation of the INetdEventListener interface.
  */
@@ -52,6 +54,12 @@ public class NetdEventListenerService extends INetdEventListener.Stub {
     // TODO: read this constant from system property
     private static final int MAX_LOOKUPS_PER_DNS_EVENT = 100;
 
+    // Rate limit connect latency logging to 1 measurement per 15 seconds (5760 / day) with maximum
+    // bursts of 5000 measurements.
+    private static final int CONNECT_LATENCY_BURST_LIMIT  = 5000;
+    private static final int CONNECT_LATENCY_FILL_RATE    = 15 * (int) DateUtils.SECOND_IN_MILLIS;
+    private static final int CONNECT_LATENCY_MAXIMUM_RECORDS = 20000;
+
     // Stores the results of a number of consecutive DNS lookups on the same network.
     // This class is not thread-safe and it is the responsibility of the service to call its methods
     // on one thread at a time.
@@ -121,6 +129,12 @@ public class NetdEventListenerService extends INetdEventListener.Stub {
         }
     };
 
+    @GuardedBy("this")
+    private final TokenBucket mConnectTb =
+            new TokenBucket(CONNECT_LATENCY_FILL_RATE, CONNECT_LATENCY_BURST_LIMIT);
+    @GuardedBy("this")
+    private ConnectStats mConnectStats = makeConnectStats();
+
     // Callback should only be registered/unregistered when logging is being enabled/disabled in DPM
     // by the device owner. It's DevicePolicyManager's responsibility to ensure that.
     @GuardedBy("this")
@@ -175,13 +189,28 @@ public class NetdEventListenerService extends INetdEventListener.Stub {
     // This method must not block or perform long-running operations.
     public synchronized void onConnectEvent(int netId, int error, int latencyMs, String ipAddr, int port,
             int uid) throws RemoteException {
-        maybeVerboseLog("onConnectEvent(%d, %d, %dms)", netId, error, latencyMs);
+        maybeVerboseLog("onConnectEvent(%d, %d)", netId, latencyMs);
+
+        mConnectStats.addEvent(error, latencyMs, ipAddr);
 
         if (mNetdEventCallback != null) {
             mNetdEventCallback.onConnectEvent(ipAddr, port, System.currentTimeMillis(), uid);
         }
     }
 
+    public synchronized void flushStatistics(List<IpConnectivityEvent> events) {
+        events.add(flushConnectStats());
+        // TODO: migrate DnsEventBatch to IpConnectivityLogClass.DNSLatencies
+    }
+
+    private IpConnectivityEvent flushConnectStats() {
+        IpConnectivityEvent ev = new IpConnectivityEvent();
+        ev.connectStatistics = mConnectStats.toProto();
+        // TODO: add transport information
+        mConnectStats = makeConnectStats();
+        return ev;
+    }
+
     public synchronized void dump(PrintWriter writer) {
         IndentingPrintWriter pw = new IndentingPrintWriter(writer, "  ");
         pw.println(TAG + ":");
@@ -189,9 +218,14 @@ public class NetdEventListenerService extends INetdEventListener.Stub {
         for (DnsEventBatch batch : mEventBatches.values()) {
             pw.println(batch.toString());
         }
+        // TODO: also dump ConnectStats
         pw.decreaseIndent();
     }
 
+    private ConnectStats makeConnectStats() {
+        return new ConnectStats(mConnectTb, CONNECT_LATENCY_MAXIMUM_RECORDS);
+    }
+
     private static void maybeLog(String s, Object... args) {
         if (DBG) Log.d(TAG, String.format(s, args));
     }
index 5e4fdb6..011e505 100644 (file)
@@ -369,7 +369,8 @@ public class IpConnectivityEventBuilderTest extends TestCase {
 
     static void verifySerialization(String want, ConnectivityMetricsEvent... input) {
         try {
-            byte[] got = IpConnectivityEventBuilder.serialize(0, Arrays.asList(input));
+            byte[] got = IpConnectivityEventBuilder.serialize(0,
+                    IpConnectivityEventBuilder.toProto(Arrays.asList(input)));
             IpConnectivityLog log = IpConnectivityLog.parseFrom(got);
             assertEquals(want, log.toString());
         } catch (Exception e) {
index e056176..450653c 100644 (file)
@@ -292,10 +292,5 @@ public class IpConnectivityMetricsTest extends TestCase {
     }
 
     static final Comparator<ConnectivityMetricsEvent> EVENT_COMPARATOR =
-        new Comparator<ConnectivityMetricsEvent>() {
-            @Override
-            public int compare(ConnectivityMetricsEvent ev1, ConnectivityMetricsEvent ev2) {
-                return (int) (ev1.timestamp - ev2.timestamp);
-            }
-        };
+        Comparator.comparingLong((ev) -> ev.timestamp);
 }
index af4a374..97afa60 100644 (file)
 
 package com.android.server.connectivity;
 
-import android.net.ConnectivityManager.NetworkCallback;
 import android.net.ConnectivityManager;
+import android.net.ConnectivityManager.NetworkCallback;
 import android.net.Network;
 import android.net.metrics.DnsEvent;
 import android.net.metrics.INetdEventListener;
 import android.net.metrics.IpConnectivityLog;
 import android.os.RemoteException;
-
+import android.system.OsConstants;
+import android.test.suitebuilder.annotation.SmallTest;
+import com.android.server.connectivity.metrics.IpConnectivityLogClass.IpConnectivityEvent;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.stream.IntStream;
 import junit.framework.TestCase;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
-
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.eq;
@@ -41,13 +51,6 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import java.io.FileOutputStream;
-import java.io.PrintWriter;
-import java.util.Arrays;
-import java.util.List;
-import java.util.OptionalInt;
-import java.util.stream.IntStream;
-
 public class NetdEventListenerServiceTest extends TestCase {
 
     // TODO: read from NetdEventListenerService after this constant is read from system property
@@ -67,50 +70,56 @@ public class NetdEventListenerServiceTest extends TestCase {
         }
     }
 
+    private static final String EXAMPLE_IPV4 = "192.0.2.1";
+    private static final String EXAMPLE_IPV6 = "2001:db8:1200::2:1";
+
     NetdEventListenerService mNetdEventListenerService;
 
     @Mock ConnectivityManager mCm;
     @Mock IpConnectivityLog mLog;
     ArgumentCaptor<NetworkCallback> mCallbackCaptor;
-    ArgumentCaptor<DnsEvent> mEvCaptor;
+    ArgumentCaptor<DnsEvent> mDnsEvCaptor;
 
     public void setUp() {
         MockitoAnnotations.initMocks(this);
         mCallbackCaptor = ArgumentCaptor.forClass(NetworkCallback.class);
-        mEvCaptor = ArgumentCaptor.forClass(DnsEvent.class);
+        mDnsEvCaptor = ArgumentCaptor.forClass(DnsEvent.class);
         mNetdEventListenerService = new NetdEventListenerService(mCm, mLog);
 
         verify(mCm, times(1)).registerNetworkCallback(any(), mCallbackCaptor.capture());
     }
 
-    public void testOneBatch() throws Exception {
+    @SmallTest
+    public void testOneDnsBatch() throws Exception {
         log(105, LATENCIES);
         log(106, Arrays.copyOf(LATENCIES, BATCH_SIZE - 1)); // one lookup short of a batch event
 
-        verifyLoggedEvents(new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES));
+        verifyLoggedDnsEvents(new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES));
 
         log(106, Arrays.copyOfRange(LATENCIES, BATCH_SIZE - 1, BATCH_SIZE));
 
-        mEvCaptor = ArgumentCaptor.forClass(DnsEvent.class); // reset argument captor
-        verifyLoggedEvents(
+        mDnsEvCaptor = ArgumentCaptor.forClass(DnsEvent.class); // reset argument captor
+        verifyLoggedDnsEvents(
             new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES),
             new DnsEvent(106, EVENT_TYPES, RETURN_CODES, LATENCIES));
     }
 
-    public void testSeveralBatches() throws Exception {
+    @SmallTest
+    public void testSeveralDmsBatches() throws Exception {
         log(105, LATENCIES);
         log(106, LATENCIES);
         log(105, LATENCIES);
         log(107, LATENCIES);
 
-        verifyLoggedEvents(
+        verifyLoggedDnsEvents(
             new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES),
             new DnsEvent(106, EVENT_TYPES, RETURN_CODES, LATENCIES),
             new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES),
             new DnsEvent(107, EVENT_TYPES, RETURN_CODES, LATENCIES));
     }
 
-    public void testBatchAndNetworkLost() throws Exception {
+    @SmallTest
+    public void testDnsBatchAndNetworkLost() throws Exception {
         byte[] eventTypes = Arrays.copyOf(EVENT_TYPES, 20);
         byte[] returnCodes = Arrays.copyOf(RETURN_CODES, 20);
         int[] latencies = Arrays.copyOf(LATENCIES, 20);
@@ -120,13 +129,14 @@ public class NetdEventListenerServiceTest extends TestCase {
         mCallbackCaptor.getValue().onLost(new Network(105));
         log(105, LATENCIES);
 
-        verifyLoggedEvents(
+        verifyLoggedDnsEvents(
             new DnsEvent(105, eventTypes, returnCodes, latencies),
             new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES),
             new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES));
     }
 
-    public void testConcurrentBatchesAndDumps() throws Exception {
+    @SmallTest
+    public void testConcurrentDnsBatchesAndDumps() throws Exception {
         final long stop = System.currentTimeMillis() + 100;
         final PrintWriter pw = new PrintWriter(new FileOutputStream("/dev/null"));
         new Thread() {
@@ -137,26 +147,120 @@ public class NetdEventListenerServiceTest extends TestCase {
             }
         }.start();
 
-        logAsync(105, LATENCIES);
-        logAsync(106, LATENCIES);
-        logAsync(107, LATENCIES);
+        logDnsAsync(105, LATENCIES);
+        logDnsAsync(106, LATENCIES);
+        logDnsAsync(107, LATENCIES);
 
-        verifyLoggedEvents(500,
+        verifyLoggedDnsEvents(500,
             new DnsEvent(105, EVENT_TYPES, RETURN_CODES, LATENCIES),
             new DnsEvent(106, EVENT_TYPES, RETURN_CODES, LATENCIES),
             new DnsEvent(107, EVENT_TYPES, RETURN_CODES, LATENCIES));
     }
 
-    public void testConcurrentBatchesAndNetworkLoss() throws Exception {
-        logAsync(105, LATENCIES);
+    @SmallTest
+    public void testConcurrentDnsBatchesAndNetworkLoss() throws Exception {
+        logDnsAsync(105, LATENCIES);
         Thread.sleep(10L);
-        // call onLost() asynchronously to logAsync's onDnsEvent() calls.
+        // call onLost() asynchronously to logDnsAsync's onDnsEvent() calls.
         mCallbackCaptor.getValue().onLost(new Network(105));
 
         // do not verify unpredictable batch
         verify(mLog, timeout(500).times(1)).log(any());
     }
 
+    @SmallTest
+    public void testConnectLogging() throws Exception {
+        final int OK = 0;
+        Thread[] logActions = {
+            // ignored
+            connectEventAction(OsConstants.EALREADY, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.EALREADY, 0, EXAMPLE_IPV6),
+            connectEventAction(OsConstants.EINPROGRESS, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.EINPROGRESS, 0, EXAMPLE_IPV6),
+            connectEventAction(OsConstants.EINPROGRESS, 0, EXAMPLE_IPV6),
+            // valid latencies
+            connectEventAction(OK, 110, EXAMPLE_IPV4),
+            connectEventAction(OK, 23, EXAMPLE_IPV4),
+            connectEventAction(OK, 45, EXAMPLE_IPV4),
+            connectEventAction(OK, 56, EXAMPLE_IPV4),
+            connectEventAction(OK, 523, EXAMPLE_IPV6),
+            connectEventAction(OK, 214, EXAMPLE_IPV6),
+            connectEventAction(OK, 67, EXAMPLE_IPV6),
+            // errors
+            connectEventAction(OsConstants.EPERM, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.EPERM, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.EAGAIN, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.EACCES, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.EACCES, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.EACCES, 0, EXAMPLE_IPV6),
+            connectEventAction(OsConstants.EADDRINUSE, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.ETIMEDOUT, 0, EXAMPLE_IPV4),
+            connectEventAction(OsConstants.ETIMEDOUT, 0, EXAMPLE_IPV6),
+            connectEventAction(OsConstants.ETIMEDOUT, 0, EXAMPLE_IPV6),
+            connectEventAction(OsConstants.ECONNREFUSED, 0, EXAMPLE_IPV4),
+        };
+
+        for (Thread t : logActions) {
+            t.start();
+        }
+        for (Thread t : logActions) {
+            t.join();
+        }
+
+        List<IpConnectivityEvent> events = new ArrayList<>();
+        mNetdEventListenerService.flushStatistics(events);
+
+        IpConnectivityEvent got = events.get(0);
+        String want = joinLines(
+                "time_ms: 0",
+                "transport: 0",
+                "connect_statistics <",
+                "  connect_count: 12",
+                "  errnos_counters <",
+                "    key: 1",
+                "    value: 2",
+                "  >",
+                "  errnos_counters <",
+                "    key: 11",
+                "    value: 1",
+                "  >",
+                "  errnos_counters <",
+                "    key: 13",
+                "    value: 3",
+                "  >",
+                "  errnos_counters <",
+                "    key: 98",
+                "    value: 1",
+                "  >",
+                "  errnos_counters <",
+                "    key: 110",
+                "    value: 3",
+                "  >",
+                "  errnos_counters <",
+                "    key: 111",
+                "    value: 1",
+                "  >",
+                "  ipv6_addr_count: 6",
+                "  latencies_ms: 23",
+                "  latencies_ms: 45",
+                "  latencies_ms: 56",
+                "  latencies_ms: 67",
+                "  latencies_ms: 110",
+                "  latencies_ms: 214",
+                "  latencies_ms: 523");
+        verifyConnectEvent(want, got);
+    }
+
+    Thread connectEventAction(int error, int latencyMs, String ipAddr) {
+        return new Thread(() -> {
+            try {
+                mNetdEventListenerService.onConnectEvent(100, error, latencyMs, ipAddr, 80, 1);
+            } catch (Exception e) {
+                fail(e.toString());
+            }
+        });
+    }
+
     void log(int netId, int[] latencies) {
         try {
             for (int l : latencies) {
@@ -168,7 +272,7 @@ public class NetdEventListenerServiceTest extends TestCase {
         }
     }
 
-    void logAsync(int netId, int[] latencies) {
+    void logDnsAsync(int netId, int[] latencies) {
         new Thread() {
             public void run() {
                 log(netId, latencies);
@@ -176,15 +280,15 @@ public class NetdEventListenerServiceTest extends TestCase {
         }.start();
     }
 
-    void verifyLoggedEvents(DnsEvent... expected) {
-        verifyLoggedEvents(0, expected);
+    void verifyLoggedDnsEvents(DnsEvent... expected) {
+        verifyLoggedDnsEvents(0, expected);
     }
 
-    void verifyLoggedEvents(int wait, DnsEvent... expectedEvents) {
-        verify(mLog, timeout(wait).times(expectedEvents.length)).log(mEvCaptor.capture());
-        for (DnsEvent got : mEvCaptor.getAllValues()) {
+    void verifyLoggedDnsEvents(int wait, DnsEvent... expectedEvents) {
+        verify(mLog, timeout(wait).times(expectedEvents.length)).log(mDnsEvCaptor.capture());
+        for (DnsEvent got : mDnsEvCaptor.getAllValues()) {
             OptionalInt index = IntStream.range(0, expectedEvents.length)
-                    .filter(i -> eventsEqual(expectedEvents[i], got))
+                    .filter(i -> dnsEventsEqual(expectedEvents[i], got))
                     .findFirst();
             // Don't match same expected event more than once.
             index.ifPresent(i -> expectedEvents[i] = null);
@@ -193,11 +297,30 @@ public class NetdEventListenerServiceTest extends TestCase {
     }
 
     /** equality function for DnsEvent to avoid overriding equals() and hashCode(). */
-    static boolean eventsEqual(DnsEvent expected, DnsEvent got) {
+    static boolean dnsEventsEqual(DnsEvent expected, DnsEvent got) {
         return (expected == got) || ((expected != null) && (got != null)
                 && (expected.netId == got.netId)
                 && Arrays.equals(expected.eventTypes, got.eventTypes)
                 && Arrays.equals(expected.returnCodes, got.returnCodes)
                 && Arrays.equals(expected.latenciesMs, got.latenciesMs));
     }
+
+    static String joinLines(String ... elems) {
+        StringBuilder b = new StringBuilder();
+        for (String s : elems) {
+            b.append(s).append("\n");
+        }
+        return b.toString();
+    }
+
+    static void verifyConnectEvent(String expected, IpConnectivityEvent got) {
+        try {
+            Arrays.sort(got.connectStatistics.latenciesMs);
+            Arrays.sort(got.connectStatistics.errnosCounters,
+                    Comparator.comparingInt((p) -> p.key));
+            assertEquals(expected, got.toString());
+        } catch (Exception e) {
+            fail(e.toString());
+        }
+    }
 }