OSDN Git Service

libpdx_uds: Fix RPC channel ID allocation to not recycle values as often
authorAlex Vakulenko <avakulenko@google.com>
Thu, 6 Apr 2017 21:51:54 +0000 (14:51 -0700)
committerAlex Vakulenko <avakulenko@google.com>
Thu, 6 Apr 2017 23:17:52 +0000 (16:17 -0700)
The value of channel ID for PDX service on UDS transport was actual file
descriptor value for the data socket. Since channels are open and closed
constantly, it is quite often that channel ID is being reused immediately,
so it is almost impossible to use `cid` as unique identifier for objects
being passed around.

Instead, we now use monotonically growing channel ID value.

To prevent the possibility of using channel ID as the socket FD and vice
versa, changed all helper functions that used to take socket_fd as an int
to explicitly use BorrowedHandle which disables implicit conversion from
int (and hence makes it impossible to mistakenly pass in the channel ID).

Bug: 37082296
Test: `m -j32` succeeds for sailfish-eng
      Ran libpdx_uds_tests on the device -> all pass
      Device boots and CubeSea and VrHome render correctly with vr_flinger

Change-Id: Ibb8dfee4d6c3f4b6120c0b6e20a253f1b9307c19

libs/vr/libpdx_uds/client_channel.cpp
libs/vr/libpdx_uds/client_channel_factory.cpp
libs/vr/libpdx_uds/ipc_helper.cpp
libs/vr/libpdx_uds/private/uds/ipc_helper.h
libs/vr/libpdx_uds/private/uds/service_endpoint.h
libs/vr/libpdx_uds/service_endpoint.cpp

index 4cbdb94..924335f 100644 (file)
@@ -67,7 +67,7 @@ struct TransactionState {
   ResponseHeader<LocalHandle> response;
 };
 
-Status<void> ReadAndDiscardData(int socket_fd, size_t size) {
+Status<void> ReadAndDiscardData(const BorrowedHandle& socket_fd, size_t size) {
   while (size > 0) {
     // If there is more data to read in the message than the buffers provided
     // by the caller, read and discard the extra data from the socket.
@@ -83,9 +83,10 @@ Status<void> ReadAndDiscardData(int socket_fd, size_t size) {
   return ErrorStatus(EIO);
 }
 
-Status<void> SendRequest(int socket_fd, TransactionState* transaction_state,
-                         int opcode, const iovec* send_vector,
-                         size_t send_count, size_t max_recv_len) {
+Status<void> SendRequest(const BorrowedHandle& socket_fd,
+                         TransactionState* transaction_state, int opcode,
+                         const iovec* send_vector, size_t send_count,
+                         size_t max_recv_len) {
   size_t send_len = CountVectorSize(send_vector, send_count);
   InitRequest(&transaction_state->request, opcode, send_len, max_recv_len,
               false);
@@ -95,7 +96,8 @@ Status<void> SendRequest(int socket_fd, TransactionState* transaction_state,
   return status;
 }
 
-Status<void> ReceiveResponse(int socket_fd, TransactionState* transaction_state,
+Status<void> ReceiveResponse(const BorrowedHandle& socket_fd,
+                             TransactionState* transaction_state,
                              const iovec* receive_vector, size_t receive_count,
                              size_t max_recv_len) {
   auto status = ReceiveData(socket_fd, &transaction_state->response);
@@ -164,7 +166,7 @@ Status<void> ClientChannel::SendImpulse(int opcode, const void* buffer,
 
   InitRequest(&request, opcode, length, 0, true);
   memcpy(request.impulse_payload.data(), buffer, length);
-  return SendData(channel_handle_.value(), request);
+  return SendData(BorrowedHandle{channel_handle_.value()}, request);
 }
 
 Status<int> ClientChannel::SendAndReceive(void* transaction_state, int opcode,
@@ -182,11 +184,11 @@ Status<int> ClientChannel::SendAndReceive(void* transaction_state, int opcode,
   auto* state = static_cast<TransactionState*>(transaction_state);
   size_t max_recv_len = CountVectorSize(receive_vector, receive_count);
 
-  auto status = SendRequest(channel_handle_.value(), state, opcode, send_vector,
-                            send_count, max_recv_len);
+  auto status = SendRequest(BorrowedHandle{channel_handle_.value()}, state,
+                            opcode, send_vector, send_count, max_recv_len);
   if (status) {
-    status = ReceiveResponse(channel_handle_.value(), state, receive_vector,
-                             receive_count, max_recv_len);
+    status = ReceiveResponse(BorrowedHandle{channel_handle_.value()}, state,
+                             receive_vector, receive_count, max_recv_len);
   }
   if (!result.PropagateError(status)) {
     const int return_code = state->response.ret_code;
index f059453..323236d 100644 (file)
@@ -111,11 +111,11 @@ Status<std::unique_ptr<pdx::ClientChannel>> ClientChannelFactory::Connect(
         remote.sun_path);
   RequestHeader<BorrowedHandle> request;
   InitRequest(&request, opcodes::CHANNEL_OPEN, 0, 0, false);
-  status = SendData(socket_fd.Get(), request);
+  status = SendData(socket_fd.Borrow(), request);
   if (!status)
     return ErrorStatus(status.error());
   ResponseHeader<LocalHandle> response;
-  status = ReceiveData(socket_fd.Get(), &response);
+  status = ReceiveData(socket_fd.Borrow(), &response);
   if (!status)
     return ErrorStatus(status.error());
   int ref = response.ret_code;
index fe5c986..d604f62 100644 (file)
@@ -26,18 +26,19 @@ struct MessagePreamble {
   uint32_t fd_count{0};
 };
 
-Status<void> SendPayload::Send(int socket_fd) {
+Status<void> SendPayload::Send(const BorrowedHandle& socket_fd) {
   return Send(socket_fd, nullptr);
 }
 
-Status<void> SendPayload::Send(int socket_fd, const ucred* cred) {
+Status<void> SendPayload::Send(const BorrowedHandle& socket_fd,
+                               const ucred* cred) {
   MessagePreamble preamble;
   preamble.magic = kMagicPreamble;
   preamble.data_size = buffer_.size();
   preamble.fd_count = file_handles_.size();
 
-  ssize_t ret =
-      RETRY_EINTR(send(socket_fd, &preamble, sizeof(preamble), MSG_NOSIGNAL));
+  ssize_t ret = RETRY_EINTR(
+      send(socket_fd.Get(), &preamble, sizeof(preamble), MSG_NOSIGNAL));
   if (ret < 0)
     return ErrorStatus(errno);
   if (ret != sizeof(preamble))
@@ -71,7 +72,7 @@ Status<void> SendPayload::Send(int socket_fd, const ucred* cred) {
     }
   }
 
-  ret = RETRY_EINTR(sendmsg(socket_fd, &msg, MSG_NOSIGNAL));
+  ret = RETRY_EINTR(sendmsg(socket_fd.Get(), &msg, MSG_NOSIGNAL));
   if (ret < 0)
     return ErrorStatus(errno);
   if (static_cast<size_t>(ret) != buffer_.size())
@@ -125,14 +126,15 @@ Status<ChannelReference> SendPayload::PushChannelHandle(
   return ErrorStatus{EOPNOTSUPP};
 }
 
-Status<void> ReceivePayload::Receive(int socket_fd) {
+Status<void> ReceivePayload::Receive(const BorrowedHandle& socket_fd) {
   return Receive(socket_fd, nullptr);
 }
 
-Status<void> ReceivePayload::Receive(int socket_fd, ucred* cred) {
+Status<void> ReceivePayload::Receive(const BorrowedHandle& socket_fd,
+                                     ucred* cred) {
   MessagePreamble preamble;
-  ssize_t ret =
-      RETRY_EINTR(recv(socket_fd, &preamble, sizeof(preamble), MSG_WAITALL));
+  ssize_t ret = RETRY_EINTR(
+      recv(socket_fd.Get(), &preamble, sizeof(preamble), MSG_WAITALL));
   if (ret < 0)
     return ErrorStatus(errno);
   else if (ret == 0)
@@ -157,7 +159,7 @@ Status<void> ReceivePayload::Receive(int socket_fd, ucred* cred) {
     msg.msg_control = alloca(msg.msg_controllen);
   }
 
-  ret = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
+  ret = RETRY_EINTR(recvmsg(socket_fd.Get(), &msg, MSG_WAITALL));
   if (ret < 0)
     return ErrorStatus(errno);
   else if (ret == 0)
@@ -219,8 +221,10 @@ bool ReceivePayload::GetChannelHandle(ChannelReference /*ref*/,
   return false;
 }
 
-Status<void> SendData(int socket_fd, const void* data, size_t size) {
-  ssize_t size_written = RETRY_EINTR(send(socket_fd, data, size, MSG_NOSIGNAL));
+Status<void> SendData(const BorrowedHandle& socket_fd, const void* data,
+                      size_t size) {
+  ssize_t size_written =
+      RETRY_EINTR(send(socket_fd.Get(), data, size, MSG_NOSIGNAL));
   if (size_written < 0)
     return ErrorStatus(errno);
   if (static_cast<size_t>(size_written) != size)
@@ -228,11 +232,13 @@ Status<void> SendData(int socket_fd, const void* data, size_t size) {
   return {};
 }
 
-Status<void> SendDataVector(int socket_fd, const iovec* data, size_t count) {
+Status<void> SendDataVector(const BorrowedHandle& socket_fd, const iovec* data,
+                            size_t count) {
   msghdr msg = {};
   msg.msg_iov = const_cast<iovec*>(data);
   msg.msg_iovlen = count;
-  ssize_t size_written = RETRY_EINTR(sendmsg(socket_fd, &msg, MSG_NOSIGNAL));
+  ssize_t size_written =
+      RETRY_EINTR(sendmsg(socket_fd.Get(), &msg, MSG_NOSIGNAL));
   if (size_written < 0)
     return ErrorStatus(errno);
   if (static_cast<size_t>(size_written) != CountVectorSize(data, count))
@@ -240,8 +246,10 @@ Status<void> SendDataVector(int socket_fd, const iovec* data, size_t count) {
   return {};
 }
 
-Status<void> ReceiveData(int socket_fd, void* data, size_t size) {
-  ssize_t size_read = RETRY_EINTR(recv(socket_fd, data, size, MSG_WAITALL));
+Status<void> ReceiveData(const BorrowedHandle& socket_fd, void* data,
+                         size_t size) {
+  ssize_t size_read =
+      RETRY_EINTR(recv(socket_fd.Get(), data, size, MSG_WAITALL));
   if (size_read < 0)
     return ErrorStatus(errno);
   else if (size_read == 0)
@@ -251,11 +259,12 @@ Status<void> ReceiveData(int socket_fd, void* data, size_t size) {
   return {};
 }
 
-Status<void> ReceiveDataVector(int socket_fd, const iovec* data, size_t count) {
+Status<void> ReceiveDataVector(const BorrowedHandle& socket_fd,
+                               const iovec* data, size_t count) {
   msghdr msg = {};
   msg.msg_iov = const_cast<iovec*>(data);
   msg.msg_iovlen = count;
-  ssize_t size_read = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
+  ssize_t size_read = RETRY_EINTR(recvmsg(socket_fd.Get(), &msg, MSG_WAITALL));
   if (size_read < 0)
     return ErrorStatus(errno);
   else if (size_read == 0)
index 80530bf..82950a2 100644 (file)
@@ -25,8 +25,8 @@ namespace uds {
 
 class SendPayload : public MessageWriter, public OutputResourceMapper {
  public:
-  Status<void> Send(int socket_fd);
-  Status<void> Send(int socket_fd, const ucred* cred);
+  Status<void> Send(const BorrowedHandle& socket_fd);
+  Status<void> Send(const BorrowedHandle& socket_fd, const ucred* cred);
 
   // MessageWriter
   void* GetNextWriteBufferSection(size_t size) override;
@@ -50,8 +50,8 @@ class SendPayload : public MessageWriter, public OutputResourceMapper {
 
 class ReceivePayload : public MessageReader, public InputResourceMapper {
  public:
-  Status<void> Receive(int socket_fd);
-  Status<void> Receive(int socket_fd, ucred* cred);
+  Status<void> Receive(const BorrowedHandle& socket_fd);
+  Status<void> Receive(const BorrowedHandle& socket_fd, ucred* cred);
 
   // MessageReader
   BufferSection GetNextReadBufferSection() override;
@@ -111,25 +111,27 @@ class ResponseHeader {
 };
 
 template <typename T>
-inline Status<void> SendData(int socket_fd, const T& data) {
+inline Status<void> SendData(const BorrowedHandle& socket_fd, const T& data) {
   SendPayload payload;
   rpc::Serialize(data, &payload);
   return payload.Send(socket_fd);
 }
 
 template <typename FileHandleType>
-inline Status<void> SendData(int socket_fd,
+inline Status<void> SendData(const BorrowedHandle& socket_fd,
                              const RequestHeader<FileHandleType>& request) {
   SendPayload payload;
   rpc::Serialize(request, &payload);
   return payload.Send(socket_fd, &request.cred);
 }
 
-Status<void> SendData(int socket_fd, const void* data, size_t size);
-Status<void> SendDataVector(int socket_fd, const iovec* data, size_t count);
+Status<void> SendData(const BorrowedHandle& socket_fd, const void* data,
+                      size_t size);
+Status<void> SendDataVector(const BorrowedHandle& socket_fd, const iovec* data,
+                            size_t count);
 
 template <typename T>
-inline Status<void> ReceiveData(int socket_fd, T* data) {
+inline Status<void> ReceiveData(const BorrowedHandle& socket_fd, T* data) {
   ReceivePayload payload;
   Status<void> status = payload.Receive(socket_fd);
   if (status && rpc::Deserialize(data, &payload) != rpc::ErrorCode::NO_ERROR)
@@ -138,7 +140,7 @@ inline Status<void> ReceiveData(int socket_fd, T* data) {
 }
 
 template <typename FileHandleType>
-inline Status<void> ReceiveData(int socket_fd,
+inline Status<void> ReceiveData(const BorrowedHandle& socket_fd,
                                 RequestHeader<FileHandleType>* request) {
   ReceivePayload payload;
   Status<void> status = payload.Receive(socket_fd, &request->cred);
@@ -147,8 +149,10 @@ inline Status<void> ReceiveData(int socket_fd,
   return status;
 }
 
-Status<void> ReceiveData(int socket_fd, void* data, size_t size);
-Status<void> ReceiveDataVector(int socket_fd, const iovec* data, size_t count);
+Status<void> ReceiveData(const BorrowedHandle& socket_fd, void* data,
+                         size_t size);
+Status<void> ReceiveDataVector(const BorrowedHandle& socket_fd,
+                               const iovec* data, size_t count);
 
 size_t CountVectorSize(const iovec* data, size_t count);
 void InitRequest(android::pdx::uds::RequestHeader<BorrowedHandle>* request,
index 2b24f62..f747abc 100644 (file)
@@ -117,18 +117,20 @@ class Endpoint : public pdx::Endpoint {
     return next_message_id_.fetch_add(1, std::memory_order_relaxed);
   }
 
-  void BuildCloseMessage(int channel_id, Message* message);
+  void BuildCloseMessage(int32_t channel_id, Message* message);
 
   Status<void> AcceptConnection(Message* message);
-  Status<void> ReceiveMessageForChannel(int channel_id, Message* message);
+  Status<void> ReceiveMessageForChannel(const BorrowedHandle& channel_fd,
+                                        Message* message);
   Status<void> OnNewChannel(LocalHandle channel_fd);
-  Status<ChannelData*> OnNewChannelLocked(LocalHandle channel_fd,
-                                          Channel* channel_state);
-  Status<void> CloseChannelLocked(int channel_id);
-  Status<void> ReenableEpollEvent(int fd);
-  Channel* GetChannelState(int channel_id);
-  int GetChannelSocketFd(int channel_id);
-  int GetChannelEventFd(int channel_id);
+  Status<std::pair<int32_t, ChannelData*>> OnNewChannelLocked(
+      LocalHandle channel_fd, Channel* channel_state);
+  Status<void> CloseChannelLocked(int32_t channel_id);
+  Status<void> ReenableEpollEvent(const BorrowedHandle& channel_fd);
+  Channel* GetChannelState(int32_t channel_id);
+  BorrowedHandle GetChannelSocketFd(int32_t channel_id);
+  BorrowedHandle GetChannelEventFd(int32_t channel_id);
+  int32_t GetChannelId(const BorrowedHandle& channel_fd);
 
   std::string endpoint_path_;
   bool is_blocking_;
@@ -137,7 +139,9 @@ class Endpoint : public pdx::Endpoint {
   LocalHandle epoll_fd_;
 
   mutable std::mutex channel_mutex_;
-  std::map<int, ChannelData> channels_;
+  std::map<int32_t, ChannelData> channels_;
+  std::map<int, int32_t> channel_fd_to_id_;
+  int32_t last_channel_id_{0};
 
   Service* service_{nullptr};
   std::atomic<uint32_t> next_message_id_;
index 6f32867..65fd59f 100644 (file)
@@ -220,9 +220,11 @@ Status<void> Endpoint::AcceptConnection(Message* message) {
     return ErrorStatus(errno);
   }
 
-  auto status = ReceiveMessageForChannel(channel_fd.Get(), message);
+  // Borrow the channel handle before we pass (move) it into OnNewChannel().
+  BorrowedHandle borrowed_channel_handle = channel_fd.Borrow();
+  auto status = OnNewChannel(std::move(channel_fd));
   if (status)
-    status = OnNewChannel(std::move(channel_fd));
+    status = ReceiveMessageForChannel(borrowed_channel_handle, message);
   return status;
 }
 
@@ -247,7 +249,7 @@ Status<void> Endpoint::OnNewChannel(LocalHandle channel_fd) {
   return status;
 }
 
-Status<Endpoint::ChannelData*> Endpoint::OnNewChannelLocked(
+Status<std::pair<int32_t, Endpoint::ChannelData*>> Endpoint::OnNewChannelLocked(
     LocalHandle channel_fd, Channel* channel_state) {
   epoll_event event;
   event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
@@ -259,19 +261,28 @@ Status<Endpoint::ChannelData*> Endpoint::OnNewChannelLocked(
     return ErrorStatus(errno);
   }
   ChannelData channel_data;
-  const int channel_id = channel_fd.Get();
   channel_data.event_set.AddDataFd(channel_fd);
   channel_data.data_fd = std::move(channel_fd);
   channel_data.channel_state = channel_state;
-  auto pair = channels_.emplace(channel_id, std::move(channel_data));
-  return &pair.first->second;
+  for (;;) {
+    // Try new channel IDs until we find one which is not already in the map.
+    if (last_channel_id_++ == std::numeric_limits<int32_t>::max())
+      last_channel_id_ = 1;
+    auto iter = channels_.lower_bound(last_channel_id_);
+    if (iter == channels_.end() || iter->first != last_channel_id_) {
+      channel_fd_to_id_.emplace(channel_data.data_fd.Get(), last_channel_id_);
+      iter = channels_.emplace_hint(iter, last_channel_id_,
+                                    std::move(channel_data));
+      return std::make_pair(last_channel_id_, &iter->second);
+    }
+  }
 }
 
-Status<void> Endpoint::ReenableEpollEvent(int fd) {
+Status<void> Endpoint::ReenableEpollEvent(const BorrowedHandle& fd) {
   epoll_event event;
   event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
-  event.data.fd = fd;
-  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_MOD, fd, &event) < 0) {
+  event.data.fd = fd.Get();
+  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_MOD, fd.Get(), &event) < 0) {
     ALOGE(
         "Endpoint::ReenableEpollEvent: Failed to re-enable channel to "
         "endpoint: %s\n",
@@ -286,16 +297,17 @@ Status<void> Endpoint::CloseChannel(int channel_id) {
   return CloseChannelLocked(channel_id);
 }
 
-Status<void> Endpoint::CloseChannelLocked(int channel_id) {
+Status<void> Endpoint::CloseChannelLocked(int32_t channel_id) {
   ALOGD_IF(TRACE, "Endpoint::CloseChannelLocked: channel_id=%d", channel_id);
 
-  auto channel_data = channels_.find(channel_id);
-  if (channel_data == channels_.end())
+  auto iter = channels_.find(channel_id);
+  if (iter == channels_.end())
     return ErrorStatus{EINVAL};
 
+  int channel_fd = iter->second.data_fd.Get();
   Status<void> status;
   epoll_event dummy;  // See BUGS in man 2 epoll_ctl.
-  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, channel_id, &dummy) < 0) {
+  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, channel_fd, &dummy) < 0) {
     status.SetError(errno);
     ALOGE(
         "Endpoint::CloseChannelLocked: Failed to remove channel from endpoint: "
@@ -305,7 +317,8 @@ Status<void> Endpoint::CloseChannelLocked(int channel_id) {
     status.SetValue();
   }
 
-  channels_.erase(channel_data);
+  channel_fd_to_id_.erase(channel_fd);
+  channels_.erase(iter);
   return status;
 }
 
@@ -348,10 +361,10 @@ Status<RemoteChannelHandle> Endpoint::PushChannel(Message* message,
   }
 
   std::lock_guard<std::mutex> autolock(channel_mutex_);
-  *channel_id = local_socket.Get();
   auto channel_data = OnNewChannelLocked(std::move(local_socket), channel);
   if (!channel_data)
     return channel_data.error_status();
+  *channel_id = channel_data.get().first;
 
   // Flags are ignored for now.
   // TODO(xiaohuit): Implement those.
@@ -359,7 +372,7 @@ Status<RemoteChannelHandle> Endpoint::PushChannel(Message* message,
   auto* state = static_cast<MessageState*>(message->GetState());
   Status<ChannelReference> ref = state->PushChannelHandle(
       remote_socket.Borrow(),
-      channel_data.get()->event_set.event_fd().Borrow());
+      channel_data.get().second->event_set.event_fd().Borrow());
   if (!ref)
     return ref.error_status();
   state->sockets_to_close.push_back(std::move(remote_socket));
@@ -373,32 +386,42 @@ Status<int> Endpoint::CheckChannel(const Message* /*message*/,
   return ErrorStatus(EFAULT);
 }
 
-Channel* Endpoint::GetChannelState(int channel_id) {
+Channel* Endpoint::GetChannelState(int32_t channel_id) {
   std::lock_guard<std::mutex> autolock(channel_mutex_);
   auto channel_data = channels_.find(channel_id);
   return (channel_data != channels_.end()) ? channel_data->second.channel_state
                                            : nullptr;
 }
 
-int Endpoint::GetChannelSocketFd(int channel_id) {
+BorrowedHandle Endpoint::GetChannelSocketFd(int32_t channel_id) {
   std::lock_guard<std::mutex> autolock(channel_mutex_);
+  BorrowedHandle handle;
   auto channel_data = channels_.find(channel_id);
-  return (channel_data != channels_.end()) ? channel_data->second.data_fd.Get()
-                                           : -1;
+  if (channel_data != channels_.end())
+    handle = channel_data->second.data_fd.Borrow();
+  return handle;
 }
 
-int Endpoint::GetChannelEventFd(int channel_id) {
+BorrowedHandle Endpoint::GetChannelEventFd(int32_t channel_id) {
   std::lock_guard<std::mutex> autolock(channel_mutex_);
+  BorrowedHandle handle;
   auto channel_data = channels_.find(channel_id);
-  return (channel_data != channels_.end())
-             ? channel_data->second.event_set.event_fd().Get()
-             : -1;
+  if (channel_data != channels_.end())
+    handle = channel_data->second.event_set.event_fd().Borrow();
+  return handle;
+}
+
+int32_t Endpoint::GetChannelId(const BorrowedHandle& channel_fd) {
+  std::lock_guard<std::mutex> autolock(channel_mutex_);
+  auto iter = channel_fd_to_id_.find(channel_fd.Get());
+  return (iter != channel_fd_to_id_.end()) ? iter->second : -1;
 }
 
-Status<void> Endpoint::ReceiveMessageForChannel(int channel_id,
-                                                Message* message) {
+Status<void> Endpoint::ReceiveMessageForChannel(
+    const BorrowedHandle& channel_fd, Message* message) {
   RequestHeader<LocalHandle> request;
-  auto status = ReceiveData(channel_id, &request);
+  int32_t channel_id = GetChannelId(channel_fd);
+  auto status = ReceiveData(channel_fd.Borrow(), &request);
   if (!status) {
     if (status.error() == ESHUTDOWN) {
       BuildCloseMessage(channel_id, message);
@@ -434,12 +457,12 @@ Status<void> Endpoint::ReceiveMessageForChannel(int channel_id,
   state->request = std::move(request);
   if (request.send_len > 0 && !request.is_impulse) {
     state->request_data.resize(request.send_len);
-    status = ReceiveData(channel_id, state->request_data.data(),
+    status = ReceiveData(channel_fd, state->request_data.data(),
                          state->request_data.size());
   }
 
   if (status && request.is_impulse)
-    status = ReenableEpollEvent(channel_id);
+    status = ReenableEpollEvent(channel_fd);
 
   if (!status) {
     if (status.error() == ESHUTDOWN) {
@@ -454,7 +477,7 @@ Status<void> Endpoint::ReceiveMessageForChannel(int channel_id,
   return status;
 }
 
-void Endpoint::BuildCloseMessage(int channel_id, Message* message) {
+void Endpoint::BuildCloseMessage(int32_t channel_id, Message* message) {
   ALOGD_IF(TRACE, "Endpoint::BuildCloseMessage: channel_id=%d", channel_id);
   MessageInfo info;
   info.pid = -1;
@@ -496,22 +519,22 @@ Status<void> Endpoint::MessageReceive(Message* message) {
     auto status = AcceptConnection(message);
     if (!status)
       return status;
-    return ReenableEpollEvent(socket_fd_.Get());
+    return ReenableEpollEvent(socket_fd_.Borrow());
   }
 
-  int channel_id = event.data.fd;
+  BorrowedHandle channel_fd{event.data.fd};
   if (event.events & (EPOLLRDHUP | EPOLLHUP)) {
-    BuildCloseMessage(channel_id, message);
+    BuildCloseMessage(GetChannelId(channel_fd), message);
     return {};
   }
 
-  return ReceiveMessageForChannel(channel_id, message);
+  return ReceiveMessageForChannel(channel_fd, message);
 }
 
 Status<void> Endpoint::MessageReply(Message* message, int return_code) {
-  const int channel_id = message->GetChannelId();
-  const int channel_socket = GetChannelSocketFd(channel_id);
-  if (channel_socket < 0)
+  const int32_t channel_id = message->GetChannelId();
+  auto channel_socket = GetChannelSocketFd(channel_id);
+  if (!channel_socket)
     return ErrorStatus{EBADF};
 
   auto* state = static_cast<MessageState*>(message->GetState());
@@ -524,8 +547,7 @@ Status<void> Endpoint::MessageReply(Message* message, int return_code) {
         return CloseChannel(channel_id);
       } else {
         // Reply with the event fd.
-        auto push_status = state->PushFileHandle(
-            BorrowedHandle{GetChannelEventFd(channel_socket)});
+        auto push_status = state->PushFileHandle(GetChannelEventFd(channel_id));
         state->response_data.clear();  // Just in case...
         if (!push_status)
           return push_status.error_status();