1 #include "producer_queue_channel.h"
5 #include "consumer_queue_channel.h"
6 #include "producer_channel.h"
8 using android::pdx::ErrorStatus;
9 using android::pdx::Message;
10 using android::pdx::RemoteChannelHandle;
11 using android::pdx::Status;
12 using android::pdx::rpc::DispatchRemoteMethod;
// Constructs a producer queue channel owned by |service|.
//
// |channel_id| is forwarded to the BufferHubChannel base twice, together with
// the kProducerQueueType tag, and |usage_policy| is copied into
// |usage_policy_|.
// NOTE(review): presumably the two base arguments are the buffer id and the
// channel id, i.e. a queue uses its own channel id as its id — confirm against
// BufferHubChannel's declaration. The remaining parameters, initializers and
// the constructor body are not visible in this excerpt.
17 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
19 const ProducerQueueConfig& config,
20 const UsagePolicy& usage_policy,
22 : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
24 usage_policy_(usage_policy),
// Destructor. Emits a debug log when TRACE is enabled, then calls
// OnProducerClosed() on every consumer queue channel still listed in
// |consumer_channels_|.
29 ProducerQueueChannel::~ProducerQueueChannel() {
30 ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
32 for (auto* consumer : consumer_channels_)
33 consumer->OnProducerClosed();
// Factory for ProducerQueueChannel.
//
// Rejects a |usage_policy| whose deny-set and deny-clear masks share any bit
// with EINVAL. Otherwise constructs the channel, handing the constructor an
// |error| out-parameter, and returns either ErrorStatus(-error) or the new
// channel wrapped in a shared_ptr.
// NOTE(review): the declaration of |error| and the condition guarding the
// error return are not visible in this excerpt; presumably the error path is
// taken when the constructor reports a negative value — confirm.
// NOTE(review): the log message below is prefixed with
// "BufferHubService::OnCreateProducerQueue" although it is emitted from
// ProducerQueueChannel::Create.
37 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
38 BufferHubService* service, int channel_id,
39 const ProducerQueueConfig& config, const UsagePolicy& usage_policy) {
40 // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
41 // should be mutually exclusive.
42 if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
44 "BufferHubService::OnCreateProducerQueue: illegal usage mask "
45 "configuration: usage_deny_set_mask=%" PRIx64
46 " usage_deny_clear_mask=%" PRIx64,
47 usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
48 return ErrorStatus(EINVAL);
52 std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
53 service, channel_id, config, usage_policy, &error));
55 return ErrorStatus(-error);
57 return {std::move(producer)};
// Routes an incoming message to the handler that matches its opcode:
//   CreateConsumerQueue          -> OnCreateConsumerQueue
//   GetQueueInfo                 -> OnGetQueueInfo
//   ProducerQueueAllocateBuffers -> OnProducerQueueAllocateBuffers
//   ProducerQueueRemoveBuffer    -> OnProducerQueueRemoveBuffer
// Each handler is invoked through DispatchRemoteMethod.
// NOTE(review): the return statements and any default case are not visible in
// this excerpt, so the meaning of the bool result cannot be stated from here.
60 bool ProducerQueueChannel::HandleMessage(Message& message) {
61 ATRACE_NAME("ProducerQueueChannel::HandleMessage");
62 switch (message.GetOp()) {
63 case BufferHubRPC::CreateConsumerQueue::Opcode:
64 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
65 *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
68 case BufferHubRPC::GetQueueInfo::Opcode:
69 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
70 *this, &ProducerQueueChannel::OnGetQueueInfo, message);
73 case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
74 DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
75 *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
79 case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode:
80 DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(
81 *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message);
// Impulse handler. The message parameter is unnamed and unused; the visible
// body only emits a trace marker.
89 void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
90 ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
// Builds a BufferInfo describing this queue from its channel id, the number of
// attached consumer queue channels and |capacity_|.
// NOTE(review): the remaining constructor arguments are not visible in this
// excerpt.
93 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
94 return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
// Handles the CreateConsumerQueue request: creates a consumer queue channel
// attached to this producer queue and returns its channel handle.
//
// |silent| is forwarded unchanged to the ConsumerQueueChannel constructor.
// Buffers that are still alive in |buffers_| are registered with the new
// consumer queue, at the same slot index, before the channel is installed on
// the service.
//
// Returns the handle obtained from PushChannel on success, or ENOMEM when
// either pushing the channel or installing it on the service fails.
// NOTE(review): a PushChannel failure is reported as ENOMEM here, whereas
// AllocateBuffer propagates status.error() for the same call — confirm which
// is intended. The first log string spells "silent" as "slient".
98 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
99 Message& message, bool silent) {
100 ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
103 "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d",
104 channel_id(), silent);
// Push a new channel onto the message; |channel_id| is passed as the output
// argument and is used below as the id of the consumer queue channel.
107 auto status = message.PushChannel(0, nullptr, &channel_id);
110 "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
112 status.GetErrorMessage().c_str());
113 return ErrorStatus(ENOMEM);
// The consumer queue receives a shared reference to this producer queue.
116 auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
117 service(), buffer_id(), channel_id, shared_from_this(), silent);
119 // Register the existing buffers with the new consumer queue.
120 for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
121 if (auto buffer = buffers_[slot].lock())
122 consumer_queue_channel->RegisterNewBuffer(buffer, slot);
// Install the new channel on the service under the pushed channel id.
125 const auto channel_status =
126 service()->SetChannel(channel_id, consumer_queue_channel);
127 if (!channel_status) {
129 "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
131 channel_status.GetErrorMessage().c_str());
132 return ErrorStatus(ENOMEM);
135 return {status.take()};
// Handles the GetQueueInfo request: returns a QueueInfo built from |config_|
// and buffer_id(). The message argument is unused.
138 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
139 return {{config_, buffer_id()}};
// Handles the ProducerQueueAllocateBuffers request: allocates |buffer_count|
// buffers with the given geometry and format, and returns one
// (channel handle, slot) pair per buffer.
//
// The requested |usage| is first checked against the queue's usage policy:
//   * any bit in usage_deny_set_mask that is set in |usage|     -> EINVAL
//   * any bit in usage_deny_clear_mask that is clear in |usage| -> EINVAL
// The effective usage is then derived by clearing usage_clear_mask and OR-ing
// in usage_set_mask.
//
// Allocation stops at the first AllocateBuffer failure and that error is
// returned; the handles collected up to that point are discarded together with
// the local vector.
// NOTE(review): the trailing arguments of the AllocateBuffer call are not
// visible in this excerpt; presumably |effective_usage| is what gets passed —
// confirm.
142 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
143 ProducerQueueChannel::OnProducerQueueAllocateBuffers(
144 Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
145 uint32_t format, uint64_t usage, size_t buffer_count) {
146 ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
148 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
149 "producer_channel_id=%d",
152 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
154 // Deny buffer allocation violating preset rules.
155 if (usage & usage_policy_.usage_deny_set_mask) {
157 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
158 " is not permitted. Violating usage_deny_set_mask, the following bits "
159 "shall not be set: %" PRIx64 ".",
160 usage, usage_policy_.usage_deny_set_mask);
161 return ErrorStatus(EINVAL);
164 if (~usage & usage_policy_.usage_deny_clear_mask) {
166 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
167 " is not permitted. Violating usage_deny_clear_mask, the following "
168 "bits must be set: %" PRIx64 ".",
169 usage, usage_policy_.usage_deny_clear_mask);
170 return ErrorStatus(EINVAL);
173 // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask|
174 // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
175 uint64_t effective_usage =
176 (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;
178 for (size_t i = 0; i < buffer_count; i++) {
179 auto status = AllocateBuffer(message, width, height, layer_count, format,
183 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
184 "allocate new buffer.");
185 return ErrorStatus(status.error());
187 buffer_handles.push_back(status.take());
190 return {std::move(buffer_handles)};
// Allocates one buffer for this queue and returns its channel handle together
// with the slot it was stored in.
//
// Steps, in order:
//   1. Fail with E2BIG if |capacity_| has reached kMaxQueueCapacity.
//   2. Push a new channel onto |message|; on failure return status.error().
//   3. Create a ProducerChannel for the buffer, using the queue's
//      config_.user_metadata_size; on failure return ENOMEM.
//   4. Install the producer channel on the service; on failure return ENOMEM.
//   5. Store the producer channel in the first expired entry of |buffers_|;
//      fail with E2BIG if every entry is still alive.
//   6. Register the new buffer, with its slot, on every consumer queue.
// NOTE(review): the statement that updates |capacity_| is not visible in this
// excerpt. Likewise no cleanup of the channel installed in step 4 is visible
// on the step-5 failure path — confirm against the full file.
193 Status<std::pair<RemoteChannelHandle, size_t>>
194 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
195 uint32_t height, uint32_t layer_count,
196 uint32_t format, uint64_t usage) {
197 ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
199 "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
202 if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
203 ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
204 return ErrorStatus(E2BIG);
207 // Here we are creating a new BufferHubBuffer, initialize the producer
208 // channel, and returning its file handle back to the client.
209 // buffer_id is the id of the producer channel of BufferHubBuffer.
211 auto status = message.PushChannel(0, nullptr, &buffer_id);
214 ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
215 status.GetErrorMessage().c_str());
216 return ErrorStatus(status.error());
220 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
221 "height=%u layer_count=%u format=%u usage=%" PRIx64,
222 buffer_id, width, height, layer_count, format, usage);
223 auto buffer_handle = status.take();
225 auto producer_channel_status =
226 ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
227 format, usage, config_.user_metadata_size);
228 if (!producer_channel_status) {
230 "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
232 producer_channel_status.GetErrorMessage().c_str());
233 return ErrorStatus(ENOMEM);
235 auto producer_channel = producer_channel_status.take();
239 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
240 buffer_id, buffer_handle.value());
242 const auto channel_status =
243 service()->SetChannel(buffer_id, producer_channel);
244 if (!channel_status) {
246 "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
247 "for new BufferHubBuffer: %s",
248 channel_status.GetErrorMessage().c_str());
249 return ErrorStatus(ENOMEM);
252 // Register the newly allocated buffer's channel_id into the first empty
255 for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
256 if (buffers_[slot].expired())
259 if (slot == BufferHubRPC::kMaxQueueCapacity) {
261 "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
262 "buffer allocation.");
263 return ErrorStatus(E2BIG);
266 buffers_[slot] = producer_channel;
269 // Notify each consumer channel about the new buffer.
270 for (auto* consumer_channel : consumer_channels_) {
272 "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
273 "buffer, buffer_id=%d",
275 consumer_channel->RegisterNewBuffer(producer_channel, slot);
278 return {{std::move(buffer_handle), slot}};
281 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer(
282 Message& /*message*/, size_t slot) {
283 if (buffers_[slot].expired()) {
285 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove "
286 "an invalid buffer producer at slot %zu",
288 return ErrorStatus(EINVAL);
291 if (capacity_ == 0) {
293 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a "
294 "buffer producer while the queue's capacity is already zero.");
295 return ErrorStatus(EINVAL);
298 buffers_[slot].reset();
// Appends |channel| to |consumer_channels_|. The list holds raw pointers and
// no duplicate check is performed here.
303 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
304 consumer_channels_.push_back(channel);
307 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
308 consumer_channels_.erase(
309 std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
313 } // namespace android