From bd4e99c1636c75f6db0be70434b9f276bfecd96d Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 30 Oct 2012 15:53:03 -0700 Subject: [PATCH] Various tweaks to wfd related-to-bug: 7426218 Squashed commit of the following: commit 1553f1a1c66af998674168f7f7a3be23fcb0c794 Author: Andreas Huber Date: Tue Oct 30 15:51:27 2012 -0700 Add LPCM, AVC and AVC HRD descriptors as necessary. Change-Id: Ibc836fced0fe37e8a25574c2295e886765b9ea6f commit 4e74db61d2d31ebe239acbdec8f110f88016a4ea Author: Andreas Huber Date: Tue Oct 30 15:50:52 2012 -0700 added copyright headers to Sender.{cpp,h} Change-Id: If615ccb8767e32bd83ed1f0f669acc39a72489f6 commit 7144bf8ae68c5cdb8faa6e219547aabbd750f04e Author: Andreas Huber Date: Tue Oct 30 15:50:25 2012 -0700 Reenable suspension of the RepeaterSource Change-Id: I765338fcde89c65e4b69be45a5949eba6bcdcf6f commit 812164bcfa0699821d7d8eefcc0dff96b2e2cd08 Author: Andreas Huber Date: Tue Oct 30 14:03:50 2012 -0700 Add 2 stuffing bytes to the PES headers for audio tracks. Change-Id: I8b9c634f6a565ab7fa7ecdb610f7d8557e0b139b commit a084a741a63015d47c92d99fcd8b980fe615dc7d Author: Andreas Huber Date: Tue Oct 30 13:19:38 2012 -0700 Fix PCM audio packetization in WFD. Change-Id: I99a435f9fe6b4397f24d6c22afae5ae2505ffc14 commit c5cb9369585f701f34bce41534940d5f9b59248f Author: Andreas Huber Date: Tue Oct 30 13:19:12 2012 -0700 Support extraction of PCM audio from transport streams. Change-Id: I28a0516756ebcb5587325b6588df013ac871ffb9 commit b0a0512300ae037d6b39c2d04952d34b5fc12b2d Author: Andreas Huber Date: Tue Oct 30 08:54:13 2012 -0700 disable suspend of the RepeaterSource Change-Id: Ibf42a98185b0567f817ae582a82e6580f95d3d40 commit 4330e8b7668dc92a6d882b5622c0697cf292d04c Author: Andreas Huber Date: Mon Oct 29 14:11:25 2012 -0700 Better handling of datagrams in ANetworkSession reduce unnecessary copy overhead. Change-Id: I2ed8c767274ba07764f03e8d4913041168e5755f commit a44e73c322ba3f2c336f7cc4e1d63d3a74faa75d Author: Andreas Huber Date: Mon Oct 29 11:14:47 2012 -0700 Network traffic is now handled on a separate thread. Audio and video are queued to ensure proper A/V interleaving. Scheduled packet sends according to capture timestamps to reduce send-jitter. Change-Id: Ibd6357c1e663086cf87bec0a98f8e54dfdfaa0e5 related-to-bug: 7426218 Change-Id: Ia440129d656c35814abf18df06da50b73d5bb554 --- media/libstagefright/mpeg2ts/ATSParser.cpp | 17 + media/libstagefright/mpeg2ts/ATSParser.h | 1 + media/libstagefright/mpeg2ts/ESQueue.cpp | 63 ++ media/libstagefright/mpeg2ts/ESQueue.h | 2 + .../wifi-display/ANetworkSession.cpp | 37 +- media/libstagefright/wifi-display/Android.mk | 1 + .../wifi-display/source/Converter.cpp | 2 +- .../wifi-display/source/PlaybackSession.cpp | 944 ++++---------------- .../wifi-display/source/PlaybackSession.h | 104 +-- .../libstagefright/wifi-display/source/Sender.cpp | 979 +++++++++++++++++++++ media/libstagefright/wifi-display/source/Sender.h | 166 ++++ .../wifi-display/source/TSPacketizer.cpp | 159 +++- .../wifi-display/source/TSPacketizer.h | 3 +- .../wifi-display/source/WifiDisplaySource.cpp | 12 +- 14 files changed, 1597 insertions(+), 893 deletions(-) create mode 100644 media/libstagefright/wifi-display/source/Sender.cpp create mode 100644 media/libstagefright/wifi-display/source/Sender.h diff --git a/media/libstagefright/mpeg2ts/ATSParser.cpp b/media/libstagefright/mpeg2ts/ATSParser.cpp index 27c7bf4253..9faa6bca81 100644 --- a/media/libstagefright/mpeg2ts/ATSParser.cpp +++ b/media/libstagefright/mpeg2ts/ATSParser.cpp @@ -131,6 +131,8 @@ private: sp mSource; bool mPayloadStarted; + uint64_t mPrevPTS; + ElementaryStreamQueue *mQueue; status_t flush(); @@ -458,6 +460,7 @@ ATSParser::Stream::Stream( mPCR_PID(PCR_PID), mExpectedContinuityCounter(-1), mPayloadStarted(false), + mPrevPTS(0), mQueue(NULL) { switch (mStreamType) { case STREAMTYPE_H264: @@ -486,6 +489,11 @@ ATSParser::Stream::Stream( ElementaryStreamQueue::MPEG4_VIDEO); break; + case STREAMTYPE_PCM_AUDIO: + mQueue = new ElementaryStreamQueue( + ElementaryStreamQueue::PCM_AUDIO); + break; + default: break; } @@ -583,6 +591,7 @@ bool ATSParser::Stream::isAudio() const { case STREAMTYPE_MPEG1_AUDIO: case STREAMTYPE_MPEG2_AUDIO: case STREAMTYPE_MPEG2_AUDIO_ADTS: + case STREAMTYPE_PCM_AUDIO: return true; default: @@ -827,6 +836,14 @@ status_t ATSParser::Stream::flush() { void ATSParser::Stream::onPayloadData( unsigned PTS_DTS_flags, uint64_t PTS, uint64_t DTS, const uint8_t *data, size_t size) { +#if 0 + ALOGI("payload streamType 0x%02x, PTS = 0x%016llx, dPTS = %lld", + mStreamType, + PTS, + (int64_t)PTS - mPrevPTS); + mPrevPTS = PTS; +#endif + ALOGV("onPayloadData mStreamType=0x%02x", mStreamType); int64_t timeUs = 0ll; // no presentation timestamp available. diff --git a/media/libstagefright/mpeg2ts/ATSParser.h b/media/libstagefright/mpeg2ts/ATSParser.h index 5ccbab77f3..46edc45c69 100644 --- a/media/libstagefright/mpeg2ts/ATSParser.h +++ b/media/libstagefright/mpeg2ts/ATSParser.h @@ -87,6 +87,7 @@ struct ATSParser : public RefBase { STREAMTYPE_MPEG2_AUDIO_ADTS = 0x0f, STREAMTYPE_MPEG4_VIDEO = 0x10, STREAMTYPE_H264 = 0x1b, + STREAMTYPE_PCM_AUDIO = 0x83, }; protected: diff --git a/media/libstagefright/mpeg2ts/ESQueue.cpp b/media/libstagefright/mpeg2ts/ESQueue.cpp index e58e9bf6f2..82fb63791b 100644 --- a/media/libstagefright/mpeg2ts/ESQueue.cpp +++ b/media/libstagefright/mpeg2ts/ESQueue.cpp @@ -31,6 +31,8 @@ #include "include/avc_utils.h" +#include + namespace android { ElementaryStreamQueue::ElementaryStreamQueue(Mode mode, uint32_t flags) @@ -248,6 +250,11 @@ status_t ElementaryStreamQueue::appendData( break; } + case PCM_AUDIO: + { + break; + } + default: TRESPASS(); break; @@ -324,12 +331,68 @@ sp ElementaryStreamQueue::dequeueAccessUnit() { return dequeueAccessUnitMPEGVideo(); case MPEG4_VIDEO: return dequeueAccessUnitMPEG4Video(); + case PCM_AUDIO: + return dequeueAccessUnitPCMAudio(); default: CHECK_EQ((unsigned)mMode, (unsigned)MPEG_AUDIO); return dequeueAccessUnitMPEGAudio(); } } +sp ElementaryStreamQueue::dequeueAccessUnitPCMAudio() { + if (mBuffer->size() < 4) { + return NULL; + } + + ABitReader bits(mBuffer->data(), 4); + CHECK_EQ(bits.getBits(8), 0xa0); + unsigned numAUs = bits.getBits(8); + bits.skipBits(8); + unsigned quantization_word_length = bits.getBits(2); + unsigned audio_sampling_frequency = bits.getBits(3); + unsigned num_channels = bits.getBits(3); + + CHECK_EQ(audio_sampling_frequency, 2); // 48kHz + CHECK_EQ(num_channels, 1u); // stereo! + + if (mFormat == NULL) { + mFormat = new MetaData; + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_RAW); + mFormat->setInt32(kKeyChannelCount, 2); + mFormat->setInt32(kKeySampleRate, 48000); + } + + static const size_t kFramesPerAU = 80; + size_t frameSize = 2 /* numChannels */ * sizeof(int16_t); + + size_t payloadSize = numAUs * frameSize * kFramesPerAU; + + if (mBuffer->size() < 4 + payloadSize) { + return NULL; + } + + sp accessUnit = new ABuffer(payloadSize); + memcpy(accessUnit->data(), mBuffer->data() + 4, payloadSize); + + int64_t timeUs = fetchTimestamp(payloadSize + 4); + CHECK_GE(timeUs, 0ll); + accessUnit->meta()->setInt64("timeUs", timeUs); + + int16_t *ptr = (int16_t *)accessUnit->data(); + for (size_t i = 0; i < payloadSize / sizeof(int16_t); ++i) { + ptr[i] = ntohs(ptr[i]); + } + + memmove( + mBuffer->data(), + mBuffer->data() + 4 + payloadSize, + mBuffer->size() - 4 - payloadSize); + + mBuffer->setRange(0, mBuffer->size() - 4 - payloadSize); + + return accessUnit; +} + sp ElementaryStreamQueue::dequeueAccessUnitAAC() { int64_t timeUs; diff --git a/media/libstagefright/mpeg2ts/ESQueue.h b/media/libstagefright/mpeg2ts/ESQueue.h index 72aa2e70c8..66a8087f51 100644 --- a/media/libstagefright/mpeg2ts/ESQueue.h +++ b/media/libstagefright/mpeg2ts/ESQueue.h @@ -35,6 +35,7 @@ struct ElementaryStreamQueue { MPEG_AUDIO, MPEG_VIDEO, MPEG4_VIDEO, + PCM_AUDIO, }; enum Flags { @@ -69,6 +70,7 @@ private: sp dequeueAccessUnitMPEGAudio(); sp dequeueAccessUnitMPEGVideo(); sp dequeueAccessUnitMPEG4Video(); + sp dequeueAccessUnitPCMAudio(); // consume a logical (compressed) access unit of size "size", // returns its timestamp in us (or -1 if no time information). diff --git a/media/libstagefright/wifi-display/ANetworkSession.cpp b/media/libstagefright/wifi-display/ANetworkSession.cpp index 0279c34814..819cd62c36 100644 --- a/media/libstagefright/wifi-display/ANetworkSession.cpp +++ b/media/libstagefright/wifi-display/ANetworkSession.cpp @@ -94,8 +94,11 @@ private: sp mNotify; bool mSawReceiveFailure, mSawSendFailure; + // for TCP / stream data AString mOutBuffer; - List mOutBufferSizes; + + // for UDP / datagrams + List > mOutDatagrams; AString mInBuffer; @@ -213,8 +216,8 @@ bool ANetworkSession::Session::wantsToRead() { bool ANetworkSession::Session::wantsToWrite() { return !mSawSendFailure && (mState == CONNECTING - || ((mState == CONNECTED || mState == DATAGRAM) - && !mOutBuffer.empty())); + || (mState == CONNECTED && !mOutBuffer.empty()) + || (mState == DATAGRAM && !mOutDatagrams.empty())); } status_t ANetworkSession::Session::readMore() { @@ -398,30 +401,27 @@ status_t ANetworkSession::Session::readMore() { status_t ANetworkSession::Session::writeMore() { if (mState == DATAGRAM) { - CHECK(!mOutBufferSizes.empty()); + CHECK(!mOutDatagrams.empty()); status_t err; do { - size_t size = *mOutBufferSizes.begin(); - - CHECK_GE(mOutBuffer.size(), size); + const sp &datagram = *mOutDatagrams.begin(); int n; do { - n = send(mSocket, mOutBuffer.c_str(), size, 0); + n = send(mSocket, datagram->data(), datagram->size(), 0); } while (n < 0 && errno == EINTR); err = OK; if (n > 0) { - mOutBufferSizes.erase(mOutBufferSizes.begin()); - mOutBuffer.erase(0, n); + mOutDatagrams.erase(mOutDatagrams.begin()); } else if (n < 0) { err = -errno; } else if (n == 0) { err = -ECONNRESET; } - } while (err == OK && !mOutBufferSizes.empty()); + } while (err == OK && !mOutDatagrams.empty()); if (err == -EAGAIN) { err = OK; @@ -488,6 +488,16 @@ status_t ANetworkSession::Session::writeMore() { status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) { CHECK(mState == CONNECTED || mState == DATAGRAM); + if (mState == DATAGRAM) { + CHECK_GE(size, 0); + + sp datagram = new ABuffer(size); + memcpy(datagram->data(), data, size); + + mOutDatagrams.push_back(datagram); + return OK; + } + if (mState == CONNECTED && !mIsRTSPConnection) { CHECK_LE(size, 65535); @@ -502,11 +512,6 @@ status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) { (const char *)data, (size >= 0) ? size : strlen((const char *)data)); - if (mState == DATAGRAM) { - CHECK_GE(size, 0); - mOutBufferSizes.push_back(size); - } - return OK; } diff --git a/media/libstagefright/wifi-display/Android.mk b/media/libstagefright/wifi-display/Android.mk index f8e4219eb2..611bfff784 100644 --- a/media/libstagefright/wifi-display/Android.mk +++ b/media/libstagefright/wifi-display/Android.mk @@ -14,6 +14,7 @@ LOCAL_SRC_FILES:= \ source/MediaPuller.cpp \ source/PlaybackSession.cpp \ source/RepeaterSource.cpp \ + source/Sender.cpp \ source/TSPacketizer.cpp \ source/WifiDisplaySource.cpp \ diff --git a/media/libstagefright/wifi-display/source/Converter.cpp b/media/libstagefright/wifi-display/source/Converter.cpp index 7a8dd7c6cf..93ae9a3ee1 100644 --- a/media/libstagefright/wifi-display/source/Converter.cpp +++ b/media/libstagefright/wifi-display/source/Converter.cpp @@ -465,7 +465,7 @@ status_t Converter::feedRawAudioInputBuffers() { timeUs += copyUs; buffer->meta()->setInt64("timeUs", timeUs); - if (copy == partialAudioAU->size() - 4) { + if (copy == partialAudioAU->capacity() - 4) { sp notify = mNotify->dup(); notify->setInt32("what", kWhatAccessUnit); notify->setBuffer("accessUnit", partialAudioAU); diff --git a/media/libstagefright/wifi-display/source/PlaybackSession.cpp b/media/libstagefright/wifi-display/source/PlaybackSession.cpp index 6ef5e4031f..f1e714039b 100644 --- a/media/libstagefright/wifi-display/source/PlaybackSession.cpp +++ b/media/libstagefright/wifi-display/source/PlaybackSession.cpp @@ -23,6 +23,7 @@ #include "Converter.h" #include "MediaPuller.h" #include "RepeaterSource.h" +#include "Sender.h" #include "TSPacketizer.h" #include "include/avc_utils.h" @@ -50,9 +51,6 @@ namespace android { -static size_t kMaxRTPPacketSize = 1500; -static size_t kMaxNumTSPacketsPerRTPPacket = (kMaxRTPPacketSize - 12) / 188; - struct WifiDisplaySource::PlaybackSession::Track : public AHandler { enum { kWhatStopped, @@ -80,6 +78,15 @@ struct WifiDisplaySource::PlaybackSession::Track : public AHandler { void queueAccessUnit(const sp &accessUnit); sp dequeueAccessUnit(); + bool hasOutputBuffer(int64_t *timeUs) const; + void queueOutputBuffer(const sp &accessUnit); + sp dequeueOutputBuffer(); + bool isSuspended() const; + + size_t countQueuedOutputBuffers() const { + return mQueuedOutputBuffers.size(); + } + void requestIDRFrame(); protected: @@ -101,6 +108,8 @@ private: bool mIsAudio; List > mQueuedAccessUnits; sp mRepeaterSource; + List > mQueuedOutputBuffers; + int64_t mLastOutputBufferQueuedTimeUs; static bool IsAudioFormat(const sp &format); @@ -120,7 +129,8 @@ WifiDisplaySource::PlaybackSession::Track::Track( mConverter(converter), mStarted(false), mPacketizerTrackIndex(-1), - mIsAudio(IsAudioFormat(mConverter->getOutputFormat())) { + mIsAudio(IsAudioFormat(mConverter->getOutputFormat())), + mLastOutputBufferQueuedTimeUs(-1ll) { } WifiDisplaySource::PlaybackSession::Track::~Track() { @@ -251,6 +261,53 @@ void WifiDisplaySource::PlaybackSession::Track::requestIDRFrame() { mConverter->requestIDRFrame(); } +bool WifiDisplaySource::PlaybackSession::Track::hasOutputBuffer( + int64_t *timeUs) const { + *timeUs = 0ll; + + if (mQueuedOutputBuffers.empty()) { + return false; + } + + const sp &outputBuffer = *mQueuedOutputBuffers.begin(); + + CHECK(outputBuffer->meta()->findInt64("timeUs", timeUs)); + + return true; +} + +void WifiDisplaySource::PlaybackSession::Track::queueOutputBuffer( + const sp &accessUnit) { + mQueuedOutputBuffers.push_back(accessUnit); + + mLastOutputBufferQueuedTimeUs = ALooper::GetNowUs(); +} + +sp WifiDisplaySource::PlaybackSession::Track::dequeueOutputBuffer() { + CHECK(!mQueuedOutputBuffers.empty()); + + sp outputBuffer = *mQueuedOutputBuffers.begin(); + mQueuedOutputBuffers.erase(mQueuedOutputBuffers.begin()); + + return outputBuffer; +} + +bool WifiDisplaySource::PlaybackSession::Track::isSuspended() const { + if (!mQueuedOutputBuffers.empty()) { + return false; + } + + if (mLastOutputBufferQueuedTimeUs < 0ll) { + // We've never seen an output buffer queued, but tracks start + // out live, not suspended. + return false; + } + + // If we've not seen new output data for 60ms or more, we consider + // this track suspended for the time being. + return (ALooper::GetNowUs() - mLastOutputBufferQueuedTimeUs) > 60000ll; +} + //////////////////////////////////////////////////////////////////////////////// WifiDisplaySource::PlaybackSession::PlaybackSession( @@ -265,197 +322,37 @@ WifiDisplaySource::PlaybackSession::PlaybackSession( mWeAreDead(false), mLastLifesignUs(), mVideoTrackIndex(-1), - mTSQueue(new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188)), mPrevTimeUs(-1ll), - mTransportMode(TRANSPORT_UDP), - mAllTracksHavePacketizerIndex(false), - mRTPChannel(0), - mRTCPChannel(0), - mRTPPort(0), - mRTPSessionID(0), - mRTCPSessionID(0), -#if ENABLE_RETRANSMISSION - mRTPRetransmissionSessionID(0), - mRTCPRetransmissionSessionID(0), -#endif - mClientRTPPort(0), - mClientRTCPPort(0), - mRTPConnected(false), - mRTCPConnected(false), - mRTPSeqNo(0), -#if ENABLE_RETRANSMISSION - mRTPRetransmissionSeqNo(0), -#endif - mLastNTPTime(0), - mLastRTPTime(0), - mNumRTPSent(0), - mNumRTPOctetsSent(0), - mNumSRsSent(0), - mSendSRPending(false) -#if ENABLE_RETRANSMISSION - ,mHistoryLength(0) -#endif -#if TRACK_BANDWIDTH - ,mFirstPacketTimeUs(-1ll) - ,mTotalBytesSent(0ll) -#endif -#if LOG_TRANSPORT_STREAM - ,mLogFile(NULL) -#endif -{ - mTSQueue->setRange(0, 12); - -#if LOG_TRANSPORT_STREAM - mLogFile = fopen("/system/etc/log.ts", "wb"); -#endif + mAllTracksHavePacketizerIndex(false) { } status_t WifiDisplaySource::PlaybackSession::init( const char *clientIP, int32_t clientRtp, int32_t clientRtcp, - TransportMode transportMode, + Sender::TransportMode transportMode, bool usePCMAudio) { - mClientIP = clientIP; - status_t err = setupPacketizer(usePCMAudio); if (err != OK) { return err; } - mTransportMode = transportMode; - - if (transportMode == TRANSPORT_TCP_INTERLEAVED) { - mRTPChannel = clientRtp; - mRTCPChannel = clientRtcp; - mRTPPort = 0; - mRTPSessionID = 0; - mRTCPSessionID = 0; - - updateLiveness(); - return OK; - } - - mRTPChannel = 0; - mRTCPChannel = 0; - - if (mTransportMode == TRANSPORT_TCP) { - // XXX This is wrong, we need to allocate sockets here, we only - // need to do this because the dongles are not establishing their - // end until after PLAY instead of before SETUP. - mRTPPort = 20000; - mRTPSessionID = 0; - mRTCPSessionID = 0; - mClientRTPPort = clientRtp; - mClientRTCPPort = clientRtcp; - - updateLiveness(); - return OK; - } - - int serverRtp; + sp notify = new AMessage(kWhatSenderNotify, id()); + mSender = new Sender(mNetSession, notify); - sp rtpNotify = new AMessage(kWhatRTPNotify, id()); - sp rtcpNotify = new AMessage(kWhatRTCPNotify, id()); + mSenderLooper = new ALooper; + mSenderLooper->setName("sender_looper"); -#if ENABLE_RETRANSMISSION - sp rtpRetransmissionNotify = - new AMessage(kWhatRTPRetransmissionNotify, id()); - - sp rtcpRetransmissionNotify = - new AMessage(kWhatRTCPRetransmissionNotify, id()); -#endif - - for (serverRtp = 15550;; serverRtp += 2) { - int32_t rtpSession; - if (mTransportMode == TRANSPORT_UDP) { - err = mNetSession->createUDPSession( - serverRtp, clientIP, clientRtp, - rtpNotify, &rtpSession); - } else { - err = mNetSession->createTCPDatagramSession( - serverRtp, clientIP, clientRtp, - rtpNotify, &rtpSession); - } - - if (err != OK) { - ALOGI("failed to create RTP socket on port %d", serverRtp); - continue; - } - - int32_t rtcpSession = 0; - - if (clientRtcp >= 0) { - if (mTransportMode == TRANSPORT_UDP) { - err = mNetSession->createUDPSession( - serverRtp + 1, clientIP, clientRtcp, - rtcpNotify, &rtcpSession); - } else { - err = mNetSession->createTCPDatagramSession( - serverRtp + 1, clientIP, clientRtcp, - rtcpNotify, &rtcpSession); - } - - if (err != OK) { - ALOGI("failed to create RTCP socket on port %d", serverRtp + 1); - - mNetSession->destroySession(rtpSession); - continue; - } - } - -#if ENABLE_RETRANSMISSION - if (mTransportMode == TRANSPORT_UDP) { - int32_t rtpRetransmissionSession; - - err = mNetSession->createUDPSession( - serverRtp + kRetransmissionPortOffset, - clientIP, - clientRtp + kRetransmissionPortOffset, - rtpRetransmissionNotify, - &rtpRetransmissionSession); - - if (err != OK) { - mNetSession->destroySession(rtcpSession); - mNetSession->destroySession(rtpSession); - continue; - } - - CHECK_GE(clientRtcp, 0); - - int32_t rtcpRetransmissionSession; - err = mNetSession->createUDPSession( - serverRtp + 1 + kRetransmissionPortOffset, - clientIP, - clientRtp + 1 + kRetransmissionPortOffset, - rtcpRetransmissionNotify, - &rtcpRetransmissionSession); - - if (err != OK) { - mNetSession->destroySession(rtpRetransmissionSession); - mNetSession->destroySession(rtcpSession); - mNetSession->destroySession(rtpSession); - continue; - } - - mRTPRetransmissionSessionID = rtpRetransmissionSession; - mRTCPRetransmissionSessionID = rtcpRetransmissionSession; - - ALOGI("rtpRetransmissionSessionID = %d, " - "rtcpRetransmissionSessionID = %d", - rtpRetransmissionSession, rtcpRetransmissionSession); - } -#endif + mSenderLooper->start( + false /* runOnCallingThread */, + false /* canCallJava */, + PRIORITY_AUDIO); - mRTPPort = serverRtp; - mRTPSessionID = rtpSession; - mRTCPSessionID = rtcpSession; + mSenderLooper->registerHandler(mSender); - ALOGI("rtpSessionID = %d, rtcpSessionID = %d", rtpSession, rtcpSession); - break; - } + err = mSender->init(clientIP, clientRtp, clientRtcp, transportMode); - if (mRTPPort == 0) { - return UNKNOWN_ERROR; + if (err != OK) { + return err; } updateLiveness(); @@ -464,16 +361,10 @@ status_t WifiDisplaySource::PlaybackSession::init( } WifiDisplaySource::PlaybackSession::~PlaybackSession() { -#if LOG_TRANSPORT_STREAM - if (mLogFile != NULL) { - fclose(mLogFile); - mLogFile = NULL; - } -#endif } int32_t WifiDisplaySource::PlaybackSession::getRTPPort() const { - return mRTPPort; + return mSender->getRTPPort(); } int64_t WifiDisplaySource::PlaybackSession::getLastLifesignUs() const { @@ -497,35 +388,11 @@ status_t WifiDisplaySource::PlaybackSession::finishPlay() { } status_t WifiDisplaySource::PlaybackSession::onFinishPlay() { - if (mTransportMode != TRANSPORT_TCP) { - return onFinishPlay2(); - } - - sp rtpNotify = new AMessage(kWhatRTPNotify, id()); - - status_t err = mNetSession->createTCPDatagramSession( - mRTPPort, mClientIP.c_str(), mClientRTPPort, - rtpNotify, &mRTPSessionID); - - if (err != OK) { - return err; - } - - if (mClientRTCPPort >= 0) { - sp rtcpNotify = new AMessage(kWhatRTCPNotify, id()); - - err = mNetSession->createTCPDatagramSession( - mRTPPort + 1, mClientIP.c_str(), mClientRTCPPort, - rtcpNotify, &mRTCPSessionID); - } - - return err; + return mSender->finishInit(); } status_t WifiDisplaySource::PlaybackSession::onFinishPlay2() { - if (mRTCPSessionID != 0) { - scheduleSendSR(); - } + mSender->scheduleSendSR(); for (size_t i = 0; i < mTracks.size(); ++i) { CHECK_EQ((status_t)OK, mTracks.editValueAt(i)->start()); @@ -555,134 +422,6 @@ void WifiDisplaySource::PlaybackSession::destroyAsync() { void WifiDisplaySource::PlaybackSession::onMessageReceived( const sp &msg) { switch (msg->what()) { - case kWhatRTPNotify: - case kWhatRTCPNotify: -#if ENABLE_RETRANSMISSION - case kWhatRTPRetransmissionNotify: - case kWhatRTCPRetransmissionNotify: -#endif - { - int32_t reason; - CHECK(msg->findInt32("reason", &reason)); - - switch (reason) { - case ANetworkSession::kWhatError: - { - int32_t sessionID; - CHECK(msg->findInt32("sessionID", &sessionID)); - - int32_t err; - CHECK(msg->findInt32("err", &err)); - - int32_t errorOccuredDuringSend; - CHECK(msg->findInt32("send", &errorOccuredDuringSend)); - - AString detail; - CHECK(msg->findString("detail", &detail)); - - if ((msg->what() == kWhatRTPNotify -#if ENABLE_RETRANSMISSION - || msg->what() == kWhatRTPRetransmissionNotify -#endif - ) && !errorOccuredDuringSend) { - // This is ok, we don't expect to receive anything on - // the RTP socket. - break; - } - - ALOGE("An error occurred during %s in session %d " - "(%d, '%s' (%s)).", - errorOccuredDuringSend ? "send" : "receive", - sessionID, - err, - detail.c_str(), - strerror(-err)); - - mNetSession->destroySession(sessionID); - - if (sessionID == mRTPSessionID) { - mRTPSessionID = 0; - } else if (sessionID == mRTCPSessionID) { - mRTCPSessionID = 0; - } -#if ENABLE_RETRANSMISSION - else if (sessionID == mRTPRetransmissionSessionID) { - mRTPRetransmissionSessionID = 0; - } else if (sessionID == mRTCPRetransmissionSessionID) { - mRTCPRetransmissionSessionID = 0; - } -#endif - - notifySessionDead(); - break; - } - - case ANetworkSession::kWhatDatagram: - { - int32_t sessionID; - CHECK(msg->findInt32("sessionID", &sessionID)); - - sp data; - CHECK(msg->findBuffer("data", &data)); - - status_t err; - if (msg->what() == kWhatRTCPNotify -#if ENABLE_RETRANSMISSION - || msg->what() == kWhatRTCPRetransmissionNotify -#endif - ) - { - err = parseRTCP(data); - } - break; - } - - case ANetworkSession::kWhatConnected: - { - CHECK_EQ(mTransportMode, TRANSPORT_TCP); - - int32_t sessionID; - CHECK(msg->findInt32("sessionID", &sessionID)); - - if (sessionID == mRTPSessionID) { - CHECK(!mRTPConnected); - mRTPConnected = true; - ALOGI("RTP Session now connected."); - } else if (sessionID == mRTCPSessionID) { - CHECK(!mRTCPConnected); - mRTCPConnected = true; - ALOGI("RTCP Session now connected."); - } else { - TRESPASS(); - } - - if (mRTPConnected - && (mClientRTCPPort < 0 || mRTCPConnected)) { - onFinishPlay2(); - } - break; - } - - default: - TRESPASS(); - } - break; - } - - case kWhatSendSR: - { - mSendSRPending = false; - - if (mRTCPSessionID == 0) { - break; - } - - onSendSR(); - - scheduleSendSR(); - break; - } - case kWhatConverterNotify: { if (mWeAreDead) { @@ -729,11 +468,9 @@ void WifiDisplaySource::PlaybackSession::onMessageReceived( break; } - status_t err = packetizeAccessUnit(trackIndex, accessUnit); + track->queueOutputBuffer(accessUnit); - if (err != OK) { - notifySessionDead(); - } + drainAccessUnits(); break; } else if (what == Converter::kWhatEOS) { CHECK_EQ(what, Converter::kWhatEOS); @@ -765,6 +502,22 @@ void WifiDisplaySource::PlaybackSession::onMessageReceived( break; } + case kWhatSenderNotify: + { + int32_t what; + CHECK(msg->findInt32("what", &what)); + + if (what == Sender::kWhatInitDone) { + onFinishPlay2(); + } else if (what == Sender::kWhatSessionDead) { + notifySessionDead(); + } else { + TRESPASS(); + } + + break; + } + case kWhatFinishPlay: { onFinishPlay(); @@ -792,30 +545,38 @@ void WifiDisplaySource::PlaybackSession::onMessageReceived( break; } + mSenderLooper->unregisterHandler(mSender->id()); + mSender.clear(); + mSenderLooper.clear(); + mPacketizer.clear(); -#if ENABLE_RETRANSMISSION - if (mRTCPRetransmissionSessionID != 0) { - mNetSession->destroySession(mRTCPRetransmissionSessionID); - } + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatSessionDestroyed); + notify->post(); + } + break; + } - if (mRTPRetransmissionSessionID != 0) { - mNetSession->destroySession(mRTPRetransmissionSessionID); - } -#endif + case kWhatPacketize: + { + size_t trackIndex; + CHECK(msg->findSize("trackIndex", &trackIndex)); - if (mRTCPSessionID != 0) { - mNetSession->destroySession(mRTCPSessionID); - } + sp accessUnit; + CHECK(msg->findBuffer("accessUnit", &accessUnit)); - if (mRTPSessionID != 0) { - mNetSession->destroySession(mRTPSessionID); - } +#if 0 + if ((ssize_t)trackIndex == mVideoTrackIndex) { + int64_t nowUs = ALooper::GetNowUs(); + static int64_t prevNowUs = 0ll; - sp notify = mNotify->dup(); - notify->setInt32("what", kWhatSessionDestroyed); - notify->post(); + ALOGI("sending AU, dNowUs=%lld us", nowUs - prevNowUs); + + prevNowUs = nowUs; } +#endif + break; } @@ -981,372 +742,6 @@ int32_t WifiDisplaySource::PlaybackSession::height() const { return 720; } -void WifiDisplaySource::PlaybackSession::scheduleSendSR() { - if (mSendSRPending) { - return; - } - - mSendSRPending = true; - (new AMessage(kWhatSendSR, id()))->post(kSendSRIntervalUs); -} - -void WifiDisplaySource::PlaybackSession::addSR(const sp &buffer) { - uint8_t *data = buffer->data() + buffer->size(); - - // TODO: Use macros/utility functions to clean up all the bitshifts below. - - data[0] = 0x80 | 0; - data[1] = 200; // SR - data[2] = 0; - data[3] = 6; - data[4] = kSourceID >> 24; - data[5] = (kSourceID >> 16) & 0xff; - data[6] = (kSourceID >> 8) & 0xff; - data[7] = kSourceID & 0xff; - - data[8] = mLastNTPTime >> (64 - 8); - data[9] = (mLastNTPTime >> (64 - 16)) & 0xff; - data[10] = (mLastNTPTime >> (64 - 24)) & 0xff; - data[11] = (mLastNTPTime >> 32) & 0xff; - data[12] = (mLastNTPTime >> 24) & 0xff; - data[13] = (mLastNTPTime >> 16) & 0xff; - data[14] = (mLastNTPTime >> 8) & 0xff; - data[15] = mLastNTPTime & 0xff; - - data[16] = (mLastRTPTime >> 24) & 0xff; - data[17] = (mLastRTPTime >> 16) & 0xff; - data[18] = (mLastRTPTime >> 8) & 0xff; - data[19] = mLastRTPTime & 0xff; - - data[20] = mNumRTPSent >> 24; - data[21] = (mNumRTPSent >> 16) & 0xff; - data[22] = (mNumRTPSent >> 8) & 0xff; - data[23] = mNumRTPSent & 0xff; - - data[24] = mNumRTPOctetsSent >> 24; - data[25] = (mNumRTPOctetsSent >> 16) & 0xff; - data[26] = (mNumRTPOctetsSent >> 8) & 0xff; - data[27] = mNumRTPOctetsSent & 0xff; - - buffer->setRange(buffer->offset(), buffer->size() + 28); -} - -void WifiDisplaySource::PlaybackSession::addSDES(const sp &buffer) { - uint8_t *data = buffer->data() + buffer->size(); - data[0] = 0x80 | 1; - data[1] = 202; // SDES - data[4] = kSourceID >> 24; - data[5] = (kSourceID >> 16) & 0xff; - data[6] = (kSourceID >> 8) & 0xff; - data[7] = kSourceID & 0xff; - - size_t offset = 8; - - data[offset++] = 1; // CNAME - - static const char *kCNAME = "someone@somewhere"; - data[offset++] = strlen(kCNAME); - - memcpy(&data[offset], kCNAME, strlen(kCNAME)); - offset += strlen(kCNAME); - - data[offset++] = 7; // NOTE - - static const char *kNOTE = "Hell's frozen over."; - data[offset++] = strlen(kNOTE); - - memcpy(&data[offset], kNOTE, strlen(kNOTE)); - offset += strlen(kNOTE); - - data[offset++] = 0; - - if ((offset % 4) > 0) { - size_t count = 4 - (offset % 4); - switch (count) { - case 3: - data[offset++] = 0; - case 2: - data[offset++] = 0; - case 1: - data[offset++] = 0; - } - } - - size_t numWords = (offset / 4) - 1; - data[2] = numWords >> 8; - data[3] = numWords & 0xff; - - buffer->setRange(buffer->offset(), buffer->size() + offset); -} - -// static -uint64_t WifiDisplaySource::PlaybackSession::GetNowNTP() { - uint64_t nowUs = ALooper::GetNowUs(); - - nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; - - uint64_t hi = nowUs / 1000000ll; - uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; - - return (hi << 32) | lo; -} - -void WifiDisplaySource::PlaybackSession::onSendSR() { - sp buffer = new ABuffer(1500); - buffer->setRange(0, 0); - - addSR(buffer); - addSDES(buffer); - - if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) { - sp notify = mNotify->dup(); - notify->setInt32("what", kWhatBinaryData); - notify->setInt32("channel", mRTCPChannel); - notify->setBuffer("data", buffer); - notify->post(); - } else { - sendPacket(mRTCPSessionID, buffer->data(), buffer->size()); - } - - ++mNumSRsSent; -} - -ssize_t WifiDisplaySource::PlaybackSession::appendTSData( - const void *data, size_t size, bool timeDiscontinuity, bool flush) { - CHECK_EQ(size, 188); - - CHECK_LE(mTSQueue->size() + size, mTSQueue->capacity()); - - memcpy(mTSQueue->data() + mTSQueue->size(), data, size); - mTSQueue->setRange(0, mTSQueue->size() + size); - - if (flush || mTSQueue->size() == mTSQueue->capacity()) { - // flush - - int64_t nowUs = ALooper::GetNowUs(); - -#if TRACK_BANDWIDTH - if (mFirstPacketTimeUs < 0ll) { - mFirstPacketTimeUs = nowUs; - } -#endif - - // 90kHz time scale - uint32_t rtpTime = (nowUs * 9ll) / 100ll; - - uint8_t *rtp = mTSQueue->data(); - rtp[0] = 0x80; - rtp[1] = 33 | (timeDiscontinuity ? (1 << 7) : 0); // M-bit - rtp[2] = (mRTPSeqNo >> 8) & 0xff; - rtp[3] = mRTPSeqNo & 0xff; - rtp[4] = rtpTime >> 24; - rtp[5] = (rtpTime >> 16) & 0xff; - rtp[6] = (rtpTime >> 8) & 0xff; - rtp[7] = rtpTime & 0xff; - rtp[8] = kSourceID >> 24; - rtp[9] = (kSourceID >> 16) & 0xff; - rtp[10] = (kSourceID >> 8) & 0xff; - rtp[11] = kSourceID & 0xff; - - ++mRTPSeqNo; - ++mNumRTPSent; - mNumRTPOctetsSent += mTSQueue->size() - 12; - - mLastRTPTime = rtpTime; - mLastNTPTime = GetNowNTP(); - - if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) { - sp notify = mNotify->dup(); - notify->setInt32("what", kWhatBinaryData); - - sp data = new ABuffer(mTSQueue->size()); - memcpy(data->data(), rtp, mTSQueue->size()); - - notify->setInt32("channel", mRTPChannel); - notify->setBuffer("data", data); - notify->post(); - } else { - sendPacket(mRTPSessionID, rtp, mTSQueue->size()); - -#if TRACK_BANDWIDTH - mTotalBytesSent += mTSQueue->size(); - int64_t delayUs = ALooper::GetNowUs() - mFirstPacketTimeUs; - - if (delayUs > 0ll) { - ALOGI("approx. net bandwidth used: %.2f Mbit/sec", - mTotalBytesSent * 8.0 / delayUs); - } -#endif - } - -#if ENABLE_RETRANSMISSION - mTSQueue->setInt32Data(mRTPSeqNo - 1); - - mHistory.push_back(mTSQueue); - ++mHistoryLength; - - if (mHistoryLength > kMaxHistoryLength) { - mTSQueue = *mHistory.begin(); - mHistory.erase(mHistory.begin()); - - --mHistoryLength; - } else { - mTSQueue = new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188); - } -#endif - - mTSQueue->setRange(0, 12); - } - - return size; -} - -status_t WifiDisplaySource::PlaybackSession::parseRTCP( - const sp &buffer) { - const uint8_t *data = buffer->data(); - size_t size = buffer->size(); - - while (size > 0) { - if (size < 8) { - // Too short to be a valid RTCP header - return ERROR_MALFORMED; - } - - if ((data[0] >> 6) != 2) { - // Unsupported version. - return ERROR_UNSUPPORTED; - } - - if (data[0] & 0x20) { - // Padding present. - - size_t paddingLength = data[size - 1]; - - if (paddingLength + 12 > size) { - // If we removed this much padding we'd end up with something - // that's too short to be a valid RTP header. - return ERROR_MALFORMED; - } - - size -= paddingLength; - } - - size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; - - if (size < headerLength) { - // Only received a partial packet? - return ERROR_MALFORMED; - } - - switch (data[1]) { - case 200: - case 201: // RR - case 202: // SDES - case 203: - case 204: // APP - break; - -#if ENABLE_RETRANSMISSION - case 205: // TSFB (transport layer specific feedback) - parseTSFB(data, headerLength); - break; -#endif - - case 206: // PSFB (payload specific feedback) - hexdump(data, headerLength); - break; - - default: - { - ALOGW("Unknown RTCP packet type %u of size %d", - (unsigned)data[1], headerLength); - break; - } - } - - data += headerLength; - size -= headerLength; - } - - return OK; -} - -#if ENABLE_RETRANSMISSION -status_t WifiDisplaySource::PlaybackSession::parseTSFB( - const uint8_t *data, size_t size) { - if ((data[0] & 0x1f) != 1) { - return ERROR_UNSUPPORTED; // We only support NACK for now. - } - - uint32_t srcId = U32_AT(&data[8]); - if (srcId != kSourceID) { - return ERROR_MALFORMED; - } - - for (size_t i = 12; i < size; i += 4) { - uint16_t seqNo = U16_AT(&data[i]); - uint16_t blp = U16_AT(&data[i + 2]); - - List >::iterator it = mHistory.begin(); - bool foundSeqNo = false; - while (it != mHistory.end()) { - const sp &buffer = *it; - - uint16_t bufferSeqNo = buffer->int32Data() & 0xffff; - - bool retransmit = false; - if (bufferSeqNo == seqNo) { - retransmit = true; - } else if (blp != 0) { - for (size_t i = 0; i < 16; ++i) { - if ((blp & (1 << i)) - && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) { - blp &= ~(1 << i); - retransmit = true; - } - } - } - - if (retransmit) { - ALOGI("retransmitting seqNo %d", bufferSeqNo); - - sp retransRTP = new ABuffer(2 + buffer->size()); - uint8_t *rtp = retransRTP->data(); - memcpy(rtp, buffer->data(), 12); - rtp[2] = (mRTPRetransmissionSeqNo >> 8) & 0xff; - rtp[3] = mRTPRetransmissionSeqNo & 0xff; - rtp[12] = (bufferSeqNo >> 8) & 0xff; - rtp[13] = bufferSeqNo & 0xff; - memcpy(&rtp[14], buffer->data() + 12, buffer->size() - 12); - - ++mRTPRetransmissionSeqNo; - - sendPacket( - mRTPRetransmissionSessionID, - retransRTP->data(), retransRTP->size()); - - if (bufferSeqNo == seqNo) { - foundSeqNo = true; - } - - if (foundSeqNo && blp == 0) { - break; - } - } - - ++it; - } - - if (!foundSeqNo || blp != 0) { - ALOGI("Some sequence numbers were no longer available for " - "retransmission"); - } - } - - return OK; -} -#endif - void WifiDisplaySource::PlaybackSession::requestIDRFrame() { for (size_t i = 0; i < mTracks.size(); ++i) { const sp &track = mTracks.valueAt(i); @@ -1355,11 +750,6 @@ void WifiDisplaySource::PlaybackSession::requestIDRFrame() { } } -status_t WifiDisplaySource::PlaybackSession::sendPacket( - int32_t sessionID, const void *data, size_t size) { - return mNetSession->sendRequest(sessionID, data, size); -} - bool WifiDisplaySource::PlaybackSession::allTracksHavePacketizerIndex() { if (mAllTracksHavePacketizerIndex) { return true; @@ -1377,7 +767,8 @@ bool WifiDisplaySource::PlaybackSession::allTracksHavePacketizerIndex() { } status_t WifiDisplaySource::PlaybackSession::packetizeAccessUnit( - size_t trackIndex, const sp &accessUnit) { + size_t trackIndex, const sp &accessUnit, + sp *packets) { const sp &track = mTracks.valueFor(trackIndex); uint32_t flags = 0; @@ -1477,35 +868,11 @@ status_t WifiDisplaySource::PlaybackSession::packetizeAccessUnit( mPrevTimeUs = timeUs; } - sp packets; mPacketizer->packetize( - track->packetizerTrackIndex(), accessUnit, &packets, flags, + track->packetizerTrackIndex(), accessUnit, packets, flags, !isHDCPEncrypted ? NULL : HDCP_private_data, - !isHDCPEncrypted ? 0 : sizeof(HDCP_private_data)); - - for (size_t offset = 0; - offset < packets->size(); offset += 188) { - bool lastTSPacket = (offset + 188 >= packets->size()); - - // We're only going to flush video, audio packets are - // much more frequent and would waste all that space - // available in a full sized UDP packet. - bool flush = - lastTSPacket - && ((ssize_t)trackIndex == mVideoTrackIndex); - - appendTSData( - packets->data() + offset, - 188, - true /* timeDiscontinuity */, - flush); - } - -#if LOG_TRANSPORT_STREAM - if (mLogFile != NULL) { - fwrite(packets->data(), 1, packets->size(), mLogFile); - } -#endif + !isHDCPEncrypted ? 0 : sizeof(HDCP_private_data), + track->isAudio() ? 2 : 0 /* numStuffingBytes */); return OK; } @@ -1519,12 +886,7 @@ status_t WifiDisplaySource::PlaybackSession::packetizeQueuedAccessUnits() { sp accessUnit = track->dequeueAccessUnit(); if (accessUnit != NULL) { - status_t err = packetizeAccessUnit(trackIndex, accessUnit); - - if (err != OK) { - return err; - } - + track->queueOutputBuffer(accessUnit); gotMoreData = true; } } @@ -1546,5 +908,57 @@ void WifiDisplaySource::PlaybackSession::notifySessionDead() { mWeAreDead = true; } +void WifiDisplaySource::PlaybackSession::drainAccessUnits() { + ALOGV("audio/video has %d/%d buffers ready.", + mTracks.valueFor(1)->countQueuedOutputBuffers(), + mTracks.valueFor(0)->countQueuedOutputBuffers()); + + while (drainAccessUnit()) { + } +} + +bool WifiDisplaySource::PlaybackSession::drainAccessUnit() { + ssize_t minTrackIndex = -1; + int64_t minTimeUs = -1ll; + + for (size_t i = 0; i < mTracks.size(); ++i) { + const sp &track = mTracks.valueAt(i); + + int64_t timeUs; + if (track->hasOutputBuffer(&timeUs)) { + if (minTrackIndex < 0 || timeUs < minTimeUs) { + minTrackIndex = mTracks.keyAt(i); + minTimeUs = timeUs; + } + } else if (!track->isSuspended()) { + // We still consider this track "live", so it should keep + // delivering output data whose time stamps we'll have to + // consider for proper interleaving. + return false; + } + } + + if (minTrackIndex < 0) { + return false; + } + + const sp &track = mTracks.valueFor(minTrackIndex); + sp accessUnit = track->dequeueOutputBuffer(); + + sp packets; + status_t err = packetizeAccessUnit(minTrackIndex, accessUnit, &packets); + + if (err != OK) { + notifySessionDead(); + } + + if ((ssize_t)minTrackIndex == mVideoTrackIndex) { + packets->meta()->setInt32("isVideo", 1); + } + mSender->queuePackets(minTimeUs, packets); + + return true; +} + } // namespace android diff --git a/media/libstagefright/wifi-display/source/PlaybackSession.h b/media/libstagefright/wifi-display/source/PlaybackSession.h index 4bbc3f0268..cc8b244468 100644 --- a/media/libstagefright/wifi-display/source/PlaybackSession.h +++ b/media/libstagefright/wifi-display/source/PlaybackSession.h @@ -18,6 +18,7 @@ #define PLAYBACK_SESSION_H_ +#include "Sender.h" #include "WifiDisplaySource.h" namespace android { @@ -30,10 +31,6 @@ struct MediaPuller; struct MediaSource; struct TSPacketizer; -#define LOG_TRANSPORT_STREAM 0 -#define ENABLE_RETRANSMISSION 0 -#define TRACK_BANDWIDTH 0 - // Encapsulates the state of an RTP/RTCP session in the context of wifi // display. struct WifiDisplaySource::PlaybackSession : public AHandler { @@ -43,14 +40,9 @@ struct WifiDisplaySource::PlaybackSession : public AHandler { const struct in_addr &interfaceAddr, const sp &hdcp); - enum TransportMode { - TRANSPORT_UDP, - TRANSPORT_TCP_INTERLEAVED, - TRANSPORT_TCP, - }; status_t init( const char *clientIP, int32_t clientRtp, int32_t clientRtcp, - TransportMode transportMode, + Sender::TransportMode transportMode, bool usePCMAudio); void destroyAsync(); @@ -85,29 +77,18 @@ private: struct Track; enum { - kWhatSendSR, - kWhatRTPNotify, - kWhatRTCPNotify, -#if ENABLE_RETRANSMISSION - kWhatRTPRetransmissionNotify, - kWhatRTCPRetransmissionNotify, -#endif kWhatMediaPullerNotify, kWhatConverterNotify, kWhatTrackNotify, + kWhatSenderNotify, kWhatUpdateSurface, kWhatFinishPlay, + kWhatPacketize, }; - static const int64_t kSendSRIntervalUs = 10000000ll; - static const uint32_t kSourceID = 0xdeadbeef; - static const size_t kMaxHistoryLength = 128; - -#if ENABLE_RETRANSMISSION - static const size_t kRetransmissionPortOffset = 120; -#endif - sp mNetSession; + sp mSender; + sp mSenderLooper; sp mNotify; in_addr mInterfaceAddr; sp mHDCP; @@ -121,66 +102,10 @@ private: KeyedVector > mTracks; ssize_t mVideoTrackIndex; - sp mTSQueue; int64_t mPrevTimeUs; - TransportMode mTransportMode; - - AString mClientIP; - bool mAllTracksHavePacketizerIndex; - // in TCP mode - int32_t mRTPChannel; - int32_t mRTCPChannel; - - // in UDP mode - int32_t mRTPPort; - int32_t mRTPSessionID; - int32_t mRTCPSessionID; - -#if ENABLE_RETRANSMISSION - int32_t mRTPRetransmissionSessionID; - int32_t mRTCPRetransmissionSessionID; -#endif - - int32_t mClientRTPPort; - int32_t mClientRTCPPort; - bool mRTPConnected; - bool mRTCPConnected; - - uint32_t mRTPSeqNo; -#if ENABLE_RETRANSMISSION - uint32_t mRTPRetransmissionSeqNo; -#endif - - uint64_t mLastNTPTime; - uint32_t mLastRTPTime; - uint32_t mNumRTPSent; - uint32_t mNumRTPOctetsSent; - uint32_t mNumSRsSent; - - bool mSendSRPending; - -#if ENABLE_RETRANSMISSION - List > mHistory; - size_t mHistoryLength; -#endif - -#if TRACK_BANDWIDTH - int64_t mFirstPacketTimeUs; - uint64_t mTotalBytesSent; -#endif - -#if LOG_TRANSPORT_STREAM - FILE *mLogFile; -#endif - - void onSendSR(); - void addSR(const sp &buffer); - void addSDES(const sp &buffer); - static uint64_t GetNowNTP(); - status_t setupPacketizer(bool usePCMAudio); status_t addSource( @@ -196,27 +121,24 @@ private: ssize_t appendTSData( const void *data, size_t size, bool timeDiscontinuity, bool flush); - void scheduleSendSR(); - - status_t parseRTCP(const sp &buffer); - -#if ENABLE_RETRANSMISSION - status_t parseTSFB(const uint8_t *data, size_t size); -#endif - - status_t sendPacket(int32_t sessionID, const void *data, size_t size); status_t onFinishPlay(); status_t onFinishPlay2(); bool allTracksHavePacketizerIndex(); status_t packetizeAccessUnit( - size_t trackIndex, const sp &accessUnit); + size_t trackIndex, const sp &accessUnit, + sp *packets); status_t packetizeQueuedAccessUnits(); void notifySessionDead(); + void drainAccessUnits(); + + // Returns true iff an access unit was successfully drained. + bool drainAccessUnit(); + DISALLOW_EVIL_CONSTRUCTORS(PlaybackSession); }; diff --git a/media/libstagefright/wifi-display/source/Sender.cpp b/media/libstagefright/wifi-display/source/Sender.cpp new file mode 100644 index 0000000000..ea124248b8 --- /dev/null +++ b/media/libstagefright/wifi-display/source/Sender.cpp @@ -0,0 +1,979 @@ +/* + * Copyright 2012, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "Sender" +#include + +#include "Sender.h" + +#include "ANetworkSession.h" + +#include +#include +#include +#include +#include +#include + +#include + +#define DEBUG_JITTER 0 + +namespace android { + +//////////////////////////////////////////////////////////////////////////////// + +#if DEBUG_JITTER +struct TimeSeries { + TimeSeries(); + + void add(double val); + + double mean() const; + double sdev() const; + +private: + enum { + kHistorySize = 20 + }; + double mValues[kHistorySize]; + + size_t mCount; + double mSum; +}; + +TimeSeries::TimeSeries() + : mCount(0), + mSum(0.0) { +} + +void TimeSeries::add(double val) { + if (mCount < kHistorySize) { + mValues[mCount++] = val; + mSum += val; + } else { + mSum -= mValues[0]; + memmove(&mValues[0], &mValues[1], (kHistorySize - 1) * sizeof(double)); + mValues[kHistorySize - 1] = val; + mSum += val; + } +} + +double TimeSeries::mean() const { + if (mCount < 1) { + return 0.0; + } + + return mSum / mCount; +} + +double TimeSeries::sdev() const { + if (mCount < 1) { + return 0.0; + } + + double m = mean(); + + double sum = 0.0; + for (size_t i = 0; i < mCount; ++i) { + double tmp = mValues[i] - m; + tmp *= tmp; + + sum += tmp; + } + + return sqrt(sum / mCount); +} +#endif // DEBUG_JITTER + +//////////////////////////////////////////////////////////////////////////////// + +static size_t kMaxRTPPacketSize = 1500; +static size_t kMaxNumTSPacketsPerRTPPacket = (kMaxRTPPacketSize - 12) / 188; + +Sender::Sender( + const sp &netSession, + const sp ¬ify) + : mNetSession(netSession), + mNotify(notify), + mTSQueue(new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188)), + mTransportMode(TRANSPORT_UDP), + mRTPChannel(0), + mRTCPChannel(0), + mRTPPort(0), + mRTPSessionID(0), + mRTCPSessionID(0), +#if ENABLE_RETRANSMISSION + mRTPRetransmissionSessionID(0), + mRTCPRetransmissionSessionID(0), +#endif + mClientRTPPort(0), + mClientRTCPPort(0), + mRTPConnected(false), + mRTCPConnected(false), + mFirstOutputBufferReadyTimeUs(-1ll), + mFirstOutputBufferSentTimeUs(-1ll), + mRTPSeqNo(0), +#if ENABLE_RETRANSMISSION + mRTPRetransmissionSeqNo(0), +#endif + mLastNTPTime(0), + mLastRTPTime(0), + mNumRTPSent(0), + mNumRTPOctetsSent(0), + mNumSRsSent(0), + mSendSRPending(false) +#if ENABLE_RETRANSMISSION + ,mHistoryLength(0) +#endif +#if TRACK_BANDWIDTH + ,mFirstPacketTimeUs(-1ll) + ,mTotalBytesSent(0ll) +#endif +#if LOG_TRANSPORT_STREAM + ,mLogFile(NULL) +#endif +{ + mTSQueue->setRange(0, 12); + +#if LOG_TRANSPORT_STREAM + mLogFile = fopen("/system/etc/log.ts", "wb"); +#endif +} + +Sender::~Sender() { +#if ENABLE_RETRANSMISSION + if (mRTCPRetransmissionSessionID != 0) { + mNetSession->destroySession(mRTCPRetransmissionSessionID); + } + + if (mRTPRetransmissionSessionID != 0) { + mNetSession->destroySession(mRTPRetransmissionSessionID); + } +#endif + + if (mRTCPSessionID != 0) { + mNetSession->destroySession(mRTCPSessionID); + } + + if (mRTPSessionID != 0) { + mNetSession->destroySession(mRTPSessionID); + } + +#if LOG_TRANSPORT_STREAM + if (mLogFile != NULL) { + fclose(mLogFile); + mLogFile = NULL; + } +#endif +} + +status_t Sender::init( + const char *clientIP, int32_t clientRtp, int32_t clientRtcp, + TransportMode transportMode) { + mClientIP = clientIP; + mTransportMode = transportMode; + + if (transportMode == TRANSPORT_TCP_INTERLEAVED) { + mRTPChannel = clientRtp; + mRTCPChannel = clientRtcp; + mRTPPort = 0; + mRTPSessionID = 0; + mRTCPSessionID = 0; + return OK; + } + + mRTPChannel = 0; + mRTCPChannel = 0; + + if (mTransportMode == TRANSPORT_TCP) { + // XXX This is wrong, we need to allocate sockets here, we only + // need to do this because the dongles are not establishing their + // end until after PLAY instead of before SETUP. + mRTPPort = 20000; + mRTPSessionID = 0; + mRTCPSessionID = 0; + mClientRTPPort = clientRtp; + mClientRTCPPort = clientRtcp; + return OK; + } + + int serverRtp; + + sp rtpNotify = new AMessage(kWhatRTPNotify, id()); + sp rtcpNotify = new AMessage(kWhatRTCPNotify, id()); + +#if ENABLE_RETRANSMISSION + sp rtpRetransmissionNotify = + new AMessage(kWhatRTPRetransmissionNotify, id()); + + sp rtcpRetransmissionNotify = + new AMessage(kWhatRTCPRetransmissionNotify, id()); +#endif + + status_t err; + for (serverRtp = 15550;; serverRtp += 2) { + int32_t rtpSession; + if (mTransportMode == TRANSPORT_UDP) { + err = mNetSession->createUDPSession( + serverRtp, clientIP, clientRtp, + rtpNotify, &rtpSession); + } else { + err = mNetSession->createTCPDatagramSession( + serverRtp, clientIP, clientRtp, + rtpNotify, &rtpSession); + } + + if (err != OK) { + ALOGI("failed to create RTP socket on port %d", serverRtp); + continue; + } + + int32_t rtcpSession = 0; + + if (clientRtcp >= 0) { + if (mTransportMode == TRANSPORT_UDP) { + err = mNetSession->createUDPSession( + serverRtp + 1, clientIP, clientRtcp, + rtcpNotify, &rtcpSession); + } else { + err = mNetSession->createTCPDatagramSession( + serverRtp + 1, clientIP, clientRtcp, + rtcpNotify, &rtcpSession); + } + + if (err != OK) { + ALOGI("failed to create RTCP socket on port %d", serverRtp + 1); + + mNetSession->destroySession(rtpSession); + continue; + } + } + +#if ENABLE_RETRANSMISSION + if (mTransportMode == TRANSPORT_UDP) { + int32_t rtpRetransmissionSession; + + err = mNetSession->createUDPSession( + serverRtp + kRetransmissionPortOffset, + clientIP, + clientRtp + kRetransmissionPortOffset, + rtpRetransmissionNotify, + &rtpRetransmissionSession); + + if (err != OK) { + mNetSession->destroySession(rtcpSession); + mNetSession->destroySession(rtpSession); + continue; + } + + CHECK_GE(clientRtcp, 0); + + int32_t rtcpRetransmissionSession; + err = mNetSession->createUDPSession( + serverRtp + 1 + kRetransmissionPortOffset, + clientIP, + clientRtp + 1 + kRetransmissionPortOffset, + rtcpRetransmissionNotify, + &rtcpRetransmissionSession); + + if (err != OK) { + mNetSession->destroySession(rtpRetransmissionSession); + mNetSession->destroySession(rtcpSession); + mNetSession->destroySession(rtpSession); + continue; + } + + mRTPRetransmissionSessionID = rtpRetransmissionSession; + mRTCPRetransmissionSessionID = rtcpRetransmissionSession; + + ALOGI("rtpRetransmissionSessionID = %d, " + "rtcpRetransmissionSessionID = %d", + rtpRetransmissionSession, rtcpRetransmissionSession); + } +#endif + + mRTPPort = serverRtp; + mRTPSessionID = rtpSession; + mRTCPSessionID = rtcpSession; + + ALOGI("rtpSessionID = %d, rtcpSessionID = %d", rtpSession, rtcpSession); + break; + } + + if (mRTPPort == 0) { + return UNKNOWN_ERROR; + } + + return OK; +} + +status_t Sender::finishInit() { + if (mTransportMode != TRANSPORT_TCP) { + notifyInitDone(); + return OK; + } + + sp rtpNotify = new AMessage(kWhatRTPNotify, id()); + + status_t err = mNetSession->createTCPDatagramSession( + mRTPPort, mClientIP.c_str(), mClientRTPPort, + rtpNotify, &mRTPSessionID); + + if (err != OK) { + return err; + } + + if (mClientRTCPPort >= 0) { + sp rtcpNotify = new AMessage(kWhatRTCPNotify, id()); + + err = mNetSession->createTCPDatagramSession( + mRTPPort + 1, mClientIP.c_str(), mClientRTCPPort, + rtcpNotify, &mRTCPSessionID); + + if (err != OK) { + return err; + } + } + + return OK; +} + +int32_t Sender::getRTPPort() const { + return mRTPPort; +} + +void Sender::queuePackets( + int64_t timeUs, const sp &packets) { + bool isVideo = false; + + int32_t dummy; + if (packets->meta()->findInt32("isVideo", &dummy)) { + isVideo = true; + } + + int64_t delayUs; + int64_t whenUs; + + if (mFirstOutputBufferReadyTimeUs < 0ll) { + mFirstOutputBufferReadyTimeUs = timeUs; + mFirstOutputBufferSentTimeUs = whenUs = ALooper::GetNowUs(); + delayUs = 0ll; + } else { + int64_t nowUs = ALooper::GetNowUs(); + + whenUs = (timeUs - mFirstOutputBufferReadyTimeUs) + + mFirstOutputBufferSentTimeUs; + + delayUs = whenUs - nowUs; + } + + sp msg = new AMessage(kWhatQueuePackets, id()); + msg->setBuffer("packets", packets); + + packets->meta()->setInt64("timeUs", timeUs); + packets->meta()->setInt64("whenUs", whenUs); + packets->meta()->setInt64("delayUs", delayUs); + msg->post(delayUs > 0 ? delayUs : 0); +} + +void Sender::onMessageReceived(const sp &msg) { + switch (msg->what()) { + case kWhatRTPNotify: + case kWhatRTCPNotify: +#if ENABLE_RETRANSMISSION + case kWhatRTPRetransmissionNotify: + case kWhatRTCPRetransmissionNotify: +#endif + { + int32_t reason; + CHECK(msg->findInt32("reason", &reason)); + + switch (reason) { + case ANetworkSession::kWhatError: + { + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + int32_t err; + CHECK(msg->findInt32("err", &err)); + + int32_t errorOccuredDuringSend; + CHECK(msg->findInt32("send", &errorOccuredDuringSend)); + + AString detail; + CHECK(msg->findString("detail", &detail)); + + if ((msg->what() == kWhatRTPNotify +#if ENABLE_RETRANSMISSION + || msg->what() == kWhatRTPRetransmissionNotify +#endif + ) && !errorOccuredDuringSend) { + // This is ok, we don't expect to receive anything on + // the RTP socket. + break; + } + + ALOGE("An error occurred during %s in session %d " + "(%d, '%s' (%s)).", + errorOccuredDuringSend ? "send" : "receive", + sessionID, + err, + detail.c_str(), + strerror(-err)); + + mNetSession->destroySession(sessionID); + + if (sessionID == mRTPSessionID) { + mRTPSessionID = 0; + } else if (sessionID == mRTCPSessionID) { + mRTCPSessionID = 0; + } +#if ENABLE_RETRANSMISSION + else if (sessionID == mRTPRetransmissionSessionID) { + mRTPRetransmissionSessionID = 0; + } else if (sessionID == mRTCPRetransmissionSessionID) { + mRTCPRetransmissionSessionID = 0; + } +#endif + + notifySessionDead(); + break; + } + + case ANetworkSession::kWhatDatagram: + { + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + sp data; + CHECK(msg->findBuffer("data", &data)); + + status_t err; + if (msg->what() == kWhatRTCPNotify +#if ENABLE_RETRANSMISSION + || msg->what() == kWhatRTCPRetransmissionNotify +#endif + ) + { + err = parseRTCP(data); + } + break; + } + + case ANetworkSession::kWhatConnected: + { + CHECK_EQ(mTransportMode, TRANSPORT_TCP); + + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + if (sessionID == mRTPSessionID) { + CHECK(!mRTPConnected); + mRTPConnected = true; + ALOGI("RTP Session now connected."); + } else if (sessionID == mRTCPSessionID) { + CHECK(!mRTCPConnected); + mRTCPConnected = true; + ALOGI("RTCP Session now connected."); + } else { + TRESPASS(); + } + + if (mRTPConnected + && (mClientRTCPPort < 0 || mRTCPConnected)) { + notifyInitDone(); + } + break; + } + + default: + TRESPASS(); + } + break; + } + + case kWhatQueuePackets: + { + sp packets; + CHECK(msg->findBuffer("packets", &packets)); + + onQueuePackets(packets); + break; + } + + case kWhatSendSR: + { + mSendSRPending = false; + + if (mRTCPSessionID == 0) { + break; + } + + onSendSR(); + + scheduleSendSR(); + break; + } + } +} + +void Sender::onQueuePackets(const sp &packets) { +#if DEBUG_JITTER + int32_t dummy; + if (packets->meta()->findInt32("isVideo", &dummy)) { + static int64_t lastTimeUs = 0ll; + int64_t nowUs = ALooper::GetNowUs(); + + static TimeSeries series; + series.add((double)(nowUs - lastTimeUs)); + + ALOGI("deltaTimeUs = %lld us, mean %.2f, sdev %.2f", + nowUs - lastTimeUs, series.mean(), series.sdev()); + + lastTimeUs = nowUs; + } +#endif + + int64_t startTimeUs = ALooper::GetNowUs(); + + for (size_t offset = 0; + offset < packets->size(); offset += 188) { + bool lastTSPacket = (offset + 188 >= packets->size()); + + appendTSData( + packets->data() + offset, + 188, + true /* timeDiscontinuity */, + lastTSPacket /* flush */); + } + +#if 0 + int64_t netTimeUs = ALooper::GetNowUs() - startTimeUs; + + int64_t whenUs; + CHECK(packets->meta()->findInt64("whenUs", &whenUs)); + + int64_t delayUs; + CHECK(packets->meta()->findInt64("delayUs", &delayUs)); + + bool isVideo = false; + int32_t dummy; + if (packets->meta()->findInt32("isVideo", &dummy)) { + isVideo = true; + } + + int64_t nowUs = ALooper::GetNowUs(); + + if (nowUs - whenUs > 2000) { + ALOGI("[%s] delayUs = %lld us, delta = %lld us", + isVideo ? "video" : "audio", delayUs, nowUs - netTimeUs - whenUs); + } +#endif + +#if LOG_TRANSPORT_STREAM + if (mLogFile != NULL) { + fwrite(packets->data(), 1, packets->size(), mLogFile); + } +#endif +} + +ssize_t Sender::appendTSData( + const void *data, size_t size, bool timeDiscontinuity, bool flush) { + CHECK_EQ(size, 188); + + CHECK_LE(mTSQueue->size() + size, mTSQueue->capacity()); + + memcpy(mTSQueue->data() + mTSQueue->size(), data, size); + mTSQueue->setRange(0, mTSQueue->size() + size); + + if (flush || mTSQueue->size() == mTSQueue->capacity()) { + // flush + + int64_t nowUs = ALooper::GetNowUs(); + +#if TRACK_BANDWIDTH + if (mFirstPacketTimeUs < 0ll) { + mFirstPacketTimeUs = nowUs; + } +#endif + + // 90kHz time scale + uint32_t rtpTime = (nowUs * 9ll) / 100ll; + + uint8_t *rtp = mTSQueue->data(); + rtp[0] = 0x80; + rtp[1] = 33 | (timeDiscontinuity ? (1 << 7) : 0); // M-bit + rtp[2] = (mRTPSeqNo >> 8) & 0xff; + rtp[3] = mRTPSeqNo & 0xff; + rtp[4] = rtpTime >> 24; + rtp[5] = (rtpTime >> 16) & 0xff; + rtp[6] = (rtpTime >> 8) & 0xff; + rtp[7] = rtpTime & 0xff; + rtp[8] = kSourceID >> 24; + rtp[9] = (kSourceID >> 16) & 0xff; + rtp[10] = (kSourceID >> 8) & 0xff; + rtp[11] = kSourceID & 0xff; + + ++mRTPSeqNo; + ++mNumRTPSent; + mNumRTPOctetsSent += mTSQueue->size() - 12; + + mLastRTPTime = rtpTime; + mLastNTPTime = GetNowNTP(); + + if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBinaryData); + + sp data = new ABuffer(mTSQueue->size()); + memcpy(data->data(), rtp, mTSQueue->size()); + + notify->setInt32("channel", mRTPChannel); + notify->setBuffer("data", data); + notify->post(); + } else { + sendPacket(mRTPSessionID, rtp, mTSQueue->size()); + +#if TRACK_BANDWIDTH + mTotalBytesSent += mTSQueue->size(); + int64_t delayUs = ALooper::GetNowUs() - mFirstPacketTimeUs; + + if (delayUs > 0ll) { + ALOGI("approx. net bandwidth used: %.2f Mbit/sec", + mTotalBytesSent * 8.0 / delayUs); + } +#endif + } + +#if ENABLE_RETRANSMISSION + mTSQueue->setInt32Data(mRTPSeqNo - 1); + + mHistory.push_back(mTSQueue); + ++mHistoryLength; + + if (mHistoryLength > kMaxHistoryLength) { + mTSQueue = *mHistory.begin(); + mHistory.erase(mHistory.begin()); + + --mHistoryLength; + } else { + mTSQueue = new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188); + } +#endif + + mTSQueue->setRange(0, 12); + } + + return size; +} + +void Sender::scheduleSendSR() { + if (mSendSRPending || mRTCPSessionID == 0) { + return; + } + + mSendSRPending = true; + (new AMessage(kWhatSendSR, id()))->post(kSendSRIntervalUs); +} + +void Sender::addSR(const sp &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + + // TODO: Use macros/utility functions to clean up all the bitshifts below. + + data[0] = 0x80 | 0; + data[1] = 200; // SR + data[2] = 0; + data[3] = 6; + data[4] = kSourceID >> 24; + data[5] = (kSourceID >> 16) & 0xff; + data[6] = (kSourceID >> 8) & 0xff; + data[7] = kSourceID & 0xff; + + data[8] = mLastNTPTime >> (64 - 8); + data[9] = (mLastNTPTime >> (64 - 16)) & 0xff; + data[10] = (mLastNTPTime >> (64 - 24)) & 0xff; + data[11] = (mLastNTPTime >> 32) & 0xff; + data[12] = (mLastNTPTime >> 24) & 0xff; + data[13] = (mLastNTPTime >> 16) & 0xff; + data[14] = (mLastNTPTime >> 8) & 0xff; + data[15] = mLastNTPTime & 0xff; + + data[16] = (mLastRTPTime >> 24) & 0xff; + data[17] = (mLastRTPTime >> 16) & 0xff; + data[18] = (mLastRTPTime >> 8) & 0xff; + data[19] = mLastRTPTime & 0xff; + + data[20] = mNumRTPSent >> 24; + data[21] = (mNumRTPSent >> 16) & 0xff; + data[22] = (mNumRTPSent >> 8) & 0xff; + data[23] = mNumRTPSent & 0xff; + + data[24] = mNumRTPOctetsSent >> 24; + data[25] = (mNumRTPOctetsSent >> 16) & 0xff; + data[26] = (mNumRTPOctetsSent >> 8) & 0xff; + data[27] = mNumRTPOctetsSent & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + 28); +} + +void Sender::addSDES(const sp &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + data[0] = 0x80 | 1; + data[1] = 202; // SDES + data[4] = kSourceID >> 24; + data[5] = (kSourceID >> 16) & 0xff; + data[6] = (kSourceID >> 8) & 0xff; + data[7] = kSourceID & 0xff; + + size_t offset = 8; + + data[offset++] = 1; // CNAME + + static const char *kCNAME = "someone@somewhere"; + data[offset++] = strlen(kCNAME); + + memcpy(&data[offset], kCNAME, strlen(kCNAME)); + offset += strlen(kCNAME); + + data[offset++] = 7; // NOTE + + static const char *kNOTE = "Hell's frozen over."; + data[offset++] = strlen(kNOTE); + + memcpy(&data[offset], kNOTE, strlen(kNOTE)); + offset += strlen(kNOTE); + + data[offset++] = 0; + + if ((offset % 4) > 0) { + size_t count = 4 - (offset % 4); + switch (count) { + case 3: + data[offset++] = 0; + case 2: + data[offset++] = 0; + case 1: + data[offset++] = 0; + } + } + + size_t numWords = (offset / 4) - 1; + data[2] = numWords >> 8; + data[3] = numWords & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + offset); +} + +// static +uint64_t Sender::GetNowNTP() { + uint64_t nowUs = ALooper::GetNowUs(); + + nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; + + uint64_t hi = nowUs / 1000000ll; + uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; + + return (hi << 32) | lo; +} + +void Sender::onSendSR() { + sp buffer = new ABuffer(1500); + buffer->setRange(0, 0); + + addSR(buffer); + addSDES(buffer); + + if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBinaryData); + notify->setInt32("channel", mRTCPChannel); + notify->setBuffer("data", buffer); + notify->post(); + } else { + sendPacket(mRTCPSessionID, buffer->data(), buffer->size()); + } + + ++mNumSRsSent; +} + +#if ENABLE_RETRANSMISSION +status_t Sender::parseTSFB( + const uint8_t *data, size_t size) { + if ((data[0] & 0x1f) != 1) { + return ERROR_UNSUPPORTED; // We only support NACK for now. + } + + uint32_t srcId = U32_AT(&data[8]); + if (srcId != kSourceID) { + return ERROR_MALFORMED; + } + + for (size_t i = 12; i < size; i += 4) { + uint16_t seqNo = U16_AT(&data[i]); + uint16_t blp = U16_AT(&data[i + 2]); + + List >::iterator it = mHistory.begin(); + bool foundSeqNo = false; + while (it != mHistory.end()) { + const sp &buffer = *it; + + uint16_t bufferSeqNo = buffer->int32Data() & 0xffff; + + bool retransmit = false; + if (bufferSeqNo == seqNo) { + retransmit = true; + } else if (blp != 0) { + for (size_t i = 0; i < 16; ++i) { + if ((blp & (1 << i)) + && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) { + blp &= ~(1 << i); + retransmit = true; + } + } + } + + if (retransmit) { + ALOGI("retransmitting seqNo %d", bufferSeqNo); + + sp retransRTP = new ABuffer(2 + buffer->size()); + uint8_t *rtp = retransRTP->data(); + memcpy(rtp, buffer->data(), 12); + rtp[2] = (mRTPRetransmissionSeqNo >> 8) & 0xff; + rtp[3] = mRTPRetransmissionSeqNo & 0xff; + rtp[12] = (bufferSeqNo >> 8) & 0xff; + rtp[13] = bufferSeqNo & 0xff; + memcpy(&rtp[14], buffer->data() + 12, buffer->size() - 12); + + ++mRTPRetransmissionSeqNo; + + sendPacket( + mRTPRetransmissionSessionID, + retransRTP->data(), retransRTP->size()); + + if (bufferSeqNo == seqNo) { + foundSeqNo = true; + } + + if (foundSeqNo && blp == 0) { + break; + } + } + + ++it; + } + + if (!foundSeqNo || blp != 0) { + ALOGI("Some sequence numbers were no longer available for " + "retransmission"); + } + } + + return OK; +} +#endif + +status_t Sender::parseRTCP( + const sp &buffer) { + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + while (size > 0) { + if (size < 8) { + // Too short to be a valid RTCP header + return ERROR_MALFORMED; + } + + if ((data[0] >> 6) != 2) { + // Unsupported version. + return ERROR_UNSUPPORTED; + } + + if (data[0] & 0x20) { + // Padding present. + + size_t paddingLength = data[size - 1]; + + if (paddingLength + 12 > size) { + // If we removed this much padding we'd end up with something + // that's too short to be a valid RTP header. + return ERROR_MALFORMED; + } + + size -= paddingLength; + } + + size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; + + if (size < headerLength) { + // Only received a partial packet? + return ERROR_MALFORMED; + } + + switch (data[1]) { + case 200: + case 201: // RR + case 202: // SDES + case 203: + case 204: // APP + break; + +#if ENABLE_RETRANSMISSION + case 205: // TSFB (transport layer specific feedback) + parseTSFB(data, headerLength); + break; +#endif + + case 206: // PSFB (payload specific feedback) + hexdump(data, headerLength); + break; + + default: + { + ALOGW("Unknown RTCP packet type %u of size %d", + (unsigned)data[1], headerLength); + break; + } + } + + data += headerLength; + size -= headerLength; + } + + return OK; +} + +status_t Sender::sendPacket( + int32_t sessionID, const void *data, size_t size) { + return mNetSession->sendRequest(sessionID, data, size); +} + +void Sender::notifyInitDone() { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatInitDone); + notify->post(); +} + +void Sender::notifySessionDead() { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatSessionDead); + notify->post(); +} + +} // namespace android + diff --git a/media/libstagefright/wifi-display/source/Sender.h b/media/libstagefright/wifi-display/source/Sender.h new file mode 100644 index 0000000000..e476e84651 --- /dev/null +++ b/media/libstagefright/wifi-display/source/Sender.h @@ -0,0 +1,166 @@ +/* + * Copyright 2012, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SENDER_H_ + +#define SENDER_H_ + +#include + +namespace android { + +#define LOG_TRANSPORT_STREAM 0 +#define ENABLE_RETRANSMISSION 0 +#define TRACK_BANDWIDTH 0 + +struct ABuffer; +struct ANetworkSession; + +struct Sender : public AHandler { + Sender(const sp &netSession, const sp ¬ify); + + enum { + kWhatInitDone, + kWhatSessionDead, + kWhatBinaryData, + }; + + enum TransportMode { + TRANSPORT_UDP, + TRANSPORT_TCP_INTERLEAVED, + TRANSPORT_TCP, + }; + status_t init( + const char *clientIP, int32_t clientRtp, int32_t clientRtcp, + TransportMode transportMode); + + status_t finishInit(); + + int32_t getRTPPort() const; + + void queuePackets(int64_t timeUs, const sp &packets); + void scheduleSendSR(); + +protected: + virtual ~Sender(); + virtual void onMessageReceived(const sp &msg); + +private: + enum { + kWhatQueuePackets, + kWhatSendSR, + kWhatRTPNotify, + kWhatRTCPNotify, +#if ENABLE_RETRANSMISSION + kWhatRTPRetransmissionNotify, + kWhatRTCPRetransmissionNotify +#endif + }; + + static const int64_t kSendSRIntervalUs = 10000000ll; + + static const uint32_t kSourceID = 0xdeadbeef; + static const size_t kMaxHistoryLength = 128; + +#if ENABLE_RETRANSMISSION + static const size_t kRetransmissionPortOffset = 120; +#endif + + sp mNetSession; + sp mNotify; + + sp mTSQueue; + + TransportMode mTransportMode; + AString mClientIP; + + // in TCP mode + int32_t mRTPChannel; + int32_t mRTCPChannel; + + // in UDP mode + int32_t mRTPPort; + int32_t mRTPSessionID; + int32_t mRTCPSessionID; + +#if ENABLE_RETRANSMISSION + int32_t mRTPRetransmissionSessionID; + int32_t mRTCPRetransmissionSessionID; +#endif + + int32_t mClientRTPPort; + int32_t mClientRTCPPort; + bool mRTPConnected; + bool mRTCPConnected; + + + int64_t mFirstOutputBufferReadyTimeUs; + int64_t mFirstOutputBufferSentTimeUs; + + uint32_t mRTPSeqNo; +#if ENABLE_RETRANSMISSION + uint32_t mRTPRetransmissionSeqNo; +#endif + + uint64_t mLastNTPTime; + uint32_t mLastRTPTime; + uint32_t mNumRTPSent; + uint32_t mNumRTPOctetsSent; + uint32_t mNumSRsSent; + + bool mSendSRPending; + +#if ENABLE_RETRANSMISSION + List > mHistory; + size_t mHistoryLength; +#endif + +#if TRACK_BANDWIDTH + int64_t mFirstPacketTimeUs; + uint64_t mTotalBytesSent; +#endif + + void onSendSR(); + void addSR(const sp &buffer); + void addSDES(const sp &buffer); + static uint64_t GetNowNTP(); + +#if LOG_TRANSPORT_STREAM + FILE *mLogFile; +#endif + + ssize_t appendTSData( + const void *data, size_t size, bool timeDiscontinuity, bool flush); + + void onQueuePackets(const sp &packets); + +#if ENABLE_RETRANSMISSION + status_t parseTSFB(const uint8_t *data, size_t size); +#endif + + status_t parseRTCP(const sp &buffer); + + status_t sendPacket(int32_t sessionID, const void *data, size_t size); + + void notifyInitDone(); + void notifySessionDead(); + + DISALLOW_EVIL_CONSTRUCTORS(Sender); +}; + +} // namespace android + +#endif // SENDER_H_ diff --git a/media/libstagefright/wifi-display/source/TSPacketizer.cpp b/media/libstagefright/wifi-display/source/TSPacketizer.cpp index 7e660724d3..a5679ad907 100644 --- a/media/libstagefright/wifi-display/source/TSPacketizer.cpp +++ b/media/libstagefright/wifi-display/source/TSPacketizer.cpp @@ -49,10 +49,16 @@ struct TSPacketizer::Track : public RefBase { bool isH264() const; bool isAAC() const; bool lacksADTSHeader() const; + bool isPCMAudio() const; sp prependCSD(const sp &accessUnit) const; sp prependADTSHeader(const sp &accessUnit) const; + size_t countDescriptors() const; + sp descriptorAt(size_t index) const; + + void finalize(); + protected: virtual ~Track(); @@ -67,7 +73,10 @@ private: AString mMIME; Vector > mCSD; + Vector > mDescriptors; + bool mAudioLacksATDSHeaders; + bool mFinalized; DISALLOW_EVIL_CONSTRUCTORS(Track); }; @@ -80,7 +89,8 @@ TSPacketizer::Track::Track( mStreamType(streamType), mStreamID(streamID), mContinuityCounter(0), - mAudioLacksATDSHeaders(false) { + mAudioLacksATDSHeaders(false), + mFinalized(false) { CHECK(format->findString("mime", &mMIME)); if (!strcasecmp(mMIME.c_str(), MEDIA_MIMETYPE_VIDEO_AVC) @@ -144,6 +154,10 @@ bool TSPacketizer::Track::isAAC() const { return !strcasecmp(mMIME.c_str(), MEDIA_MIMETYPE_AUDIO_AAC); } +bool TSPacketizer::Track::isPCMAudio() const { + return !strcasecmp(mMIME.c_str(), MEDIA_MIMETYPE_AUDIO_RAW); +} + bool TSPacketizer::Track::lacksADTSHeader() const { return mAudioLacksATDSHeaders; } @@ -213,6 +227,96 @@ sp TSPacketizer::Track::prependADTSHeader( return dup; } +size_t TSPacketizer::Track::countDescriptors() const { + return mDescriptors.size(); +} + +sp TSPacketizer::Track::descriptorAt(size_t index) const { + CHECK_LT(index, mDescriptors.size()); + return mDescriptors.itemAt(index); +} + +void TSPacketizer::Track::finalize() { + if (mFinalized) { + return; + } + + if (isH264()) { + { + // AVC video descriptor (40) + + sp descriptor = new ABuffer(6); + uint8_t *data = descriptor->data(); + data[0] = 40; // descriptor_tag + data[1] = 4; // descriptor_length + + CHECK_EQ(mCSD.size(), 1u); + const sp &sps = mCSD.itemAt(0); + CHECK(!memcmp("\x00\x00\x00\x01", sps->data(), 4)); + CHECK_GE(sps->size(), 7u); + // profile_idc, constraint_set*, level_idc + memcpy(&data[2], sps->data() + 4, 3); + + // AVC_still_present=0, AVC_24_hour_picture_flag=0, reserved + data[5] = 0x3f; + + mDescriptors.push_back(descriptor); + } + + { + // AVC timing and HRD descriptor (42) + + sp descriptor = new ABuffer(4); + uint8_t *data = descriptor->data(); + data[0] = 42; // descriptor_tag + data[1] = 2; // descriptor_length + + // hrd_management_valid_flag = 0 + // reserved = 111111b + // picture_and_timing_info_present = 0 + + data[2] = 0x7e; + + // fixed_frame_rate_flag = 0 + // temporal_poc_flag = 0 + // picture_to_display_conversion_flag = 0 + // reserved = 11111b + data[3] = 0x1f; + + mDescriptors.push_back(descriptor); + } + } else if (isPCMAudio()) { + // LPCM audio stream descriptor (0x83) + + int32_t channelCount; + CHECK(mFormat->findInt32("channel-count", &channelCount)); + CHECK_EQ(channelCount, 2); + + int32_t sampleRate; + CHECK(mFormat->findInt32("sample-rate", &sampleRate)); + CHECK(sampleRate == 44100 || sampleRate == 48000); + + sp descriptor = new ABuffer(4); + uint8_t *data = descriptor->data(); + data[0] = 0x83; // descriptor_tag + data[1] = 2; // descriptor_length + + unsigned sampling_frequency = (sampleRate == 44100) ? 1 : 2; + + data[2] = (sampling_frequency << 5) + | (3 /* reserved */ << 1) + | 0 /* emphasis_flag */; + + data[3] = + (1 /* number_of_channels = stereo */ << 5) + | 0xf /* reserved */; + + mDescriptors.push_back(descriptor); + } + + mFinalized = true; +} + //////////////////////////////////////////////////////////////////////////////// TSPacketizer::TSPacketizer() @@ -289,7 +393,8 @@ status_t TSPacketizer::packetize( const sp &_accessUnit, sp *packets, uint32_t flags, - const uint8_t *PES_private_data, size_t PES_private_data_len) { + const uint8_t *PES_private_data, size_t PES_private_data_len, + size_t numStuffingBytes) { sp accessUnit = _accessUnit; int64_t timeUs; @@ -347,7 +452,7 @@ status_t TSPacketizer::packetize( // reserved = b1 // the first fragment of "buffer" follows - size_t PES_packet_length = accessUnit->size() + 8; + size_t PES_packet_length = accessUnit->size() + 8 + numStuffingBytes; if (PES_private_data_len > 0) { PES_packet_length += PES_private_data_len + 1; } @@ -410,7 +515,7 @@ status_t TSPacketizer::packetize( *ptr++ = 0x10 | mPATContinuityCounter; *ptr++ = 0x00; - const uint8_t *crcDataStart = ptr; + uint8_t *crcDataStart = ptr; *ptr++ = 0x00; *ptr++ = 0xb0; *ptr++ = 0x0d; @@ -472,8 +577,6 @@ status_t TSPacketizer::packetize( mPMTContinuityCounter = 0; } - size_t section_length = 5 * mTracks.size() + 4 + 9; - ptr = packetDataStart; *ptr++ = 0x47; *ptr++ = 0x40 | (kPID_PMT >> 8); @@ -483,8 +586,10 @@ status_t TSPacketizer::packetize( crcDataStart = ptr; *ptr++ = 0x02; - *ptr++ = 0xb0 | (section_length >> 8); - *ptr++ = section_length & 0xff; + + *ptr++ = 0x00; // section_length to be filled in below. + *ptr++ = 0x00; + *ptr++ = 0x00; *ptr++ = 0x01; *ptr++ = 0xc3; @@ -498,14 +603,34 @@ status_t TSPacketizer::packetize( for (size_t i = 0; i < mTracks.size(); ++i) { const sp &track = mTracks.itemAt(i); + // Make sure all the decriptors have been added. + track->finalize(); + *ptr++ = track->streamType(); *ptr++ = 0xe0 | (track->PID() >> 8); *ptr++ = track->PID() & 0xff; - *ptr++ = 0xf0; - *ptr++ = 0x00; + + size_t ES_info_length = 0; + for (size_t i = 0; i < track->countDescriptors(); ++i) { + ES_info_length += track->descriptorAt(i)->size(); + } + CHECK_LE(ES_info_length, 0xfff); + + *ptr++ = 0xf0 | (ES_info_length >> 8); + *ptr++ = (ES_info_length & 0xff); + + for (size_t i = 0; i < track->countDescriptors(); ++i) { + const sp &descriptor = track->descriptorAt(i); + memcpy(ptr, descriptor->data(), descriptor->size()); + ptr += descriptor->size(); + } } - CHECK_EQ(ptr - crcDataStart, 12 + mTracks.size() * 5); + size_t section_length = ptr - (crcDataStart + 3) + 4 /* CRC */; + + crcDataStart[1] = 0xb0 | (section_length >> 8); + crcDataStart[2] = section_length & 0xff; + crc = htonl(crc32(crcDataStart, ptr - crcDataStart)); memcpy(ptr, &crc, 4); ptr += 4; @@ -601,8 +726,12 @@ status_t TSPacketizer::packetize( *ptr++ = 0x84; *ptr++ = (PES_private_data_len > 0) ? 0x81 : 0x80; - *ptr++ = (PES_private_data_len > 0) - ? (1 + PES_private_data_len + 0x05) : 0x05; + size_t headerLength = 0x05 + numStuffingBytes; + if (PES_private_data_len > 0) { + headerLength += 1 + PES_private_data_len; + } + + *ptr++ = headerLength; *ptr++ = 0x20 | (((PTS >> 30) & 7) << 1) | 1; *ptr++ = (PTS >> 22) & 0xff; @@ -616,6 +745,10 @@ status_t TSPacketizer::packetize( ptr += PES_private_data_len; } + for (size_t i = 0; i < numStuffingBytes; ++i) { + *ptr++ = 0xff; + } + // 18 bytes of TS/PES header leave 188 - 18 = 170 bytes for the payload size_t sizeLeft = packetDataStart + 188 - ptr; diff --git a/media/libstagefright/wifi-display/source/TSPacketizer.h b/media/libstagefright/wifi-display/source/TSPacketizer.h index 0733c067d9..a37917dd63 100644 --- a/media/libstagefright/wifi-display/source/TSPacketizer.h +++ b/media/libstagefright/wifi-display/source/TSPacketizer.h @@ -47,7 +47,8 @@ struct TSPacketizer : public RefBase { size_t trackIndex, const sp &accessUnit, sp *packets, uint32_t flags, - const uint8_t *PES_private_data, size_t PES_private_data_len); + const uint8_t *PES_private_data, size_t PES_private_data_len, + size_t numStuffingBytes = 0); // XXX to be removed once encoder config option takes care of this for // encrypted mode. diff --git a/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp b/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp index cba1043f22..b16c5d02ca 100644 --- a/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp +++ b/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp @@ -22,6 +22,7 @@ #include "PlaybackSession.h" #include "Parameters.h" #include "ParsedMessage.h" +#include "Sender.h" #include #include @@ -981,8 +982,7 @@ status_t WifiDisplaySource::onSetupRequest( return ERROR_MALFORMED; } - PlaybackSession::TransportMode transportMode = - PlaybackSession::TRANSPORT_UDP; + Sender::TransportMode transportMode = Sender::TRANSPORT_UDP; int clientRtp, clientRtcp; if (transport.startsWith("RTP/AVP/TCP;")) { @@ -991,7 +991,7 @@ status_t WifiDisplaySource::onSetupRequest( transport.c_str(), "interleaved", &interleaved) && sscanf(interleaved.c_str(), "%d-%d", &clientRtp, &clientRtcp) == 2) { - transportMode = PlaybackSession::TRANSPORT_TCP_INTERLEAVED; + transportMode = Sender::TRANSPORT_TCP_INTERLEAVED; } else { bool badRequest = false; @@ -1013,7 +1013,7 @@ status_t WifiDisplaySource::onSetupRequest( return ERROR_MALFORMED; } - transportMode = PlaybackSession::TRANSPORT_TCP; + transportMode = Sender::TRANSPORT_TCP; } } else if (transport.startsWith("RTP/AVP;unicast;") || transport.startsWith("RTP/AVP/UDP;unicast;")) { @@ -1101,7 +1101,7 @@ status_t WifiDisplaySource::onSetupRequest( AString response = "RTSP/1.0 200 OK\r\n"; AppendCommonResponse(&response, cseq, playbackSessionID); - if (transportMode == PlaybackSession::TRANSPORT_TCP_INTERLEAVED) { + if (transportMode == Sender::TRANSPORT_TCP_INTERLEAVED) { response.append( StringPrintf( "Transport: RTP/AVP/TCP;interleaved=%d-%d;", @@ -1110,7 +1110,7 @@ status_t WifiDisplaySource::onSetupRequest( int32_t serverRtp = playbackSession->getRTPPort(); AString transportString = "UDP"; - if (transportMode == PlaybackSession::TRANSPORT_TCP) { + if (transportMode == Sender::TRANSPORT_TCP) { transportString = "TCP"; } -- 2.11.0