ResponseHeader<LocalHandle> response;
};
-Status<void> ReadAndDiscardData(int socket_fd, size_t size) {
+Status<void> ReadAndDiscardData(const BorrowedHandle& socket_fd, size_t size) {
while (size > 0) {
// If there is more data to read in the message than the buffers provided
// by the caller, read and discard the extra data from the socket.
return ErrorStatus(EIO);
}
-Status<void> SendRequest(int socket_fd, TransactionState* transaction_state,
- int opcode, const iovec* send_vector,
- size_t send_count, size_t max_recv_len) {
+Status<void> SendRequest(const BorrowedHandle& socket_fd,
+ TransactionState* transaction_state, int opcode,
+ const iovec* send_vector, size_t send_count,
+ size_t max_recv_len) {
size_t send_len = CountVectorSize(send_vector, send_count);
InitRequest(&transaction_state->request, opcode, send_len, max_recv_len,
false);
return status;
}
-Status<void> ReceiveResponse(int socket_fd, TransactionState* transaction_state,
+Status<void> ReceiveResponse(const BorrowedHandle& socket_fd,
+ TransactionState* transaction_state,
const iovec* receive_vector, size_t receive_count,
size_t max_recv_len) {
auto status = ReceiveData(socket_fd, &transaction_state->response);
InitRequest(&request, opcode, length, 0, true);
memcpy(request.impulse_payload.data(), buffer, length);
- return SendData(channel_handle_.value(), request);
+ return SendData(BorrowedHandle{channel_handle_.value()}, request);
}
Status<int> ClientChannel::SendAndReceive(void* transaction_state, int opcode,
auto* state = static_cast<TransactionState*>(transaction_state);
size_t max_recv_len = CountVectorSize(receive_vector, receive_count);
- auto status = SendRequest(channel_handle_.value(), state, opcode, send_vector,
- send_count, max_recv_len);
+ auto status = SendRequest(BorrowedHandle{channel_handle_.value()}, state,
+ opcode, send_vector, send_count, max_recv_len);
if (status) {
- status = ReceiveResponse(channel_handle_.value(), state, receive_vector,
- receive_count, max_recv_len);
+ status = ReceiveResponse(BorrowedHandle{channel_handle_.value()}, state,
+ receive_vector, receive_count, max_recv_len);
}
if (!result.PropagateError(status)) {
const int return_code = state->response.ret_code;
remote.sun_path);
RequestHeader<BorrowedHandle> request;
InitRequest(&request, opcodes::CHANNEL_OPEN, 0, 0, false);
- status = SendData(socket_fd.Get(), request);
+ status = SendData(socket_fd.Borrow(), request);
if (!status)
return ErrorStatus(status.error());
ResponseHeader<LocalHandle> response;
- status = ReceiveData(socket_fd.Get(), &response);
+ status = ReceiveData(socket_fd.Borrow(), &response);
if (!status)
return ErrorStatus(status.error());
int ref = response.ret_code;
uint32_t fd_count{0};
};
-Status<void> SendPayload::Send(int socket_fd) {
+Status<void> SendPayload::Send(const BorrowedHandle& socket_fd) {
return Send(socket_fd, nullptr);
}
-Status<void> SendPayload::Send(int socket_fd, const ucred* cred) {
+Status<void> SendPayload::Send(const BorrowedHandle& socket_fd,
+ const ucred* cred) {
MessagePreamble preamble;
preamble.magic = kMagicPreamble;
preamble.data_size = buffer_.size();
preamble.fd_count = file_handles_.size();
- ssize_t ret =
- RETRY_EINTR(send(socket_fd, &preamble, sizeof(preamble), MSG_NOSIGNAL));
+ ssize_t ret = RETRY_EINTR(
+ send(socket_fd.Get(), &preamble, sizeof(preamble), MSG_NOSIGNAL));
if (ret < 0)
return ErrorStatus(errno);
if (ret != sizeof(preamble))
}
}
- ret = RETRY_EINTR(sendmsg(socket_fd, &msg, MSG_NOSIGNAL));
+ ret = RETRY_EINTR(sendmsg(socket_fd.Get(), &msg, MSG_NOSIGNAL));
if (ret < 0)
return ErrorStatus(errno);
if (static_cast<size_t>(ret) != buffer_.size())
return ErrorStatus{EOPNOTSUPP};
}
-Status<void> ReceivePayload::Receive(int socket_fd) {
+Status<void> ReceivePayload::Receive(const BorrowedHandle& socket_fd) {
return Receive(socket_fd, nullptr);
}
-Status<void> ReceivePayload::Receive(int socket_fd, ucred* cred) {
+Status<void> ReceivePayload::Receive(const BorrowedHandle& socket_fd,
+ ucred* cred) {
MessagePreamble preamble;
- ssize_t ret =
- RETRY_EINTR(recv(socket_fd, &preamble, sizeof(preamble), MSG_WAITALL));
+ ssize_t ret = RETRY_EINTR(
+ recv(socket_fd.Get(), &preamble, sizeof(preamble), MSG_WAITALL));
if (ret < 0)
return ErrorStatus(errno);
else if (ret == 0)
msg.msg_control = alloca(msg.msg_controllen);
}
- ret = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
+ ret = RETRY_EINTR(recvmsg(socket_fd.Get(), &msg, MSG_WAITALL));
if (ret < 0)
return ErrorStatus(errno);
else if (ret == 0)
return false;
}
-Status<void> SendData(int socket_fd, const void* data, size_t size) {
- ssize_t size_written = RETRY_EINTR(send(socket_fd, data, size, MSG_NOSIGNAL));
+Status<void> SendData(const BorrowedHandle& socket_fd, const void* data,
+ size_t size) {
+ ssize_t size_written =
+ RETRY_EINTR(send(socket_fd.Get(), data, size, MSG_NOSIGNAL));
if (size_written < 0)
return ErrorStatus(errno);
if (static_cast<size_t>(size_written) != size)
return {};
}
-Status<void> SendDataVector(int socket_fd, const iovec* data, size_t count) {
+Status<void> SendDataVector(const BorrowedHandle& socket_fd, const iovec* data,
+ size_t count) {
msghdr msg = {};
msg.msg_iov = const_cast<iovec*>(data);
msg.msg_iovlen = count;
- ssize_t size_written = RETRY_EINTR(sendmsg(socket_fd, &msg, MSG_NOSIGNAL));
+ ssize_t size_written =
+ RETRY_EINTR(sendmsg(socket_fd.Get(), &msg, MSG_NOSIGNAL));
if (size_written < 0)
return ErrorStatus(errno);
if (static_cast<size_t>(size_written) != CountVectorSize(data, count))
return {};
}
-Status<void> ReceiveData(int socket_fd, void* data, size_t size) {
- ssize_t size_read = RETRY_EINTR(recv(socket_fd, data, size, MSG_WAITALL));
+Status<void> ReceiveData(const BorrowedHandle& socket_fd, void* data,
+ size_t size) {
+ ssize_t size_read =
+ RETRY_EINTR(recv(socket_fd.Get(), data, size, MSG_WAITALL));
if (size_read < 0)
return ErrorStatus(errno);
else if (size_read == 0)
return {};
}
-Status<void> ReceiveDataVector(int socket_fd, const iovec* data, size_t count) {
+Status<void> ReceiveDataVector(const BorrowedHandle& socket_fd,
+ const iovec* data, size_t count) {
msghdr msg = {};
msg.msg_iov = const_cast<iovec*>(data);
msg.msg_iovlen = count;
- ssize_t size_read = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
+ ssize_t size_read = RETRY_EINTR(recvmsg(socket_fd.Get(), &msg, MSG_WAITALL));
if (size_read < 0)
return ErrorStatus(errno);
else if (size_read == 0)
class SendPayload : public MessageWriter, public OutputResourceMapper {
public:
- Status<void> Send(int socket_fd);
- Status<void> Send(int socket_fd, const ucred* cred);
+ Status<void> Send(const BorrowedHandle& socket_fd);
+ Status<void> Send(const BorrowedHandle& socket_fd, const ucred* cred);
// MessageWriter
void* GetNextWriteBufferSection(size_t size) override;
class ReceivePayload : public MessageReader, public InputResourceMapper {
public:
- Status<void> Receive(int socket_fd);
- Status<void> Receive(int socket_fd, ucred* cred);
+ Status<void> Receive(const BorrowedHandle& socket_fd);
+ Status<void> Receive(const BorrowedHandle& socket_fd, ucred* cred);
// MessageReader
BufferSection GetNextReadBufferSection() override;
};
template <typename T>
-inline Status<void> SendData(int socket_fd, const T& data) {
+inline Status<void> SendData(const BorrowedHandle& socket_fd, const T& data) {
SendPayload payload;
rpc::Serialize(data, &payload);
return payload.Send(socket_fd);
}
template <typename FileHandleType>
-inline Status<void> SendData(int socket_fd,
+inline Status<void> SendData(const BorrowedHandle& socket_fd,
const RequestHeader<FileHandleType>& request) {
SendPayload payload;
rpc::Serialize(request, &payload);
return payload.Send(socket_fd, &request.cred);
}
-Status<void> SendData(int socket_fd, const void* data, size_t size);
-Status<void> SendDataVector(int socket_fd, const iovec* data, size_t count);
+Status<void> SendData(const BorrowedHandle& socket_fd, const void* data,
+ size_t size);
+Status<void> SendDataVector(const BorrowedHandle& socket_fd, const iovec* data,
+ size_t count);
template <typename T>
-inline Status<void> ReceiveData(int socket_fd, T* data) {
+inline Status<void> ReceiveData(const BorrowedHandle& socket_fd, T* data) {
ReceivePayload payload;
Status<void> status = payload.Receive(socket_fd);
if (status && rpc::Deserialize(data, &payload) != rpc::ErrorCode::NO_ERROR)
}
template <typename FileHandleType>
-inline Status<void> ReceiveData(int socket_fd,
+inline Status<void> ReceiveData(const BorrowedHandle& socket_fd,
RequestHeader<FileHandleType>* request) {
ReceivePayload payload;
Status<void> status = payload.Receive(socket_fd, &request->cred);
return status;
}
-Status<void> ReceiveData(int socket_fd, void* data, size_t size);
-Status<void> ReceiveDataVector(int socket_fd, const iovec* data, size_t count);
+Status<void> ReceiveData(const BorrowedHandle& socket_fd, void* data,
+ size_t size);
+Status<void> ReceiveDataVector(const BorrowedHandle& socket_fd,
+ const iovec* data, size_t count);
size_t CountVectorSize(const iovec* data, size_t count);
void InitRequest(android::pdx::uds::RequestHeader<BorrowedHandle>* request,
return next_message_id_.fetch_add(1, std::memory_order_relaxed);
}
- void BuildCloseMessage(int channel_id, Message* message);
+ void BuildCloseMessage(int32_t channel_id, Message* message);
Status<void> AcceptConnection(Message* message);
- Status<void> ReceiveMessageForChannel(int channel_id, Message* message);
+ Status<void> ReceiveMessageForChannel(const BorrowedHandle& channel_fd,
+ Message* message);
Status<void> OnNewChannel(LocalHandle channel_fd);
- Status<ChannelData*> OnNewChannelLocked(LocalHandle channel_fd,
- Channel* channel_state);
- Status<void> CloseChannelLocked(int channel_id);
- Status<void> ReenableEpollEvent(int fd);
- Channel* GetChannelState(int channel_id);
- int GetChannelSocketFd(int channel_id);
- int GetChannelEventFd(int channel_id);
+ Status<std::pair<int32_t, ChannelData*>> OnNewChannelLocked(
+ LocalHandle channel_fd, Channel* channel_state);
+ Status<void> CloseChannelLocked(int32_t channel_id);
+ Status<void> ReenableEpollEvent(const BorrowedHandle& channel_fd);
+ Channel* GetChannelState(int32_t channel_id);
+ BorrowedHandle GetChannelSocketFd(int32_t channel_id);
+ BorrowedHandle GetChannelEventFd(int32_t channel_id);
+ int32_t GetChannelId(const BorrowedHandle& channel_fd);
std::string endpoint_path_;
bool is_blocking_;
LocalHandle epoll_fd_;
mutable std::mutex channel_mutex_;
- std::map<int, ChannelData> channels_;
+ std::map<int32_t, ChannelData> channels_;
+ std::map<int, int32_t> channel_fd_to_id_;
+ int32_t last_channel_id_{0};
Service* service_{nullptr};
std::atomic<uint32_t> next_message_id_;
return ErrorStatus(errno);
}
- auto status = ReceiveMessageForChannel(channel_fd.Get(), message);
+ // Borrow the channel handle before we pass (move) it into OnNewChannel().
+ BorrowedHandle borrowed_channel_handle = channel_fd.Borrow();
+ auto status = OnNewChannel(std::move(channel_fd));
if (status)
- status = OnNewChannel(std::move(channel_fd));
+ status = ReceiveMessageForChannel(borrowed_channel_handle, message);
return status;
}
return status;
}
-Status<Endpoint::ChannelData*> Endpoint::OnNewChannelLocked(
+Status<std::pair<int32_t, Endpoint::ChannelData*>> Endpoint::OnNewChannelLocked(
LocalHandle channel_fd, Channel* channel_state) {
epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
return ErrorStatus(errno);
}
ChannelData channel_data;
- const int channel_id = channel_fd.Get();
channel_data.event_set.AddDataFd(channel_fd);
channel_data.data_fd = std::move(channel_fd);
channel_data.channel_state = channel_state;
- auto pair = channels_.emplace(channel_id, std::move(channel_data));
- return &pair.first->second;
+ for (;;) {
+ // Try new channel IDs until we find one which is not already in the map.
+ if (last_channel_id_++ == std::numeric_limits<int32_t>::max())
+ last_channel_id_ = 1;
+ auto iter = channels_.lower_bound(last_channel_id_);
+ if (iter == channels_.end() || iter->first != last_channel_id_) {
+ channel_fd_to_id_.emplace(channel_data.data_fd.Get(), last_channel_id_);
+ iter = channels_.emplace_hint(iter, last_channel_id_,
+ std::move(channel_data));
+ return std::make_pair(last_channel_id_, &iter->second);
+ }
+ }
}
-Status<void> Endpoint::ReenableEpollEvent(int fd) {
+Status<void> Endpoint::ReenableEpollEvent(const BorrowedHandle& fd) {
epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
- event.data.fd = fd;
- if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_MOD, fd, &event) < 0) {
+ event.data.fd = fd.Get();
+ if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_MOD, fd.Get(), &event) < 0) {
ALOGE(
"Endpoint::ReenableEpollEvent: Failed to re-enable channel to "
"endpoint: %s\n",
return CloseChannelLocked(channel_id);
}
-Status<void> Endpoint::CloseChannelLocked(int channel_id) {
+Status<void> Endpoint::CloseChannelLocked(int32_t channel_id) {
ALOGD_IF(TRACE, "Endpoint::CloseChannelLocked: channel_id=%d", channel_id);
- auto channel_data = channels_.find(channel_id);
- if (channel_data == channels_.end())
+ auto iter = channels_.find(channel_id);
+ if (iter == channels_.end())
return ErrorStatus{EINVAL};
+ int channel_fd = iter->second.data_fd.Get();
Status<void> status;
epoll_event dummy; // See BUGS in man 2 epoll_ctl.
- if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, channel_id, &dummy) < 0) {
+ if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, channel_fd, &dummy) < 0) {
status.SetError(errno);
ALOGE(
"Endpoint::CloseChannelLocked: Failed to remove channel from endpoint: "
status.SetValue();
}
- channels_.erase(channel_data);
+ channel_fd_to_id_.erase(channel_fd);
+ channels_.erase(iter);
return status;
}
}
std::lock_guard<std::mutex> autolock(channel_mutex_);
- *channel_id = local_socket.Get();
auto channel_data = OnNewChannelLocked(std::move(local_socket), channel);
if (!channel_data)
return channel_data.error_status();
+ *channel_id = channel_data.get().first;
// Flags are ignored for now.
// TODO(xiaohuit): Implement those.
auto* state = static_cast<MessageState*>(message->GetState());
Status<ChannelReference> ref = state->PushChannelHandle(
remote_socket.Borrow(),
- channel_data.get()->event_set.event_fd().Borrow());
+ channel_data.get().second->event_set.event_fd().Borrow());
if (!ref)
return ref.error_status();
state->sockets_to_close.push_back(std::move(remote_socket));
return ErrorStatus(EFAULT);
}
-Channel* Endpoint::GetChannelState(int channel_id) {
+Channel* Endpoint::GetChannelState(int32_t channel_id) {
std::lock_guard<std::mutex> autolock(channel_mutex_);
auto channel_data = channels_.find(channel_id);
return (channel_data != channels_.end()) ? channel_data->second.channel_state
: nullptr;
}
-int Endpoint::GetChannelSocketFd(int channel_id) {
+BorrowedHandle Endpoint::GetChannelSocketFd(int32_t channel_id) {
std::lock_guard<std::mutex> autolock(channel_mutex_);
+ BorrowedHandle handle;
auto channel_data = channels_.find(channel_id);
- return (channel_data != channels_.end()) ? channel_data->second.data_fd.Get()
- : -1;
+ if (channel_data != channels_.end())
+ handle = channel_data->second.data_fd.Borrow();
+ return handle;
}
-int Endpoint::GetChannelEventFd(int channel_id) {
+BorrowedHandle Endpoint::GetChannelEventFd(int32_t channel_id) {
std::lock_guard<std::mutex> autolock(channel_mutex_);
+ BorrowedHandle handle;
auto channel_data = channels_.find(channel_id);
- return (channel_data != channels_.end())
- ? channel_data->second.event_set.event_fd().Get()
- : -1;
+ if (channel_data != channels_.end())
+ handle = channel_data->second.event_set.event_fd().Borrow();
+ return handle;
+}
+
+int32_t Endpoint::GetChannelId(const BorrowedHandle& channel_fd) {
+ std::lock_guard<std::mutex> autolock(channel_mutex_);
+ auto iter = channel_fd_to_id_.find(channel_fd.Get());
+ return (iter != channel_fd_to_id_.end()) ? iter->second : -1;
}
-Status<void> Endpoint::ReceiveMessageForChannel(int channel_id,
- Message* message) {
+Status<void> Endpoint::ReceiveMessageForChannel(
+ const BorrowedHandle& channel_fd, Message* message) {
RequestHeader<LocalHandle> request;
- auto status = ReceiveData(channel_id, &request);
+ int32_t channel_id = GetChannelId(channel_fd);
+ auto status = ReceiveData(channel_fd.Borrow(), &request);
if (!status) {
if (status.error() == ESHUTDOWN) {
BuildCloseMessage(channel_id, message);
state->request = std::move(request);
if (request.send_len > 0 && !request.is_impulse) {
state->request_data.resize(request.send_len);
- status = ReceiveData(channel_id, state->request_data.data(),
+ status = ReceiveData(channel_fd, state->request_data.data(),
state->request_data.size());
}
if (status && request.is_impulse)
- status = ReenableEpollEvent(channel_id);
+ status = ReenableEpollEvent(channel_fd);
if (!status) {
if (status.error() == ESHUTDOWN) {
return status;
}
-void Endpoint::BuildCloseMessage(int channel_id, Message* message) {
+void Endpoint::BuildCloseMessage(int32_t channel_id, Message* message) {
ALOGD_IF(TRACE, "Endpoint::BuildCloseMessage: channel_id=%d", channel_id);
MessageInfo info;
info.pid = -1;
auto status = AcceptConnection(message);
if (!status)
return status;
- return ReenableEpollEvent(socket_fd_.Get());
+ return ReenableEpollEvent(socket_fd_.Borrow());
}
- int channel_id = event.data.fd;
+ BorrowedHandle channel_fd{event.data.fd};
if (event.events & (EPOLLRDHUP | EPOLLHUP)) {
- BuildCloseMessage(channel_id, message);
+ BuildCloseMessage(GetChannelId(channel_fd), message);
return {};
}
- return ReceiveMessageForChannel(channel_id, message);
+ return ReceiveMessageForChannel(channel_fd, message);
}
Status<void> Endpoint::MessageReply(Message* message, int return_code) {
- const int channel_id = message->GetChannelId();
- const int channel_socket = GetChannelSocketFd(channel_id);
- if (channel_socket < 0)
+ const int32_t channel_id = message->GetChannelId();
+ auto channel_socket = GetChannelSocketFd(channel_id);
+ if (!channel_socket)
return ErrorStatus{EBADF};
auto* state = static_cast<MessageState*>(message->GetState());
return CloseChannel(channel_id);
} else {
// Reply with the event fd.
- auto push_status = state->PushFileHandle(
- BorrowedHandle{GetChannelEventFd(channel_socket)});
+ auto push_status = state->PushFileHandle(GetChannelEventFd(channel_id));
state->response_data.clear(); // Just in case...
if (!push_status)
return push_status.error_status();