// may be dropped. It is possible to wait for the buffers to be
// returned (but not implemented)
+#define DEBUG_PENDING_BUFFERS 0
+
class SurfaceMediaSource : public MediaSource,
public MediaBufferObserver,
protected BufferQueue::ConsumerListener {
size_t mNumPendingBuffers;
+#if DEBUG_PENDING_BUFFERS
+ Vector<MediaBuffer *> mPendingBuffers;
+#endif
+
// mCurrentTimestamp is the timestamp for the current texture. It
// gets set to mLastQueuedTimestamp each time updateTexImage is called.
int64_t mCurrentTimestamp;
// by this "mediaBuffer" object. Now that the OMX component has
// told us that it's done with the input buffer, we can decrement
// the mediaBuffer's reference count.
+
+ ALOGV("releasing mbuf %p", mediaBuffer);
+
((MediaBuffer *)mediaBuffer)->release();
mediaBuffer = NULL;
ALOGV("stop");
Mutex::Autolock lock(mMutex);
+ if (mStopped) {
+ return OK;
+ }
+
+ while (mNumPendingBuffers > 0) {
+ ALOGI("Still waiting for %d buffers to be returned.",
+ mNumPendingBuffers);
+
+#if DEBUG_PENDING_BUFFERS
+ for (size_t i = 0; i < mPendingBuffers.size(); ++i) {
+ ALOGI("%d: %p", i, mPendingBuffers.itemAt(i));
+ }
+#endif
+
+ mMediaBuffersAvailableCondition.wait(mMutex);
+ }
+
mStopped = true;
mFrameAvailableCondition.signal();
mMediaBuffersAvailableCondition.signal();
++mNumPendingBuffers;
+#if DEBUG_PENDING_BUFFERS
+ mPendingBuffers.push_back(*buffer);
+#endif
+
+ ALOGV("returning mbuf %p", *buffer);
+
return OK;
}
CHECK(!"signalBufferReturned: bogus buffer");
}
+#if DEBUG_PENDING_BUFFERS
+ for (size_t i = 0; i < mPendingBuffers.size(); ++i) {
+ if (mPendingBuffers.itemAt(i) == buffer) {
+ mPendingBuffers.removeAt(i);
+ break;
+ }
+ }
+#endif
+
--mNumPendingBuffers;
mMediaBuffersAvailableCondition.broadcast();
}
}
ANetworkSession::Session::~Session() {
- ALOGI("Session %d gone", mSessionID);
+ ALOGV("Session %d gone", mSessionID);
close(mSocket);
mSocket = -1;
} else {
status_t err = session->readMore();
if (err != OK) {
- ALOGI("readMore on socket %d failed w/ error %d (%s)",
+ ALOGE("readMore on socket %d failed w/ error %d (%s)",
s, err, strerror(-err));
}
}
if (FD_ISSET(s, &ws)) {
status_t err = session->writeMore();
if (err != OK) {
- ALOGI("writeMore on socket %d failed w/ error %d (%s)",
+ ALOGE("writeMore on socket %d failed w/ error %d (%s)",
s, err, strerror(-err));
}
}
#include "Converter.h"
+#include "MediaPuller.h"
+
#include <cutils/properties.h>
#include <gui/SurfaceTextureClient.h>
#include <media/ICrypto.h>
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/MediaBuffer.h>
#include <media/stagefright/MediaCodec.h>
#include <media/stagefright/MediaDefs.h>
#include <media/stagefright/MediaErrors.h>
}
Converter::~Converter() {
- if (mEncoder != NULL) {
- mEncoder->release();
- mEncoder.clear();
- }
+ CHECK(mEncoder == NULL);
+}
- AString mime;
- CHECK(mInputFormat->findString("mime", &mime));
- ALOGI("encoder (%s) shut down.", mime.c_str());
+void Converter::shutdownAsync() {
+ ALOGV("shutdown");
+ (new AMessage(kWhatShutdown, id()))->post();
}
status_t Converter::initCheck() const {
return mEncoder->getOutputBuffers(&mEncoderOutputBuffers);
}
-void Converter::feedAccessUnit(const sp<ABuffer> &accessUnit) {
- sp<AMessage> msg = new AMessage(kWhatFeedAccessUnit, id());
- msg->setBuffer("accessUnit", accessUnit);
- msg->post();
-}
-
-void Converter::signalEOS() {
- (new AMessage(kWhatInputEOS, id()))->post();
-}
-
void Converter::notifyError(status_t err) {
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatError);
void Converter::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
- case kWhatFeedAccessUnit:
+ case kWhatMediaPullerNotify:
{
- sp<ABuffer> accessUnit;
- CHECK(msg->findBuffer("accessUnit", &accessUnit));
+ int32_t what;
+ CHECK(msg->findInt32("what", &what));
- mInputBufferQueue.push_back(accessUnit);
+ if (mEncoder == NULL) {
+ ALOGV("got msg '%s' after encoder shutdown.",
+ msg->debugString().c_str());
- feedEncoderInputBuffers();
+ if (what == MediaPuller::kWhatAccessUnit) {
+ sp<ABuffer> accessUnit;
+ CHECK(msg->findBuffer("accessUnit", &accessUnit));
- scheduleDoMoreWork();
- break;
- }
+ void *mbuf;
+ if (accessUnit->meta()->findPointer("mediaBuffer", &mbuf)
+ && mbuf != NULL) {
+ ALOGV("releasing mbuf %p", mbuf);
- case kWhatInputEOS:
- {
- mInputBufferQueue.push_back(NULL);
+ accessUnit->meta()->setPointer("mediaBuffer", NULL);
+
+ static_cast<MediaBuffer *>(mbuf)->release();
+ mbuf = NULL;
+ }
+ }
+ break;
+ }
+
+ if (what == MediaPuller::kWhatEOS) {
+ mInputBufferQueue.push_back(NULL);
+
+ feedEncoderInputBuffers();
+
+ scheduleDoMoreWork();
+ } else {
+ CHECK_EQ(what, MediaPuller::kWhatAccessUnit);
+
+ sp<ABuffer> accessUnit;
+ CHECK(msg->findBuffer("accessUnit", &accessUnit));
+
+#if 0
+ void *mbuf;
+ if (accessUnit->meta()->findPointer("mediaBuffer", &mbuf)
+ && mbuf != NULL) {
+ ALOGI("queueing mbuf %p", mbuf);
+ }
+#endif
+
+ mInputBufferQueue.push_back(accessUnit);
- feedEncoderInputBuffers();
+ feedEncoderInputBuffers();
- scheduleDoMoreWork();
+ scheduleDoMoreWork();
+ }
break;
}
case kWhatDoMoreWork:
{
mDoMoreWorkPending = false;
+
+ if (mEncoder == NULL) {
+ break;
+ }
+
status_t err = doMoreWork();
if (err != OK) {
case kWhatRequestIDRFrame:
{
+ if (mEncoder == NULL) {
+ break;
+ }
+
if (mIsVideo) {
ALOGI("requesting IDR frame");
mEncoder->requestIDRFrame();
break;
}
+ case kWhatShutdown:
+ {
+ ALOGI("shutting down encoder");
+ mEncoder->release();
+ mEncoder.clear();
+
+ AString mime;
+ CHECK(mInputFormat->findString("mime", &mime));
+ ALOGI("encoder (%s) shut down.", mime.c_str());
+ break;
+ }
+
default:
TRESPASS();
}
kWhatError,
};
-protected:
- virtual ~Converter();
- virtual void onMessageReceived(const sp<AMessage> &msg);
-
-private:
enum {
- kWhatFeedAccessUnit,
- kWhatInputEOS,
kWhatDoMoreWork,
kWhatRequestIDRFrame,
+ kWhatShutdown,
+ kWhatMediaPullerNotify,
};
+ void shutdownAsync();
+
+protected:
+ virtual ~Converter();
+ virtual void onMessageReceived(const sp<AMessage> &msg);
+
+private:
status_t mInitCheck;
sp<AMessage> mNotify;
sp<ALooper> mCodecLooper;
return postSynchronouslyAndReturnError(new AMessage(kWhatStart, id()));
}
-status_t MediaPuller::stop() {
- return postSynchronouslyAndReturnError(new AMessage(kWhatStop, id()));
+void MediaPuller::stopAsync(const sp<AMessage> ¬ify) {
+ sp<AMessage> msg = new AMessage(kWhatStop, id());
+ msg->setMessage("notify", notify);
+ msg->post();
}
void MediaPuller::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatStart:
- case kWhatStop:
{
- status_t err;
+ status_t err = mSource->start();
- if (msg->what() == kWhatStart) {
- err = mSource->start();
-
- if (err == OK) {
- schedulePull();
- }
- } else {
- sp<MetaData> meta = mSource->getFormat();
- const char *tmp;
- CHECK(meta->findCString(kKeyMIMEType, &tmp));
- AString mime = tmp;
-
- ALOGI("MediaPuller(%s) stopping.", mime.c_str());
- err = mSource->stop();
- ALOGI("MediaPuller(%s) stopped.", mime.c_str());
- ++mPullGeneration;
+ if (err == OK) {
+ schedulePull();
}
sp<AMessage> response = new AMessage;
break;
}
+ case kWhatStop:
+ {
+ sp<MetaData> meta = mSource->getFormat();
+ const char *tmp;
+ CHECK(meta->findCString(kKeyMIMEType, &tmp));
+ AString mime = tmp;
+
+ ALOGI("MediaPuller(%s) stopping.", mime.c_str());
+ mSource->stop();
+ ALOGI("MediaPuller(%s) stopped.", mime.c_str());
+ ++mPullGeneration;
+
+ sp<AMessage> notify;
+ CHECK(msg->findMessage("notify", ¬ify));
+ notify->post();
+ break;
+ }
+
case kWhatPull:
{
int32_t generation;
notify->setBuffer("accessUnit", accessUnit);
notify->post();
+ if (mbuf != NULL) {
+ ALOGV("posted mbuf %p", mbuf);
+ }
+
schedulePull();
}
break;
MediaPuller(const sp<MediaSource> &source, const sp<AMessage> ¬ify);
status_t start();
- status_t stop();
+ void stopAsync(const sp<AMessage> ¬ify);
protected:
virtual void onMessageReceived(const sp<AMessage> &msg);
static size_t kMaxRTPPacketSize = 1500;
static size_t kMaxNumTSPacketsPerRTPPacket = (kMaxRTPPacketSize - 12) / 188;
-struct WifiDisplaySource::PlaybackSession::Track : public RefBase {
- Track(const sp<ALooper> &pullLooper,
+struct WifiDisplaySource::PlaybackSession::Track : public AHandler {
+ enum {
+ kWhatStopped,
+ };
+
+ Track(const sp<AMessage> ¬ify,
+ const sp<ALooper> &pullLooper,
const sp<ALooper> &codecLooper,
const sp<MediaPuller> &mediaPuller,
const sp<Converter> &converter);
- Track(const sp<AMessage> &format);
-
sp<AMessage> getFormat();
bool isAudio() const;
void setPacketizerTrackIndex(size_t index);
status_t start();
- status_t stop();
+ void stopAsync();
+
+ bool isStopped() const { return !mStarted; }
void queueAccessUnit(const sp<ABuffer> &accessUnit);
sp<ABuffer> dequeueAccessUnit();
protected:
+ virtual void onMessageReceived(const sp<AMessage> &msg);
virtual ~Track();
private:
+ enum {
+ kWhatMediaPullerStopped,
+ };
+
+ sp<AMessage> mNotify;
sp<ALooper> mPullLooper;
sp<ALooper> mCodecLooper;
sp<MediaPuller> mMediaPuller;
sp<Converter> mConverter;
- sp<AMessage> mFormat;
bool mStarted;
ssize_t mPacketizerTrackIndex;
bool mIsAudio;
};
WifiDisplaySource::PlaybackSession::Track::Track(
+ const sp<AMessage> ¬ify,
const sp<ALooper> &pullLooper,
const sp<ALooper> &codecLooper,
const sp<MediaPuller> &mediaPuller,
const sp<Converter> &converter)
- : mPullLooper(pullLooper),
+ : mNotify(notify),
+ mPullLooper(pullLooper),
mCodecLooper(codecLooper),
mMediaPuller(mediaPuller),
mConverter(converter),
mIsAudio(IsAudioFormat(mConverter->getOutputFormat())) {
}
-WifiDisplaySource::PlaybackSession::Track::Track(const sp<AMessage> &format)
- : mFormat(format),
- mPacketizerTrackIndex(-1),
- mIsAudio(IsAudioFormat(mFormat)) {
-}
-
WifiDisplaySource::PlaybackSession::Track::~Track() {
- stop();
+ CHECK(!mStarted);
}
// static
}
sp<AMessage> WifiDisplaySource::PlaybackSession::Track::getFormat() {
- if (mFormat != NULL) {
- return mFormat;
- }
-
return mConverter->getOutputFormat();
}
status_t WifiDisplaySource::PlaybackSession::Track::start() {
ALOGV("Track::start isAudio=%d", mIsAudio);
- if (mStarted) {
- return INVALID_OPERATION;
- }
+ CHECK(!mStarted);
status_t err = OK;
return err;
}
-status_t WifiDisplaySource::PlaybackSession::Track::stop() {
- ALOGV("Track::stop isAudio=%d", mIsAudio);
+void WifiDisplaySource::PlaybackSession::Track::stopAsync() {
+ ALOGV("Track::stopAsync isAudio=%d", mIsAudio);
- if (!mStarted) {
- return INVALID_OPERATION;
- }
+ CHECK(mStarted);
- status_t err = OK;
+ mConverter->shutdownAsync();
+
+ sp<AMessage> msg = new AMessage(kWhatMediaPullerStopped, id());
if (mMediaPuller != NULL) {
- err = mMediaPuller->stop();
+ mMediaPuller->stopAsync(msg);
+ } else {
+ msg->post();
}
+}
- mConverter.clear();
+void WifiDisplaySource::PlaybackSession::Track::onMessageReceived(
+ const sp<AMessage> &msg) {
+ switch (msg->what()) {
+ case kWhatMediaPullerStopped:
+ {
+ mConverter.clear();
- mStarted = false;
+ mStarted = false;
- return err;
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatStopped);
+ notify->post();
+ break;
+ }
+
+ default:
+ TRESPASS();
+ }
}
void WifiDisplaySource::PlaybackSession::Track::queueAccessUnit(
}
for (size_t i = 0; i < mTracks.size(); ++i) {
- status_t err = mTracks.editValueAt(i)->start();
-
- if (err != OK) {
- for (size_t j = 0; j < i; ++j) {
- mTracks.editValueAt(j)->stop();
- }
-
- return err;
- }
+ CHECK_EQ((status_t)OK, mTracks.editValueAt(i)->start());
}
sp<AMessage> notify = mNotify->dup();
return OK;
}
-status_t WifiDisplaySource::PlaybackSession::destroy() {
- mTracks.clear();
-
- mPacketizer.clear();
-
- mTracks.clear();
-
-#if ENABLE_RETRANSMISSION
- if (mRTCPRetransmissionSessionID != 0) {
- mNetSession->destroySession(mRTCPRetransmissionSessionID);
- }
-
- if (mRTPRetransmissionSessionID != 0) {
- mNetSession->destroySession(mRTPRetransmissionSessionID);
- }
-#endif
+void WifiDisplaySource::PlaybackSession::destroyAsync() {
+ ALOGI("destroyAsync");
- if (mRTCPSessionID != 0) {
- mNetSession->destroySession(mRTCPSessionID);
- }
-
- if (mRTPSessionID != 0) {
- mNetSession->destroySession(mRTPSessionID);
+ for (size_t i = 0; i < mTracks.size(); ++i) {
+ mTracks.valueAt(i)->stopAsync();
}
-
- return OK;
}
void WifiDisplaySource::PlaybackSession::onMessageReceived(
break;
}
- case kWhatMediaPullerNotify:
- {
- int32_t what;
- CHECK(msg->findInt32("what", &what));
-
- if (what == MediaPuller::kWhatEOS) {
- ALOGI("input eos");
-
- for (size_t i = 0; i < mTracks.size(); ++i) {
- mTracks.valueAt(i)->converter()->signalEOS();
- }
- } else {
- CHECK_EQ(what, MediaPuller::kWhatAccessUnit);
-
- size_t trackIndex;
- CHECK(msg->findSize("trackIndex", &trackIndex));
-
- sp<ABuffer> accessUnit;
- CHECK(msg->findBuffer("accessUnit", &accessUnit));
-
- mTracks.valueFor(trackIndex)->converter()
- ->feedAccessUnit(accessUnit);
- }
- break;
- }
-
case kWhatConverterNotify:
{
int32_t what;
break;
}
+ case kWhatTrackNotify:
+ {
+ int32_t what;
+ CHECK(msg->findInt32("what", &what));
+
+ size_t trackIndex;
+ CHECK(msg->findSize("trackIndex", &trackIndex));
+
+ if (what == Track::kWhatStopped) {
+ bool allTracksAreStopped = true;
+ for (size_t i = 0; i < mTracks.size(); ++i) {
+ const sp<Track> &track = mTracks.valueAt(i);
+ if (!track->isStopped()) {
+ allTracksAreStopped = false;
+ break;
+ }
+ }
+
+ if (!allTracksAreStopped) {
+ break;
+ }
+
+ mTracks.clear();
+
+ mPacketizer.clear();
+
+#if ENABLE_RETRANSMISSION
+ if (mRTCPRetransmissionSessionID != 0) {
+ mNetSession->destroySession(mRTCPRetransmissionSessionID);
+ }
+
+ if (mRTPRetransmissionSessionID != 0) {
+ mNetSession->destroySession(mRTPRetransmissionSessionID);
+ }
+#endif
+
+ if (mRTCPSessionID != 0) {
+ mNetSession->destroySession(mRTCPSessionID);
+ }
+
+ if (mRTPSessionID != 0) {
+ mNetSession->destroySession(mRTPSessionID);
+ }
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatSessionDestroyed);
+ notify->post();
+ }
+ break;
+ }
+
default:
TRESPASS();
}
trackIndex = mTracks.size();
- notify = new AMessage(kWhatMediaPullerNotify, id());
- notify->setSize("trackIndex", trackIndex);
- sp<MediaPuller> puller = new MediaPuller(source, notify);
- pullLooper->registerHandler(puller);
-
sp<AMessage> format;
status_t err = convertMetaDataToMessage(source->getFormat(), &format);
CHECK_EQ(err, (status_t)OK);
looper()->registerHandler(converter);
+ notify = new AMessage(Converter::kWhatMediaPullerNotify, converter->id());
+ notify->setSize("trackIndex", trackIndex);
+
+ sp<MediaPuller> puller = new MediaPuller(source, notify);
+ pullLooper->registerHandler(puller);
+
if (numInputBuffers != NULL) {
*numInputBuffers = converter->getInputBufferCount();
}
- mTracks.add(trackIndex, new Track(pullLooper, codecLooper, puller, converter));
+ notify = new AMessage(kWhatTrackNotify, id());
+ notify->setSize("trackIndex", trackIndex);
+
+ sp<Track> track = new Track(
+ notify, pullLooper, codecLooper, puller, converter);
+
+ looper()->registerHandler(track);
+
+ mTracks.add(trackIndex, track);
if (isVideo) {
mVideoTrackIndex = trackIndex;
const char *clientIP, int32_t clientRtp, int32_t clientRtcp,
TransportMode transportMode);
- status_t destroy();
+ void destroyAsync();
int32_t getRTPPort() const;
kWhatSessionDead,
kWhatBinaryData,
kWhatSessionEstablished,
+ kWhatSessionDestroyed,
};
protected:
#endif
kWhatMediaPullerNotify,
kWhatConverterNotify,
+ kWhatTrackNotify,
kWhatUpdateSurface,
kWhatFinishPlay,
};
}
status_t RepeaterSource::stop() {
+ ALOGV("stopping");
+
if (mLooper != NULL) {
mLooper->stop();
mLooper.clear();
mReflector.clear();
}
- return mSource->stop();
+ if (mBuffer != NULL) {
+ ALOGV("releasing mbuf %p", mBuffer);
+ mBuffer->release();
+ mBuffer = NULL;
+ }
+
+ status_t err = mSource->stop();
+
+ ALOGV("stopped");
+
+ return err;
}
sp<MetaData> RepeaterSource::getFormat() {
MediaBuffer *buffer;
status_t err = mSource->read(&buffer);
+ ALOGV("read mbuf %p", buffer);
+
Mutex::Autolock autoLock(mLock);
if (mBuffer != NULL) {
mBuffer->release();
mClientInfo.mPlaybackSession->height(),
0 /* flags */);
}
+ } else if (what == PlaybackSession::kWhatSessionDestroyed) {
+ disconnectClient2();
} else {
CHECK_EQ(what, PlaybackSession::kWhatBinaryData);
void WifiDisplaySource::finishStop() {
ALOGV("finishStop");
+ disconnectClientAsync();
+}
+
+void WifiDisplaySource::finishStopAfterDisconnectingClient() {
+ ALOGV("finishStopAfterDisconnectingClient");
+
#if REQUIRE_HDCP
if (mHDCP != NULL) {
ALOGI("Initiating HDCP shutdown.");
mHDCP.clear();
#endif
- disconnectClient();
-
if (mSessionID != 0) {
mNetSession->destroySession(mSessionID);
mSessionID = 0;
}
- ALOGV("finishStop2 completed.");
+ ALOGI("We're stopped.");
status_t err = OK;
return mClientInfo.mPlaybackSession;
}
-void WifiDisplaySource::disconnectClient() {
+void WifiDisplaySource::disconnectClientAsync() {
+ ALOGV("disconnectClient");
+
+ if (mClientInfo.mPlaybackSession == NULL) {
+ disconnectClient2();
+ return;
+ }
+
+ if (mClientInfo.mPlaybackSession != NULL) {
+ ALOGV("Destroying PlaybackSession");
+ mClientInfo.mPlaybackSession->destroyAsync();
+ }
+}
+
+void WifiDisplaySource::disconnectClient2() {
+ ALOGV("disconnectClient2");
+
if (mClientInfo.mPlaybackSession != NULL) {
- sp<PlaybackSession> playbackSession = mClientInfo.mPlaybackSession;
+ looper()->unregisterHandler(mClientInfo.mPlaybackSession->id());
mClientInfo.mPlaybackSession.clear();
-
- ALOGI("Destroying PlaybackSession");
- playbackSession->destroy();
- looper()->unregisterHandler(playbackSession->id());
}
if (mClientSessionID != 0) {
}
mClient->onDisplayDisconnected();
+
+ finishStopAfterDisconnectingClient();
}
#if REQUIRE_HDCP
sp<PlaybackSession> findPlaybackSession(
const sp<ParsedMessage> &data, int32_t *playbackSessionID) const;
- // Disconnects the current client and shuts down its playback session
- // (if any).
- void disconnectClient();
-
void finishStop();
+ void disconnectClientAsync();
+ void disconnectClient2();
+ void finishStopAfterDisconnectingClient();
void finishStop2();
DISALLOW_EVIL_CONSTRUCTORS(WifiDisplaySource);