From f840e07f821cb17c9ae1e6583a28eff548b90892 Mon Sep 17 00:00:00 2001 From: Erik Kline Date: Mon, 10 Jul 2017 20:17:30 +0900 Subject: [PATCH] Change BlockingSocketReader to use MessageQueue fd handling Convert the simplistic blocking read in a separate thread model to the MessageQueue OnFileDescriptorEventListener model, albeit still on a separate thread. Test: as follows - built - flashed - booted - "runtest frameworks-net" passes - basic IpManager functions still work, including ConnectivityPacketTracker Bug: 62476366 Bug: 67013397 Change-Id: I1f4a6707eba402338947fe3f5392a26660f05714 --- .../android/net/ip/ConnectivityPacketTracker.java | 38 ++-- services/net/java/android/net/ip/IpManager.java | 3 +- .../android/net/util/BlockingSocketReader.java | 216 +++++++++++++++------ .../android/net/util/BlockingSocketReaderTest.java | 133 +++++++++---- 4 files changed, 272 insertions(+), 118 deletions(-) diff --git a/services/net/java/android/net/ip/ConnectivityPacketTracker.java b/services/net/java/android/net/ip/ConnectivityPacketTracker.java index 884a8a754266..0230f36b6fa0 100644 --- a/services/net/java/android/net/ip/ConnectivityPacketTracker.java +++ b/services/net/java/android/net/ip/ConnectivityPacketTracker.java @@ -61,11 +61,11 @@ public class ConnectivityPacketTracker { private static final String MARK_STOP = "--- STOP ---"; private final String mTag; - private final Handler mHandler; private final LocalLog mLog; private final BlockingSocketReader mPacketListener; + private boolean mRunning; - public ConnectivityPacketTracker(NetworkInterface netif, LocalLog log) { + public ConnectivityPacketTracker(Handler h, NetworkInterface netif, LocalLog log) { final String ifname; final int ifindex; final byte[] hwaddr; @@ -81,44 +81,40 @@ public class ConnectivityPacketTracker { } mTag = TAG + "." + ifname; - mHandler = new Handler(); mLog = log; - mPacketListener = new PacketListener(ifindex, hwaddr, mtu); + mPacketListener = new PacketListener(h, ifindex, hwaddr, mtu); } public void start() { - mLog.log(MARK_START); + mRunning = true; mPacketListener.start(); } public void stop() { mPacketListener.stop(); - mLog.log(MARK_STOP); + mRunning = false; } private final class PacketListener extends BlockingSocketReader { private final int mIfIndex; private final byte mHwAddr[]; - PacketListener(int ifindex, byte[] hwaddr, int mtu) { - super(mtu); + PacketListener(Handler h, int ifindex, byte[] hwaddr, int mtu) { + super(h, mtu); mIfIndex = ifindex; mHwAddr = hwaddr; } @Override - protected FileDescriptor createSocket() { + protected FileDescriptor createFd() { FileDescriptor s = null; try { - // TODO: Evaluate switching to SOCK_DGRAM and changing the - // BlockingSocketReader's read() to recvfrom(), so that this - // might work on non-ethernet-like links (via SLL). s = Os.socket(AF_PACKET, SOCK_RAW, 0); NetworkUtils.attachControlPacketFilter(s, ARPHRD_ETHER); Os.bind(s, new PacketSocketAddress((short) ETH_P_ALL, mIfIndex)); } catch (ErrnoException | IOException e) { logError("Failed to create packet tracking socket: ", e); - closeSocket(s); + closeFd(s); return null; } return s; @@ -136,13 +132,27 @@ public class ConnectivityPacketTracker { } @Override + protected void onStart() { + mLog.log(MARK_START); + } + + @Override + protected void onStop() { + if (mRunning) { + mLog.log(MARK_STOP); + } else { + mLog.log(MARK_STOP + " (packet listener stopped unexpectedly)"); + } + } + + @Override protected void logError(String msg, Exception e) { Log.e(mTag, msg, e); addLogEntry(msg + e); } private void addLogEntry(String entry) { - mHandler.post(() -> mLog.log(entry)); + mLog.log(entry); } } } diff --git a/services/net/java/android/net/ip/IpManager.java b/services/net/java/android/net/ip/IpManager.java index b1eb0854e334..bc07b8108631 100644 --- a/services/net/java/android/net/ip/IpManager.java +++ b/services/net/java/android/net/ip/IpManager.java @@ -1515,7 +1515,8 @@ public class IpManager extends StateMachine { private ConnectivityPacketTracker createPacketTracker() { try { - return new ConnectivityPacketTracker(mNetworkInterface, mConnectivityPacketLog); + return new ConnectivityPacketTracker( + getHandler(), mNetworkInterface, mConnectivityPacketLog); } catch (IllegalArgumentException e) { return null; } diff --git a/services/net/java/android/net/util/BlockingSocketReader.java b/services/net/java/android/net/util/BlockingSocketReader.java index 12fa1e57653a..99bf46952b4e 100644 --- a/services/net/java/android/net/util/BlockingSocketReader.java +++ b/services/net/java/android/net/util/BlockingSocketReader.java @@ -16,81 +16,106 @@ package android.net.util; +import static android.os.MessageQueue.OnFileDescriptorEventListener.EVENT_INPUT; +import static android.os.MessageQueue.OnFileDescriptorEventListener.EVENT_ERROR; + import android.annotation.Nullable; +import android.os.Handler; +import android.os.Looper; +import android.os.MessageQueue; +import android.os.MessageQueue.OnFileDescriptorEventListener; import android.system.ErrnoException; import android.system.Os; import android.system.OsConstants; -import libcore.io.IoBridge; +import libcore.io.IoUtils; import java.io.FileDescriptor; -import java.io.InterruptedIOException; import java.io.IOException; /** - * A thread that reads from a socket and passes the received packets to a - * subclass's handlePacket() method. The packet receive buffer is recycled - * on every read call, so subclasses should make any copies they would like - * inside their handlePacket() implementation. + * This class encapsulates the mechanics of registering a file descriptor + * with a thread's Looper and handling read events (and errors). + * + * Subclasses MUST implement createFd() and SHOULD override handlePacket(). + + * Subclasses can expect a call life-cycle like the following: + * + * [1] start() calls createFd() and (if all goes well) onStart() + * + * [2] yield, waiting for read event or error notification: + * + * [a] readPacket() && handlePacket() * - * All public methods may be called from any thread. + * [b] if (no error): + * goto 2 + * else: + * goto 3 + * + * [3] stop() calls onStop() if not previously stopped + * + * The packet receive buffer is recycled on every read call, so subclasses + * should make any copies they would like inside their handlePacket() + * implementation. + * + * All public methods MUST only be called from the same thread with which + * the Handler constructor argument is associated. + * + * TODO: rename this class to something more correctly descriptive (something + * like [or less horrible than] FdReadEventsHandler?). * * @hide */ public abstract class BlockingSocketReader { + private static final int FD_EVENTS = EVENT_INPUT | EVENT_ERROR; + private static final int UNREGISTER_THIS_FD = 0; + public static final int DEFAULT_RECV_BUF_SIZE = 2 * 1024; + private final Handler mHandler; + private final MessageQueue mQueue; private final byte[] mPacket; - private final Thread mThread; - private volatile FileDescriptor mSocket; - private volatile boolean mRunning; - private volatile long mPacketsReceived; - - // Make it slightly easier for subclasses to properly close a socket - // without having to know this incantation. - public static final void closeSocket(@Nullable FileDescriptor fd) { - try { - IoBridge.closeAndSignalBlockedThreads(fd); - } catch (IOException ignored) {} - } + private FileDescriptor mFd; + private long mPacketsReceived; - protected BlockingSocketReader() { - this(DEFAULT_RECV_BUF_SIZE); + protected static void closeFd(FileDescriptor fd) { + IoUtils.closeQuietly(fd); } - protected BlockingSocketReader(int recvbufsize) { - if (recvbufsize < DEFAULT_RECV_BUF_SIZE) { - recvbufsize = DEFAULT_RECV_BUF_SIZE; - } - mPacket = new byte[recvbufsize]; - mThread = new Thread(() -> { mainLoop(); }); + protected BlockingSocketReader(Handler h) { + this(h, DEFAULT_RECV_BUF_SIZE); } - public final boolean start() { - if (mSocket != null) return false; + protected BlockingSocketReader(Handler h, int recvbufsize) { + mHandler = h; + mQueue = mHandler.getLooper().getQueue(); + mPacket = new byte[Math.max(recvbufsize, DEFAULT_RECV_BUF_SIZE)]; + } - try { - mSocket = createSocket(); - } catch (Exception e) { - logError("Failed to create socket: ", e); - return false; + public final void start() { + if (onCorrectThread()) { + createAndRegisterFd(); + } else { + mHandler.post(() -> { + logError("start() called from off-thread", null); + createAndRegisterFd(); + }); } - - if (mSocket == null) return false; - - mRunning = true; - mThread.start(); - return true; } public final void stop() { - mRunning = false; - closeSocket(mSocket); - mSocket = null; + if (onCorrectThread()) { + unregisterAndDestroyFd(); + } else { + mHandler.post(() -> { + logError("stop() called from off-thread", null); + unregisterAndDestroyFd(); + }); + } } - public final boolean isRunning() { return mRunning; } + public final int recvBufSize() { return mPacket.length; } public final long numPacketsReceived() { return mPacketsReceived; } @@ -98,11 +123,21 @@ public abstract class BlockingSocketReader { * Subclasses MUST create the listening socket here, including setting * all desired socket options, interface or address/port binding, etc. */ - protected abstract FileDescriptor createSocket(); + protected abstract FileDescriptor createFd(); + + /** + * Subclasses MAY override this to change the default read() implementation + * in favour of, say, recvfrom(). + * + * Implementations MUST return the bytes read or throw an Exception. + */ + protected int readPacket(FileDescriptor fd, byte[] packetBuffer) throws Exception { + return Os.read(fd, packetBuffer, 0, packetBuffer.length); + } /** * Called by the main loop for every packet. Any desired copies of - * |recvbuf| should be made in here, and the underlying byte array is + * |recvbuf| should be made in here, as the underlying byte array is * reused across all reads. */ protected void handlePacket(byte[] recvbuf, int length) {} @@ -113,43 +148,102 @@ public abstract class BlockingSocketReader { protected void logError(String msg, Exception e) {} /** - * Called by the main loop just prior to exiting. + * Called by start(), if successful, just prior to returning. + */ + protected void onStart() {} + + /** + * Called by stop() just prior to returning. */ - protected void onExit() {} + protected void onStop() {} + + private void createAndRegisterFd() { + if (mFd != null) return; + + try { + mFd = createFd(); + if (mFd != null) { + // Force the socket to be non-blocking. + IoUtils.setBlocking(mFd, false); + } + } catch (Exception e) { + logError("Failed to create socket: ", e); + closeFd(mFd); + mFd = null; + return; + } + + if (mFd == null) return; + + mQueue.addOnFileDescriptorEventListener( + mFd, + FD_EVENTS, + new OnFileDescriptorEventListener() { + @Override + public int onFileDescriptorEvents(FileDescriptor fd, int events) { + // Always call handleInput() so read/recvfrom are given + // a proper chance to encounter a meaningful errno and + // perhaps log a useful error message. + if (!isRunning() || !handleInput()) { + unregisterAndDestroyFd(); + return UNREGISTER_THIS_FD; + } + return FD_EVENTS; + } + }); + onStart(); + } - private final void mainLoop() { + private boolean isRunning() { return (mFd != null) && mFd.valid(); } + + // Keep trying to read until we get EAGAIN/EWOULDBLOCK or some fatal error. + private boolean handleInput() { while (isRunning()) { final int bytesRead; try { - // Blocking read. - // TODO: See if this can be converted to recvfrom. - bytesRead = Os.read(mSocket, mPacket, 0, mPacket.length); + bytesRead = readPacket(mFd, mPacket); if (bytesRead < 1) { if (isRunning()) logError("Socket closed, exiting", null); break; } mPacketsReceived++; } catch (ErrnoException e) { - if (e.errno != OsConstants.EINTR) { - if (isRunning()) logError("read error: ", e); + if (e.errno == OsConstants.EAGAIN) { + // We've read everything there is to read this time around. + return true; + } else if (e.errno == OsConstants.EINTR) { + continue; + } else { + if (isRunning()) logError("readPacket error: ", e); break; } - continue; - } catch (IOException ioe) { - if (isRunning()) logError("read error: ", ioe); - continue; + } catch (Exception e) { + if (isRunning()) logError("readPacket error: ", e); + break; } try { handlePacket(mPacket, bytesRead); } catch (Exception e) { - logError("Unexpected exception: ", e); + logError("handlePacket error: ", e); break; } } - stop(); - onExit(); + return false; + } + + private void unregisterAndDestroyFd() { + if (mFd == null) return; + + mQueue.removeOnFileDescriptorEventListener(mFd); + closeFd(mFd); + mFd = null; + onStop(); + } + + private boolean onCorrectThread() { + return (mHandler.getLooper() == Looper.myLooper()); } } diff --git a/tests/net/java/android/net/util/BlockingSocketReaderTest.java b/tests/net/java/android/net/util/BlockingSocketReaderTest.java index e03350f29f95..1aad4533d3d1 100644 --- a/tests/net/java/android/net/util/BlockingSocketReaderTest.java +++ b/tests/net/java/android/net/util/BlockingSocketReaderTest.java @@ -16,8 +16,11 @@ package android.net.util; +import static android.net.util.BlockingSocketReader.DEFAULT_RECV_BUF_SIZE; import static android.system.OsConstants.*; +import android.os.Handler; +import android.os.HandlerThread; import android.system.ErrnoException; import android.system.Os; import android.system.StructTimeval; @@ -27,6 +30,7 @@ import libcore.io.IoBridge; import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.Inet6Address; @@ -53,61 +57,83 @@ public class BlockingSocketReaderTest extends TestCase { protected FileDescriptor mLocalSocket; protected InetSocketAddress mLocalSockName; protected byte[] mLastRecvBuf; - protected boolean mExited; + protected boolean mStopped; + protected HandlerThread mHandlerThread; protected BlockingSocketReader mReceiver; + class UdpLoopbackReader extends BlockingSocketReader { + public UdpLoopbackReader(Handler h) { + super(h); + } + + @Override + protected FileDescriptor createFd() { + FileDescriptor s = null; + try { + s = Os.socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); + Os.bind(s, LOOPBACK6, 0); + mLocalSockName = (InetSocketAddress) Os.getsockname(s); + Os.setsockoptTimeval(s, SOL_SOCKET, SO_SNDTIMEO, TIMEO); + } catch (ErrnoException|SocketException e) { + closeFd(s); + fail(); + return null; + } + + mLocalSocket = s; + return s; + } + + @Override + protected void handlePacket(byte[] recvbuf, int length) { + mLastRecvBuf = Arrays.copyOf(recvbuf, length); + mLatch.countDown(); + } + + @Override + protected void onStart() { + mStopped = false; + mLatch.countDown(); + } + + @Override + protected void onStop() { + mStopped = true; + mLatch.countDown(); + } + }; + @Override public void setUp() { resetLatch(); mLocalSocket = null; mLocalSockName = null; mLastRecvBuf = null; - mExited = false; - - mReceiver = new BlockingSocketReader() { - @Override - protected FileDescriptor createSocket() { - FileDescriptor s = null; - try { - s = Os.socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); - Os.bind(s, LOOPBACK6, 0); - mLocalSockName = (InetSocketAddress) Os.getsockname(s); - Os.setsockoptTimeval(s, SOL_SOCKET, SO_SNDTIMEO, TIMEO); - } catch (ErrnoException|SocketException e) { - closeSocket(s); - fail(); - return null; - } - - mLocalSocket = s; - return s; - } + mStopped = false; - @Override - protected void handlePacket(byte[] recvbuf, int length) { - mLastRecvBuf = Arrays.copyOf(recvbuf, length); - mLatch.countDown(); - } - - @Override - protected void onExit() { - mExited = true; - mLatch.countDown(); - } - }; + mHandlerThread = new HandlerThread(BlockingSocketReaderTest.class.getSimpleName()); + mHandlerThread.start(); } @Override - public void tearDown() { - if (mReceiver != null) mReceiver.stop(); + public void tearDown() throws Exception { + if (mReceiver != null) { + mHandlerThread.getThreadHandler().post(() -> { mReceiver.stop(); }); + waitForActivity(); + } mReceiver = null; + mHandlerThread.quit(); + mHandlerThread = null; } void resetLatch() { mLatch = new CountDownLatch(1); } void waitForActivity() throws Exception { - assertTrue(mLatch.await(500, TimeUnit.MILLISECONDS)); - resetLatch(); + try { + mLatch.await(1000, TimeUnit.MILLISECONDS); + } finally { + resetLatch(); + } } void sendPacket(byte[] contents) throws Exception { @@ -118,31 +144,54 @@ public class BlockingSocketReaderTest extends TestCase { } public void testBasicWorking() throws Exception { - assertTrue(mReceiver.start()); + final Handler h = mHandlerThread.getThreadHandler(); + mReceiver = new UdpLoopbackReader(h); + + h.post(() -> { mReceiver.start(); }); + waitForActivity(); assertTrue(mLocalSockName != null); assertEquals(LOOPBACK6, mLocalSockName.getAddress()); assertTrue(0 < mLocalSockName.getPort()); assertTrue(mLocalSocket != null); - assertFalse(mExited); + assertFalse(mStopped); final byte[] one = "one 1".getBytes("UTF-8"); sendPacket(one); waitForActivity(); assertEquals(1, mReceiver.numPacketsReceived()); assertTrue(Arrays.equals(one, mLastRecvBuf)); - assertFalse(mExited); + assertFalse(mStopped); final byte[] two = "two 2".getBytes("UTF-8"); sendPacket(two); waitForActivity(); assertEquals(2, mReceiver.numPacketsReceived()); assertTrue(Arrays.equals(two, mLastRecvBuf)); - assertFalse(mExited); + assertFalse(mStopped); mReceiver.stop(); waitForActivity(); assertEquals(2, mReceiver.numPacketsReceived()); assertTrue(Arrays.equals(two, mLastRecvBuf)); - assertTrue(mExited); + assertTrue(mStopped); + mReceiver = null; + } + + class NullBlockingSocketReader extends BlockingSocketReader { + public NullBlockingSocketReader(Handler h, int recvbufsize) { + super(h, recvbufsize); + } + + @Override + public FileDescriptor createFd() { return null; } + } + + public void testMinimalRecvBufSize() throws Exception { + final Handler h = mHandlerThread.getThreadHandler(); + + for (int i : new int[]{-1, 0, 1, DEFAULT_RECV_BUF_SIZE-1}) { + final BlockingSocketReader b = new NullBlockingSocketReader(h, i); + assertEquals(DEFAULT_RECV_BUF_SIZE, b.recvBufSize()); + } } } -- 2.11.0