OSDN Git Service

Squashed commit of the following:
authorAndreas Huber <andih@google.com>
Tue, 12 Mar 2013 18:01:43 +0000 (11:01 -0700)
committerAndreas Huber <andih@google.com>
Tue, 12 Mar 2013 20:44:58 +0000 (13:44 -0700)
commit f4edf442741886cdbe071e2d15f6e6247269f7c5
Author: Andreas Huber <andih@google.com>
Date:   Tue Mar 12 09:09:18 2013 -0700

    Pass additional flags to the sink, use TCP by default in wolfiecast mode.

    Change-Id: I41e11a2375d4199656e45c4f149d8441d0016092

commit 6302602ed280a38287f507159abfb40a1da38c5a
Author: Andreas Huber <andih@google.com>
Date:   Tue Mar 12 08:51:58 2013 -0700

    tweaks

    Change-Id: Ie29e422d7258be522f4bb1f6c5afcf74c937e547

commit a38a860e4979ba563cadbaafa21b084439449d26
Author: Andreas Huber <andih@google.com>
Date:   Mon Mar 11 16:57:43 2013 -0700

    Report average lateness all the way from NuPlayerRenderer...

    Change-Id: I2e7700703ae656515e44b9c25610d26c75778111

commit a7d49b11675ea88be4029dd8451d1649db94571d
Author: Andreas Huber <andih@google.com>
Date:   Mon Mar 11 14:54:19 2013 -0700

    Make TimeSyncer smarter, enable TunnelRenderer

    Change-Id: I27377a60cd8feb01589da456967fddd34532c20e

commit 0f214c8ef68179f7b61512c37040939554013151
Author: Andreas Huber <andih@google.com>
Date:   Thu Mar 7 15:57:56 2013 -0800

    convert source timestamps to sink timestamps, report lateness.

    Change-Id: I051a60fbbceca2f7b508ae3dac6e01e402bae39e

commit 04a4f8e16bad09157b5615a5fa45310438955832
Author: Andreas Huber <andih@google.com>
Date:   Thu Mar 7 09:00:28 2013 -0800

    Sync time between sink and source.

    Change-Id: Ie8b4d75c957aa48310e7c81d1279761b9f821efe

commit aebe20e6184e3636a99082f8ece08e708015cb8d
Author: Andreas Huber <andih@google.com>
Date:   Wed Mar 6 09:03:12 2013 -0800

    play with back pressure

    Change-Id: I51eb69257e6a79e76f5f9c75ff99d8adbd083947

Change-Id: Ifdf57228667fed7fc71c5090a2c3f7cea1037c5c

30 files changed:
include/media/IStreamSource.h
media/libmediaplayerservice/nuplayer/NuPlayer.cpp
media/libmediaplayerservice/nuplayer/NuPlayerDriver.cpp
media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp
media/libmediaplayerservice/nuplayer/NuPlayerRenderer.h
media/libmediaplayerservice/nuplayer/NuPlayerSource.h
media/libmediaplayerservice/nuplayer/StreamingSource.cpp
media/libmediaplayerservice/nuplayer/StreamingSource.h
media/libstagefright/mpeg2ts/ATSParser.cpp
media/libstagefright/mpeg2ts/ATSParser.h
media/libstagefright/wifi-display/ANetworkSession.cpp
media/libstagefright/wifi-display/Android.mk
media/libstagefright/wifi-display/MediaReceiver.cpp
media/libstagefright/wifi-display/MediaReceiver.h
media/libstagefright/wifi-display/TimeSyncer.cpp [new file with mode: 0644]
media/libstagefright/wifi-display/TimeSyncer.h [new file with mode: 0644]
media/libstagefright/wifi-display/rtp/RTPAssembler.cpp
media/libstagefright/wifi-display/rtp/RTPReceiver.cpp
media/libstagefright/wifi-display/rtp/RTPReceiver.h
media/libstagefright/wifi-display/rtp/RTPSender.cpp
media/libstagefright/wifi-display/sink/DirectRenderer.cpp
media/libstagefright/wifi-display/sink/DirectRenderer.h
media/libstagefright/wifi-display/sink/TunnelRenderer.cpp
media/libstagefright/wifi-display/sink/TunnelRenderer.h
media/libstagefright/wifi-display/sink/WifiDisplaySink.cpp
media/libstagefright/wifi-display/sink/WifiDisplaySink.h
media/libstagefright/wifi-display/source/WifiDisplaySource.cpp
media/libstagefright/wifi-display/source/WifiDisplaySource.h
media/libstagefright/wifi-display/udptest.cpp
media/libstagefright/wifi-display/wfd.cpp

index 39e0a9e..677119b 100644 (file)
@@ -37,6 +37,9 @@ struct IStreamSource : public IInterface {
     enum {
         // Video PES packets contain exactly one (aligned) access unit.
         kFlagAlignedVideoData = 1,
+
+        // Timestamps are in ALooper::GetNowUs() units.
+        kFlagIsRealTimeData   = 2,
     };
     virtual uint32_t flags() const { return 0; }
 };
index 2ba6c22..5387e1a 100644 (file)
@@ -381,9 +381,16 @@ void NuPlayer::onMessageReceived(const sp<AMessage> &msg) {
 
             mSource->start();
 
+            uint32_t flags = 0;
+
+            if (mSource->isRealTime()) {
+                flags |= Renderer::FLAG_REAL_TIME;
+            }
+
             mRenderer = new Renderer(
                     mAudioSink,
-                    new AMessage(kWhatRendererNotify, id()));
+                    new AMessage(kWhatRendererNotify, id()),
+                    flags);
 
             looper()->registerHandler(mRenderer);
 
index 3c63e80..723af09 100644 (file)
@@ -378,6 +378,7 @@ status_t NuPlayerDriver::invoke(const Parcel &request, Parcel *reply) {
             int mode = request.readInt32();
             return mPlayer->setVideoScalingMode(mode);
         }
+
         default:
         {
             return INVALID_OPERATION;
index 1ba76a5..404b56f 100644 (file)
@@ -31,9 +31,11 @@ const int64_t NuPlayer::Renderer::kMinPositionUpdateDelayUs = 100000ll;
 
 NuPlayer::Renderer::Renderer(
         const sp<MediaPlayerBase::AudioSink> &sink,
-        const sp<AMessage> &notify)
+        const sp<AMessage> &notify,
+        uint32_t flags)
     : mAudioSink(sink),
       mNotify(notify),
+      mFlags(flags),
       mNumFramesWritten(0),
       mDrainAudioQueuePending(false),
       mDrainVideoQueuePending(false),
@@ -323,6 +325,11 @@ void NuPlayer::Renderer::postDrainVideoQueue() {
     if (entry.mBuffer == NULL) {
         // EOS doesn't carry a timestamp.
         delayUs = 0;
+    } else if (mFlags & FLAG_REAL_TIME) {
+        int64_t mediaTimeUs;
+        CHECK(entry.mBuffer->meta()->findInt64("timeUs", &mediaTimeUs));
+
+        delayUs = mediaTimeUs - ALooper::GetNowUs();
     } else {
         int64_t mediaTimeUs;
         CHECK(entry.mBuffer->meta()->findInt64("timeUs", &mediaTimeUs));
@@ -368,12 +375,17 @@ void NuPlayer::Renderer::onDrainVideoQueue() {
         return;
     }
 
-    int64_t mediaTimeUs;
-    CHECK(entry->mBuffer->meta()->findInt64("timeUs", &mediaTimeUs));
+    int64_t realTimeUs;
+    if (mFlags & FLAG_REAL_TIME) {
+        CHECK(entry->mBuffer->meta()->findInt64("timeUs", &realTimeUs));
+    } else {
+        int64_t mediaTimeUs;
+        CHECK(entry->mBuffer->meta()->findInt64("timeUs", &mediaTimeUs));
+
+        realTimeUs = mediaTimeUs - mAnchorTimeMediaUs + mAnchorTimeRealUs;
+    }
 
-    int64_t realTimeUs = mediaTimeUs - mAnchorTimeMediaUs + mAnchorTimeRealUs;
     mVideoLateByUs = ALooper::GetNowUs() - realTimeUs;
-
     bool tooLate = (mVideoLateByUs > 40000);
 
     if (tooLate) {
index e4368c7..c9796e2 100644 (file)
@@ -25,8 +25,12 @@ namespace android {
 struct ABuffer;
 
 struct NuPlayer::Renderer : public AHandler {
+    enum Flags {
+        FLAG_REAL_TIME = 1,
+    };
     Renderer(const sp<MediaPlayerBase::AudioSink> &sink,
-             const sp<AMessage> &notify);
+             const sp<AMessage> &notify,
+             uint32_t flags = 0);
 
     void queueBuffer(
             bool audio,
@@ -79,6 +83,7 @@ private:
 
     sp<MediaPlayerBase::AudioSink> mAudioSink;
     sp<AMessage> mNotify;
+    uint32_t mFlags;
     List<QueueEntry> mAudioQueue;
     List<QueueEntry> mVideoQueue;
     uint32_t mNumFramesWritten;
index 8622abe..1cbf575 100644 (file)
@@ -74,6 +74,10 @@ struct NuPlayer::Source : public AHandler {
         return INVALID_OPERATION;
     }
 
+    virtual bool isRealTime() const {
+        return false;
+    }
+
 protected:
     virtual ~Source() {}
 
index df03f86..28f0d50 100644 (file)
@@ -182,5 +182,9 @@ status_t NuPlayer::StreamingSource::dequeueAccessUnit(
     return err;
 }
 
+bool NuPlayer::StreamingSource::isRealTime() const {
+    return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
+}
+
 }  // namespace android
 
index 80b061c..412b6c4 100644 (file)
@@ -38,6 +38,8 @@ struct NuPlayer::StreamingSource : public NuPlayer::Source {
 
     virtual status_t dequeueAccessUnit(bool audio, sp<ABuffer> *accessUnit);
 
+    virtual bool isRealTime() const;
+
 protected:
     virtual ~StreamingSource();
 
index a167b5a..c12572f 100644 (file)
@@ -452,6 +452,10 @@ int64_t ATSParser::Program::convertPTSToTimestamp(uint64_t PTS) {
         timeUs += mParser->mAbsoluteTimeAnchorUs;
     }
 
+    if (mParser->mTimeOffsetValid) {
+        timeUs += mParser->mTimeOffsetUs;
+    }
+
     return timeUs;
 }
 
@@ -930,6 +934,8 @@ sp<MediaSource> ATSParser::Stream::getSource(SourceType type) {
 ATSParser::ATSParser(uint32_t flags)
     : mFlags(flags),
       mAbsoluteTimeAnchorUs(-1ll),
+      mTimeOffsetValid(false),
+      mTimeOffsetUs(0ll),
       mNumTSPacketsParsed(0),
       mNumPCRs(0) {
     mPSISections.add(0 /* PID */, new PSISection);
@@ -960,6 +966,13 @@ void ATSParser::signalDiscontinuity(
         CHECK(mPrograms.empty());
         mAbsoluteTimeAnchorUs = timeUs;
         return;
+    } else if (type == DISCONTINUITY_TIME_OFFSET) {
+        int64_t offset;
+        CHECK(extra->findInt64("offset", &offset));
+
+        mTimeOffsetValid = true;
+        mTimeOffsetUs = offset;
+        return;
     }
 
     for (size_t i = 0; i < mPrograms.size(); ++i) {
index 46edc45..a10edc9 100644 (file)
@@ -39,6 +39,7 @@ struct ATSParser : public RefBase {
         DISCONTINUITY_AUDIO_FORMAT      = 2,
         DISCONTINUITY_VIDEO_FORMAT      = 4,
         DISCONTINUITY_ABSOLUTE_TIME     = 8,
+        DISCONTINUITY_TIME_OFFSET       = 16,
 
         DISCONTINUITY_SEEK              = DISCONTINUITY_TIME,
 
@@ -106,6 +107,9 @@ private:
 
     int64_t mAbsoluteTimeAnchorUs;
 
+    bool mTimeOffsetValid;
+    int64_t mTimeOffsetUs;
+
     size_t mNumTSPacketsParsed;
 
     void parseProgramAssociationTable(ABitReader *br);
index cb6011c..465f4c4 100644 (file)
@@ -27,6 +27,7 @@
 #include <net/if.h>
 #include <netdb.h>
 #include <netinet/in.h>
+#include <sys/ioctl.h>
 #include <sys/socket.h>
 
 #include <media/stagefright/foundation/ABuffer.h>
@@ -507,6 +508,14 @@ status_t ANetworkSession::Session::writeMore() {
         mSawSendFailure = true;
     }
 
+#if 0
+    int numBytesQueued;
+    int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
+    if (res == 0 && numBytesQueued > 102400) {
+        ALOGI("numBytesQueued = %d", numBytesQueued);
+    }
+#endif
+
     return err;
 }
 
index 19f560c..f81929c 100644 (file)
@@ -15,6 +15,7 @@ LOCAL_SRC_FILES:= \
         sink/TunnelRenderer.cpp         \
         sink/WifiDisplaySink.cpp        \
         SNTPClient.cpp                  \
+        TimeSyncer.cpp                  \
         source/Converter.cpp            \
         source/MediaPuller.cpp          \
         source/PlaybackSession.cpp      \
index 3c92d41..10a2dff 100644 (file)
@@ -127,7 +127,10 @@ void MediaReceiver::onMessageReceived(const sp<AMessage> &msg) {
                 notifyInitDone(mInitStatus);
             }
 
-            mTSParser = new ATSParser(ATSParser::ALIGNED_VIDEO_DATA);
+            mTSParser = new ATSParser(
+                    ATSParser::ALIGNED_VIDEO_DATA
+                        | ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
+
             mFormatKnownMask = 0;
             break;
         }
@@ -306,6 +309,15 @@ void MediaReceiver::postAccessUnit(
     notify->post();
 }
 
+status_t MediaReceiver::notifyLateness(size_t trackIndex, int64_t latenessUs) {
+    if (trackIndex >= mTrackInfos.size()) {
+        return -ERANGE;
+    }
+
+    TrackInfo *info = &mTrackInfos.editItemAt(trackIndex);
+    return info->mReceiver->notifyLateness(latenessUs);
+}
+
 }  // namespace android
 
 
index 7adc3c4..cdfde99 100644 (file)
@@ -60,6 +60,8 @@ struct MediaReceiver : public AHandler {
     };
     status_t initAsync(Mode mode);
 
+    status_t notifyLateness(size_t trackIndex, int64_t latenessUs);
+
 protected:
     virtual void onMessageReceived(const sp<AMessage> &msg);
     virtual ~MediaReceiver();
diff --git a/media/libstagefright/wifi-display/TimeSyncer.cpp b/media/libstagefright/wifi-display/TimeSyncer.cpp
new file mode 100644 (file)
index 0000000..64e182e
--- /dev/null
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2013, The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NEBUG 0
+#define LOG_TAG "TimeSyncer"
+#include <utils/Log.h>
+
+#include "TimeSyncer.h"
+
+#include "ANetworkSession.h"
+
+#include <media/stagefright/foundation/ABuffer.h>
+#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/foundation/AHandler.h>
+#include <media/stagefright/foundation/ALooper.h>
+#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/Utils.h>
+
+namespace android {
+
+TimeSyncer::TimeSyncer(
+        const sp<ANetworkSession> &netSession, const sp<AMessage> &notify)
+    : mNetSession(netSession),
+      mNotify(notify),
+      mIsServer(false),
+      mConnected(false),
+      mUDPSession(0),
+      mSeqNo(0),
+      mTotalTimeUs(0.0),
+      mPendingT1(0ll),
+      mTimeoutGeneration(0) {
+}
+
+TimeSyncer::~TimeSyncer() {
+}
+
+void TimeSyncer::startServer(unsigned localPort) {
+    sp<AMessage> msg = new AMessage(kWhatStartServer, id());
+    msg->setInt32("localPort", localPort);
+    msg->post();
+}
+
+void TimeSyncer::startClient(const char *remoteHost, unsigned remotePort) {
+    sp<AMessage> msg = new AMessage(kWhatStartClient, id());
+    msg->setString("remoteHost", remoteHost);
+    msg->setInt32("remotePort", remotePort);
+    msg->post();
+}
+
+void TimeSyncer::onMessageReceived(const sp<AMessage> &msg) {
+    switch (msg->what()) {
+        case kWhatStartClient:
+        {
+            AString remoteHost;
+            CHECK(msg->findString("remoteHost", &remoteHost));
+
+            int32_t remotePort;
+            CHECK(msg->findInt32("remotePort", &remotePort));
+
+            sp<AMessage> notify = new AMessage(kWhatUDPNotify, id());
+
+            CHECK_EQ((status_t)OK,
+                     mNetSession->createUDPSession(
+                         0 /* localPort */,
+                         remoteHost.c_str(),
+                         remotePort,
+                         notify,
+                         &mUDPSession));
+
+            postSendPacket();
+            break;
+        }
+
+        case kWhatStartServer:
+        {
+            mIsServer = true;
+
+            int32_t localPort;
+            CHECK(msg->findInt32("localPort", &localPort));
+
+            sp<AMessage> notify = new AMessage(kWhatUDPNotify, id());
+
+            CHECK_EQ((status_t)OK,
+                     mNetSession->createUDPSession(
+                         localPort, notify, &mUDPSession));
+
+            break;
+        }
+
+        case kWhatSendPacket:
+        {
+            TimeInfo ti;
+            memset(&ti, 0, sizeof(ti));
+
+            ti.mT1 = ALooper::GetNowUs();
+
+            CHECK_EQ((status_t)OK,
+                     mNetSession->sendRequest(
+                         mUDPSession, &ti, sizeof(ti)));
+
+            mPendingT1 = ti.mT1;
+            postTimeout();
+            break;
+        }
+
+        case kWhatTimedOut:
+        {
+            int32_t generation;
+            CHECK(msg->findInt32("generation", &generation));
+
+            if (generation != mTimeoutGeneration) {
+                break;
+            }
+
+            ALOGI("timed out, sending another request");
+            postSendPacket();
+            break;
+        }
+
+        case kWhatUDPNotify:
+        {
+            int32_t reason;
+            CHECK(msg->findInt32("reason", &reason));
+
+            switch (reason) {
+                case ANetworkSession::kWhatError:
+                {
+                    int32_t sessionID;
+                    CHECK(msg->findInt32("sessionID", &sessionID));
+
+                    int32_t err;
+                    CHECK(msg->findInt32("err", &err));
+
+                    AString detail;
+                    CHECK(msg->findString("detail", &detail));
+
+                    ALOGE("An error occurred in session %d (%d, '%s/%s').",
+                          sessionID,
+                          err,
+                          detail.c_str(),
+                          strerror(-err));
+
+                    mNetSession->destroySession(sessionID);
+
+                    cancelTimeout();
+
+                    notifyError(err);
+                    break;
+                }
+
+                case ANetworkSession::kWhatDatagram:
+                {
+                    int32_t sessionID;
+                    CHECK(msg->findInt32("sessionID", &sessionID));
+
+                    sp<ABuffer> packet;
+                    CHECK(msg->findBuffer("data", &packet));
+
+                    int64_t arrivalTimeUs;
+                    CHECK(packet->meta()->findInt64(
+                                "arrivalTimeUs", &arrivalTimeUs));
+
+                    CHECK_EQ(packet->size(), sizeof(TimeInfo));
+
+                    TimeInfo *ti = (TimeInfo *)packet->data();
+
+                    if (mIsServer) {
+                        if (!mConnected) {
+                            AString fromAddr;
+                            CHECK(msg->findString("fromAddr", &fromAddr));
+
+                            int32_t fromPort;
+                            CHECK(msg->findInt32("fromPort", &fromPort));
+
+                            CHECK_EQ((status_t)OK,
+                                     mNetSession->connectUDPSession(
+                                         mUDPSession, fromAddr.c_str(), fromPort));
+
+                            mConnected = true;
+                        }
+
+                        ti->mT2 = arrivalTimeUs;
+                        ti->mT3 = ALooper::GetNowUs();
+
+                        CHECK_EQ((status_t)OK,
+                                 mNetSession->sendRequest(
+                                     mUDPSession, ti, sizeof(*ti)));
+                    } else {
+                        if (ti->mT1 != mPendingT1) {
+                            break;
+                        }
+
+                        cancelTimeout();
+                        mPendingT1 = 0;
+
+                        ti->mT4 = arrivalTimeUs;
+
+                        // One way delay for a packet to travel from client
+                        // to server or back (assumed to be the same either way).
+                        int64_t delay =
+                            (ti->mT2 - ti->mT1 + ti->mT4 - ti->mT3) / 2;
+
+                        // Offset between the client clock (T1, T4) and the
+                        // server clock (T2, T3) timestamps.
+                        int64_t offset =
+                            (ti->mT2 - ti->mT1 - ti->mT4 + ti->mT3) / 2;
+
+                        mHistory.push_back(*ti);
+
+                        ALOGV("delay = %lld us,\toffset %lld us",
+                               delay,
+                               offset);
+
+                        if (mHistory.size() < kNumPacketsPerBatch) {
+                            postSendPacket(1000000ll / 30);
+                        } else {
+                            notifyOffset();
+
+                            mHistory.clear();
+                            postSendPacket(kBatchDelayUs);
+                        }
+                    }
+                    break;
+                }
+
+                default:
+                    TRESPASS();
+            }
+
+            break;
+        }
+
+        default:
+            TRESPASS();
+    }
+}
+
+void TimeSyncer::postSendPacket(int64_t delayUs) {
+    (new AMessage(kWhatSendPacket, id()))->post(delayUs);
+}
+
+void TimeSyncer::postTimeout() {
+    sp<AMessage> msg = new AMessage(kWhatTimedOut, id());
+    msg->setInt32("generation", mTimeoutGeneration);
+    msg->post(kTimeoutDelayUs);
+}
+
+void TimeSyncer::cancelTimeout() {
+    ++mTimeoutGeneration;
+}
+
+void TimeSyncer::notifyError(status_t err) {
+    if (mNotify == NULL) {
+        looper()->stop();
+        return;
+    }
+
+    sp<AMessage> notify = mNotify->dup();
+    notify->setInt32("what", kWhatError);
+    notify->setInt32("err", err);
+    notify->post();
+}
+
+// static
+int TimeSyncer::CompareRountripTime(const TimeInfo *ti1, const TimeInfo *ti2) {
+    int64_t rt1 = ti1->mT4 - ti1->mT1;
+    int64_t rt2 = ti2->mT4 - ti2->mT1;
+
+    if (rt1 < rt2) {
+        return -1;
+    } else if (rt1 > rt2) {
+        return 1;
+    }
+
+    return 0;
+}
+
+void TimeSyncer::notifyOffset() {
+    mHistory.sort(CompareRountripTime);
+
+    int64_t sum = 0ll;
+    size_t count = 0;
+
+    // Only consider the third of the information associated with the best
+    // (smallest) roundtrip times.
+    for (size_t i = 0; i < mHistory.size() / 3; ++i) {
+        const TimeInfo *ti = &mHistory[i];
+
+#if 0
+        // One way delay for a packet to travel from client
+        // to server or back (assumed to be the same either way).
+        int64_t delay =
+            (ti->mT2 - ti->mT1 + ti->mT4 - ti->mT3) / 2;
+#endif
+
+        // Offset between the client clock (T1, T4) and the
+        // server clock (T2, T3) timestamps.
+        int64_t offset =
+            (ti->mT2 - ti->mT1 - ti->mT4 + ti->mT3) / 2;
+
+        ALOGV("(%d) RT: %lld us, offset: %lld us",
+              i, ti->mT4 - ti->mT1, offset);
+
+        sum += offset;
+        ++count;
+    }
+
+    if (mNotify == NULL) {
+        ALOGI("avg. offset is %lld", sum / count);
+        return;
+    }
+
+    sp<AMessage> notify = mNotify->dup();
+    notify->setInt32("what", kWhatTimeOffset);
+    notify->setInt64("offset", sum / count);
+    notify->post();
+}
+
+}  // namespace android
diff --git a/media/libstagefright/wifi-display/TimeSyncer.h b/media/libstagefright/wifi-display/TimeSyncer.h
new file mode 100644 (file)
index 0000000..0e3aed7
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2013, The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TIME_SYNCER_H_
+
+#define TIME_SYNCER_H_
+
+#include <media/stagefright/foundation/AHandler.h>
+
+namespace android {
+
+struct ANetworkSession;
+
+/*
+   TimeSyncer allows us to synchronize time between a client and a server.
+   The client sends a UDP packet containing its send-time to the server,
+   the server sends that packet back to the client amended with information
+   about when it was received as well as the time the reply was sent back.
+   Finally the client receives the reply and has now enough information to
+   compute the clock offset between client and server assuming that packet
+   exchange is symmetric, i.e. time for a packet client->server and
+   server->client is roughly equal.
+   This exchange is repeated a number of times and the average offset computed
+   over the 30% of packets that had the lowest roundtrip times.
+   The offset is determined every 10 secs to account for slight differences in
+   clock frequency.
+*/
+struct TimeSyncer : public AHandler {
+    enum {
+        kWhatError,
+        kWhatTimeOffset,
+    };
+    TimeSyncer(
+            const sp<ANetworkSession> &netSession,
+            const sp<AMessage> &notify);
+
+    void startServer(unsigned localPort);
+    void startClient(const char *remoteHost, unsigned remotePort);
+
+protected:
+    virtual ~TimeSyncer();
+
+    virtual void onMessageReceived(const sp<AMessage> &msg);
+
+private:
+    enum {
+        kWhatStartServer,
+        kWhatStartClient,
+        kWhatUDPNotify,
+        kWhatSendPacket,
+        kWhatTimedOut,
+    };
+
+    struct TimeInfo {
+        int64_t mT1;  // client timestamp at send
+        int64_t mT2;  // server timestamp at receive
+        int64_t mT3;  // server timestamp at send
+        int64_t mT4;  // client timestamp at receive
+    };
+
+    enum {
+        kNumPacketsPerBatch = 30,
+    };
+    static const int64_t kTimeoutDelayUs = 500000ll;
+    static const int64_t kBatchDelayUs = 10000000ll;  // every 10 secs
+
+    sp<ANetworkSession> mNetSession;
+    sp<AMessage> mNotify;
+
+    bool mIsServer;
+    bool mConnected;
+    int32_t mUDPSession;
+    uint32_t mSeqNo;
+    double mTotalTimeUs;
+
+    Vector<TimeInfo> mHistory;
+
+    int64_t mPendingT1;
+    int32_t mTimeoutGeneration;
+
+    void postSendPacket(int64_t delayUs = 0ll);
+
+    void postTimeout();
+    void cancelTimeout();
+
+    void notifyError(status_t err);
+    void notifyOffset();
+
+    static int CompareRountripTime(const TimeInfo *ti1, const TimeInfo *ti2);
+
+    DISALLOW_EVIL_CONSTRUCTORS(TimeSyncer);
+};
+
+}  // namespace android
+
+#endif  // TIME_SYNCER_H_
index d0ab60d..5f189e7 100644 (file)
@@ -53,6 +53,11 @@ void RTPReceiver::TSAssembler::signalDiscontinuity() {
 }
 
 status_t RTPReceiver::TSAssembler::processPacket(const sp<ABuffer> &packet) {
+    int32_t rtpTime;
+    CHECK(packet->meta()->findInt32("rtp-time", &rtpTime));
+
+    packet->meta()->setInt64("timeUs", (rtpTime * 100ll) / 9);
+
     postAccessUnit(packet, mSawDiscontinuity);
 
     if (mSawDiscontinuity) {
index 29482af..8711b08 100644 (file)
@@ -221,10 +221,12 @@ void RTPReceiver::Source::dequeueMore() {
 
             mNumDeclaredLostPrior = mNumDeclaredLost;
 
-            ALOGI("lost %lld packets (%.2f %%), declared %d lost\n",
-                  lostInterval,
-                  100.0f * lostInterval / expectedInterval,
-                  declaredLostInterval);
+            if (declaredLostInterval > 0) {
+                ALOGI("lost %lld packets (%.2f %%), declared %d lost\n",
+                      lostInterval,
+                      100.0f * lostInterval / expectedInterval,
+                      declaredLostInterval);
+            }
         }
 
         mNextReportTimeUs = nowUs + kReportIntervalUs;
@@ -530,6 +532,40 @@ status_t RTPReceiver::connect(
     return OK;
 }
 
+status_t RTPReceiver::notifyLateness(int64_t latenessUs) {
+    sp<ABuffer> buf = new ABuffer(20);
+
+    uint8_t *ptr = buf->data();
+    ptr[0] = 0x80 | 0;
+    ptr[1] = 204;  // APP
+    ptr[2] = 0;
+
+    CHECK((buf->size() % 4) == 0u);
+    ptr[3] = (buf->size() / 4) - 1;
+
+    ptr[4] = kSourceID >> 24;  // SSRC
+    ptr[5] = (kSourceID >> 16) & 0xff;
+    ptr[6] = (kSourceID >> 8) & 0xff;
+    ptr[7] = kSourceID & 0xff;
+    ptr[8] = 'l';
+    ptr[9] = 'a';
+    ptr[10] = 't';
+    ptr[11] = 'e';
+
+    ptr[12] = latenessUs >> 56;
+    ptr[13] = (latenessUs >> 48) & 0xff;
+    ptr[14] = (latenessUs >> 40) & 0xff;
+    ptr[15] = (latenessUs >> 32) & 0xff;
+    ptr[16] = (latenessUs >> 24) & 0xff;
+    ptr[17] = (latenessUs >> 16) & 0xff;
+    ptr[18] = (latenessUs >> 8) & 0xff;
+    ptr[19] = latenessUs & 0xff;
+
+    mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size());
+
+    return OK;
+}
+
 void RTPReceiver::onMessageReceived(const sp<AMessage> &msg) {
     switch (msg->what()) {
         case kWhatRTPNotify:
index 2ae864a..ec4671d 100644 (file)
@@ -53,6 +53,8 @@ struct RTPReceiver : public RTPBase, public AHandler {
             int32_t remoteRTPPort,
             int32_t remoteRTCPPort);
 
+    status_t notifyLateness(int64_t latenessUs);
+
 protected:
     virtual ~RTPReceiver();
     virtual void onMessageReceived(const sp<AMessage> &msg);
index 85c5933..b60853d 100644 (file)
@@ -577,6 +577,8 @@ status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) {
 
             case 202:  // SDES
             case 203:
+                break;
+
             case 204:  // APP
                 break;
 
index b53252d..5efcd17 100644 (file)
@@ -39,8 +39,11 @@ DirectRenderer::DirectRenderer(
     : mSurfaceTex(bufferProducer),
       mVideoDecoderNotificationPending(false),
       mRenderPending(false),
-      mFirstRenderTimeUs(-1ll),
-      mFirstRenderRealUs(-1ll) {
+      mTimeOffsetUs(0ll),
+      mLatencySum(0ll),
+      mLatencyCount(0),
+      mNumFramesLate(0),
+      mNumFrames(0) {
 }
 
 DirectRenderer::~DirectRenderer() {
@@ -53,6 +56,29 @@ DirectRenderer::~DirectRenderer() {
     }
 }
 
+void DirectRenderer::setTimeOffset(int64_t offset) {
+    mTimeOffsetUs = offset;
+}
+
+int64_t DirectRenderer::getAvgLatenessUs() {
+    if (mLatencyCount == 0) {
+        return 0ll;
+    }
+
+    int64_t avgLatencyUs = mLatencySum / mLatencyCount;
+
+    mLatencySum = 0ll;
+    mLatencyCount = 0;
+
+    if (mNumFrames > 0) {
+        ALOGI("%d / %d frames late", mNumFramesLate, mNumFrames);
+        mNumFramesLate = 0;
+        mNumFrames = 0;
+    }
+
+    return avgLatencyUs;
+}
+
 void DirectRenderer::onMessageReceived(const sp<AMessage> &msg) {
     switch (msg->what()) {
         case kWhatVideoDecoderNotify:
@@ -224,14 +250,17 @@ void DirectRenderer::onVideoDecoderNotify() {
 }
 
 void DirectRenderer::queueOutputBuffer(size_t index, int64_t timeUs) {
-#if 0
+#if 1
     OutputInfo info;
     info.mIndex = index;
-    info.mTimeUs = timeUs;
+    info.mTimeUs = timeUs + mTimeOffsetUs;
     mOutputBuffers.push_back(info);
 
     scheduleRenderIfNecessary();
 #else
+    mLatencySum += ALooper::GetNowUs() - (timeUs + mTimeOffsetUs);
+    ++mLatencyCount;
+
     status_t err = mVideoDecoder->renderOutputBufferAndRelease(index);
     CHECK_EQ(err, (status_t)OK);
 #endif
@@ -247,13 +276,7 @@ void DirectRenderer::scheduleRenderIfNecessary() {
     int64_t timeUs = (*mOutputBuffers.begin()).mTimeUs;
     int64_t nowUs = ALooper::GetNowUs();
 
-    if (mFirstRenderTimeUs < 0ll) {
-        mFirstRenderTimeUs = timeUs;
-        mFirstRenderRealUs = nowUs;
-    }
-
-    int64_t whenUs = timeUs - mFirstRenderTimeUs + mFirstRenderRealUs;
-    int64_t delayUs = whenUs - nowUs;
+    int64_t delayUs = timeUs - nowUs;
 
     (new AMessage(kWhatRender, id()))->post(delayUs);
 }
@@ -270,6 +293,14 @@ void DirectRenderer::onRender() {
             break;
         }
 
+        if (info.mTimeUs + 15000ll < nowUs) {
+            ++mNumFramesLate;
+        }
+        ++mNumFrames;
+
+        mLatencySum += nowUs - info.mTimeUs;
+        ++mLatencyCount;
+
         status_t err = mVideoDecoder->renderOutputBufferAndRelease(info.mIndex);
         CHECK_EQ(err, (status_t)OK);
 
index 7219080..44be8f8 100644 (file)
@@ -36,6 +36,10 @@ struct DirectRenderer : public AHandler {
     void setFormat(size_t trackIndex, const sp<AMessage> &format);
     void queueAccessUnit(size_t trackIndex, const sp<ABuffer> &accessUnit);
 
+    void setTimeOffset(int64_t offset);
+
+    int64_t getAvgLatenessUs();
+
 protected:
     virtual void onMessageReceived(const sp<AMessage> &msg);
     virtual ~DirectRenderer();
@@ -63,8 +67,14 @@ private:
 
     List<OutputInfo> mOutputBuffers;
     bool mRenderPending;
-    int64_t mFirstRenderTimeUs;
-    int64_t mFirstRenderRealUs;
+
+    int64_t mTimeOffsetUs;
+
+    int64_t mLatencySum;
+    size_t mLatencyCount;
+
+    int32_t mNumFramesLate;
+    int32_t mNumFrames;
 
     void onVideoDecoderNotify();
     void onRender();
index d9d8a76..6b185db 100644 (file)
@@ -27,6 +27,7 @@
 #include <gui/SurfaceComposerClient.h>
 #include <media/IMediaPlayerService.h>
 #include <media/IStreamSource.h>
+#include <media/mediaplayer.h>
 #include <media/stagefright/foundation/ABuffer.h>
 #include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/foundation/AMessage.h>
@@ -60,6 +61,8 @@ struct TunnelRenderer::StreamSource : public BnStreamSource {
 
     void doSomeWork();
 
+    void setTimeOffset(int64_t offset);
+
 protected:
     virtual ~StreamSource();
 
@@ -75,6 +78,9 @@ private:
 
     size_t mNumDeqeued;
 
+    int64_t mTimeOffsetUs;
+    bool mTimeOffsetChanged;
+
     DISALLOW_EVIL_CONSTRUCTORS(StreamSource);
 };
 
@@ -82,7 +88,9 @@ private:
 
 TunnelRenderer::StreamSource::StreamSource(TunnelRenderer *owner)
     : mOwner(owner),
-      mNumDeqeued(0) {
+      mNumDeqeued(0),
+      mTimeOffsetUs(0ll),
+      mTimeOffsetChanged(false) {
 }
 
 TunnelRenderer::StreamSource::~StreamSource() {
@@ -110,7 +118,7 @@ void TunnelRenderer::StreamSource::onBufferAvailable(size_t index) {
 }
 
 uint32_t TunnelRenderer::StreamSource::flags() const {
-    return kFlagAlignedVideoData;
+    return kFlagAlignedVideoData | kFlagIsRealTimeData;
 }
 
 void TunnelRenderer::StreamSource::doSomeWork() {
@@ -124,21 +132,21 @@ void TunnelRenderer::StreamSource::doSomeWork() {
 
         ++mNumDeqeued;
 
-        if (mNumDeqeued == 1) {
-            ALOGI("fixing real time now.");
-
+        if (mTimeOffsetChanged) {
             sp<AMessage> extra = new AMessage;
 
             extra->setInt32(
                     IStreamListener::kKeyDiscontinuityMask,
-                    ATSParser::DISCONTINUITY_ABSOLUTE_TIME);
+                    ATSParser::DISCONTINUITY_TIME_OFFSET);
 
-            extra->setInt64("timeUs", ALooper::GetNowUs());
+            extra->setInt64("offset", mTimeOffsetUs);
 
             mListener->issueCommand(
                     IStreamListener::DISCONTINUITY,
                     false /* synchronous */,
                     extra);
+
+            mTimeOffsetChanged = false;
         }
 
         ALOGV("dequeue TS packet of size %d", srcBuffer->size());
@@ -155,18 +163,32 @@ void TunnelRenderer::StreamSource::doSomeWork() {
     }
 }
 
+void TunnelRenderer::StreamSource::setTimeOffset(int64_t offset) {
+    Mutex::Autolock autoLock(mLock);
+
+    if (offset != mTimeOffsetUs) {
+        mTimeOffsetUs = offset;
+        mTimeOffsetChanged = true;
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 
 TunnelRenderer::TunnelRenderer(
         const sp<IGraphicBufferProducer> &bufferProducer)
     : mSurfaceTex(bufferProducer),
       mStartup(true) {
+    mStreamSource = new StreamSource(this);
 }
 
 TunnelRenderer::~TunnelRenderer() {
     destroyPlayer();
 }
 
+void TunnelRenderer::setTimeOffset(int64_t offset) {
+    mStreamSource->setTimeOffset(offset);
+}
+
 void TunnelRenderer::onMessageReceived(const sp<AMessage> &msg) {
     switch (msg->what()) {
         default:
@@ -209,8 +231,6 @@ void TunnelRenderer::initPlayer() {
     sp<IMediaPlayerService> service = interface_cast<IMediaPlayerService>(binder);
     CHECK(service.get() != NULL);
 
-    mStreamSource = new StreamSource(this);
-
     mPlayerClient = new PlayerClient;
 
     mPlayer = service->create(mPlayerClient, 0);
@@ -226,6 +246,8 @@ void TunnelRenderer::initPlayer() {
 void TunnelRenderer::destroyPlayer() {
     mStreamSource.clear();
 
+    mPlayer->setVideoSurfaceTexture(NULL);
+
     mPlayer->stop();
     mPlayer.clear();
 
index 8e96665..479e73c 100644 (file)
@@ -39,6 +39,12 @@ struct TunnelRenderer : public AHandler {
     void queueBuffer(const sp<ABuffer> &buffer);
     sp<ABuffer> dequeueBuffer();
 
+    void setTimeOffset(int64_t offset);
+
+    int64_t getAvgLatenessUs() {
+        return 0ll;
+    }
+
 protected:
     virtual void onMessageReceived(const sp<AMessage> &msg);
     virtual ~TunnelRenderer();
index 158c2da..0d2e347 100644 (file)
 #include "DirectRenderer.h"
 #include "MediaReceiver.h"
 #include "ParsedMessage.h"
+#include "TimeSyncer.h"
 #include "TunnelRenderer.h"
 
+#include <cutils/properties.h>
 #include <media/stagefright/foundation/ABuffer.h>
 #include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/foundation/AMessage.h>
 #include <media/stagefright/MediaErrors.h>
 
-#include <cutils/properties.h>
-
 namespace android {
 
 WifiDisplaySink::WifiDisplaySink(
+        uint32_t flags,
         const sp<ANetworkSession> &netSession,
         const sp<IGraphicBufferProducer> &bufferProducer,
         const sp<AMessage> &notify)
     : mState(UNDEFINED),
+      mFlags(flags),
       mNetSession(netSession),
       mSurfaceTex(bufferProducer),
       mNotify(notify),
@@ -46,7 +48,11 @@ WifiDisplaySink::WifiDisplaySink(
       mUsingTCPInterleaving(false),
       mSessionID(0),
       mNextCSeq(1),
-      mIDRFrameRequestPending(false) {
+      mIDRFrameRequestPending(false),
+      mTimeOffsetUs(0ll),
+      mTimeOffsetValid(false),
+      mTargetLatencyUs(-1ll),
+      mSetupDeferred(false) {
     // We support any and all resolutions, but prefer 720p30
     mSinkSupportedVideoFormats.setNativeResolution(
             VideoFormats::RESOLUTION_CEA, 5);  // 1280 x 720 p30
@@ -199,6 +205,16 @@ void WifiDisplaySink::onMessageReceived(const sp<AMessage> &msg) {
                 {
                     ALOGI("We're now connected.");
                     mState = CONNECTED;
+
+                    if (mFlags & FLAG_SPECIAL_MODE) {
+                        sp<AMessage> notify = new AMessage(
+                                kWhatTimeSyncerNotify, id());
+
+                        mTimeSyncer = new TimeSyncer(mNetSession, notify);
+                        looper()->registerHandler(mTimeSyncer);
+
+                        mTimeSyncer->startClient(mRTSPHost.c_str(), 8123);
+                    }
                     break;
                 }
 
@@ -226,6 +242,41 @@ void WifiDisplaySink::onMessageReceived(const sp<AMessage> &msg) {
             break;
         }
 
+        case kWhatTimeSyncerNotify:
+        {
+            int32_t what;
+            CHECK(msg->findInt32("what", &what));
+
+            if (what == TimeSyncer::kWhatTimeOffset) {
+                CHECK(msg->findInt64("offset", &mTimeOffsetUs));
+                mTimeOffsetValid = true;
+
+                if (mSetupDeferred) {
+                    CHECK_EQ((status_t)OK,
+                             sendSetup(
+                                mSessionID,
+                                "rtsp://x.x.x.x:x/wfd1.0/streamid=0"));
+
+                    mSetupDeferred = false;
+                }
+            }
+            break;
+        }
+
+        case kWhatReportLateness:
+        {
+            int64_t latenessUs = mRenderer->getAvgLatenessUs();
+
+            ALOGI("avg. lateness = %lld ms",
+                  (latenessUs + mTargetLatencyUs) / 1000ll);
+
+            mMediaReceiver->notifyLateness(
+                    0 /* trackIndex */, latenessUs);
+
+            msg->post(kReportLatenessEveryUs);
+            break;
+        }
+
         default:
             TRESPASS();
     }
@@ -266,15 +317,39 @@ void WifiDisplaySink::onMediaReceiverNotify(const sp<AMessage> &msg) {
                 looper()->registerHandler(mRenderer);
             }
 
+            CHECK(mTimeOffsetValid);
+
+            int64_t latencyUs = 300000ll;  // 300ms by default
+
+            char val[PROPERTY_VALUE_MAX];
+            if (property_get("media.wfd-sink.latency", val, NULL)) {
+                char *end;
+                int64_t x = strtoll(val, &end, 10);
+
+                if (end > val && *end == '\0' && x >= 0ll) {
+                    latencyUs = x;
+                }
+            }
+
+            if (latencyUs != mTargetLatencyUs) {
+                mTargetLatencyUs = latencyUs;
+
+                ALOGI("Assuming %lld ms of latency.", latencyUs / 1000ll);
+            }
+
+            // We are the timesync _client_,
+            // client time = server time - time offset.
+            mRenderer->setTimeOffset(-mTimeOffsetUs + mTargetLatencyUs);
+
             sp<ABuffer> accessUnit;
             CHECK(msg->findBuffer("accessUnit", &accessUnit));
 
-#if USE_TUNNEL_RENDERER
-            mRenderer->queueBuffer(accessUnit);
-#else
             size_t trackIndex;
             CHECK(msg->findSize("trackIndex", &trackIndex));
 
+#if USE_TUNNEL_RENDERER
+            mRenderer->queueBuffer(accessUnit);
+#else
             sp<AMessage> format;
             if (msg->findMessage("format", &format)) {
                 mRenderer->setFormat(trackIndex, format);
@@ -445,6 +520,8 @@ status_t WifiDisplaySink::onReceivePlayResponse(
 
     mState = PLAYING;
 
+    (new AMessage(kWhatReportLateness, id()))->post(kReportLatenessEveryUs);
+
     return OK;
 }
 
@@ -555,6 +632,8 @@ void WifiDisplaySink::onGetParameterRequest(
                 mUsingTCPTransport = true;
                 mUsingTCPInterleaving = true;
             }
+        } else if (mFlags & FLAG_SPECIAL_MODE) {
+            mUsingTCPTransport = true;
         }
 
         body = "wfd_video_formats: ";
@@ -735,12 +814,16 @@ void WifiDisplaySink::onSetParameterRequest(
     const char *content = data->getContent();
 
     if (strstr(content, "wfd_trigger_method: SETUP\r\n") != NULL) {
-        status_t err =
-            sendSetup(
-                    sessionID,
-                    "rtsp://x.x.x.x:x/wfd1.0/streamid=0");
+        if ((mFlags & FLAG_SPECIAL_MODE) && !mTimeOffsetValid) {
+            mSetupDeferred = true;
+        } else {
+            status_t err =
+                sendSetup(
+                        sessionID,
+                        "rtsp://x.x.x.x:x/wfd1.0/streamid=0");
 
-        CHECK_EQ(err, (status_t)OK);
+            CHECK_EQ(err, (status_t)OK);
+        }
     }
 
     AString response = "RTSP/1.0 200 OK\r\n";
index 01af58b..2b8c6f7 100644 (file)
@@ -31,6 +31,7 @@ struct AMessage;
 struct DirectRenderer;
 struct MediaReceiver;
 struct ParsedMessage;
+struct TimeSyncer;
 struct TunnelRenderer;
 
 #define USE_TUNNEL_RENDERER     0
@@ -43,11 +44,16 @@ struct WifiDisplaySink : public AHandler {
         kWhatDisconnected,
     };
 
+    enum Flags {
+        FLAG_SPECIAL_MODE = 1,
+    };
+
     // If no notification message is specified (notify == NULL)
     // the sink will stop its looper() once the session ends,
     // otherwise it will post an appropriate notification but leave
     // the looper() running.
     WifiDisplaySink(
+            uint32_t flags,
             const sp<ANetworkSession> &netSession,
             const sp<IGraphicBufferProducer> &bufferProducer = NULL,
             const sp<AMessage> &notify = NULL);
@@ -73,6 +79,8 @@ private:
         kWhatRTSPNotify,
         kWhatStop,
         kWhatMediaReceiverNotify,
+        kWhatTimeSyncerNotify,
+        kWhatReportLateness,
     };
 
     struct ResponseID {
@@ -89,11 +97,15 @@ private:
     typedef status_t (WifiDisplaySink::*HandleRTSPResponseFunc)(
             int32_t sessionID, const sp<ParsedMessage> &msg);
 
+    static const int64_t kReportLatenessEveryUs = 1000000ll;
+
     State mState;
+    uint32_t mFlags;
     VideoFormats mSinkSupportedVideoFormats;
     sp<ANetworkSession> mNetSession;
     sp<IGraphicBufferProducer> mSurfaceTex;
     sp<AMessage> mNotify;
+    sp<TimeSyncer> mTimeSyncer;
     bool mUsingTCPTransport;
     bool mUsingTCPInterleaving;
     AString mRTSPHost;
@@ -117,6 +129,13 @@ private:
 
     bool mIDRFrameRequestPending;
 
+    int64_t mTimeOffsetUs;
+    bool mTimeOffsetValid;
+
+    int64_t mTargetLatencyUs;
+
+    bool mSetupDeferred;
+
     status_t sendM2(int32_t sessionID);
     status_t sendSetup(int32_t sessionID, const char *uri);
     status_t sendPlay(int32_t sessionID, const char *uri);
index b8524f6..de66bde 100644 (file)
@@ -23,6 +23,7 @@
 #include "Parameters.h"
 #include "ParsedMessage.h"
 #include "rtp/RTPSender.h"
+#include "TimeSyncer.h"
 
 #include <binder/IServiceManager.h>
 #include <gui/IGraphicBufferProducer.h>
@@ -157,6 +158,12 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {
             }
 
             if (err == OK) {
+                sp<AMessage> notify = new AMessage(kWhatTimeSyncerNotify, id());
+                mTimeSyncer = new TimeSyncer(mNetSession, notify);
+                looper()->registerHandler(mTimeSyncer);
+
+                mTimeSyncer->startServer(8123);
+
                 mState = AWAITING_CLIENT_CONNECTION;
             }
 
@@ -520,6 +527,11 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {
             break;
         }
 
+        case kWhatTimeSyncerNotify:
+        {
+            break;
+        }
+
         default:
             TRESPASS();
     }
index 724462c..9e72682 100644 (file)
@@ -30,6 +30,7 @@ namespace android {
 struct IHDCP;
 struct IRemoteDisplayClient;
 struct ParsedMessage;
+struct TimeSyncer;
 
 // Represents the RTSP server acting as a wifi display source.
 // Manages incoming connections, sets up Playback sessions as necessary.
@@ -81,6 +82,7 @@ private:
         kWhatHDCPNotify,
         kWhatFinishStop2,
         kWhatTeardownTriggerTimedOut,
+        kWhatTimeSyncerNotify,
     };
 
     struct ResponseID {
@@ -114,6 +116,7 @@ private:
     VideoFormats mSupportedSourceVideoFormats;
     sp<ANetworkSession> mNetSession;
     sp<IRemoteDisplayClient> mClient;
+    sp<TimeSyncer> mTimeSyncer;
     struct in_addr mInterfaceAddr;
     int32_t mSessionID;
 
index 86437e0..111846d 100644 (file)
 #include <utils/Log.h>
 
 #include "ANetworkSession.h"
+#include "TimeSyncer.h"
 
 #include <binder/ProcessState.h>
-#include <media/stagefright/foundation/ABuffer.h>
-#include <media/stagefright/foundation/ADebug.h>
-#include <media/stagefright/foundation/AHandler.h>
-#include <media/stagefright/foundation/ALooper.h>
 #include <media/stagefright/foundation/AMessage.h>
-#include <media/stagefright/Utils.h>
 
 namespace android {
 
-struct TestHandler : public AHandler {
-    TestHandler(const sp<ANetworkSession> &netSession);
-
-    void startServer(unsigned localPort);
-    void startClient(const char *remoteHost, unsigned remotePort);
-
-protected:
-    virtual ~TestHandler();
-
-    virtual void onMessageReceived(const sp<AMessage> &msg);
-
-private:
-    enum {
-        kWhatStartServer,
-        kWhatStartClient,
-        kWhatUDPNotify,
-        kWhatSendPacket,
-        kWhatTimedOut,
-    };
-
-    struct TimeInfo {
-        int64_t mT1;  // client timestamp at send
-        int64_t mT2;  // server timestamp at receive
-        int64_t mT3;  // server timestamp at send
-        int64_t mT4;  // client timestamp at receive
-    };
-
-    static const int64_t kTimeoutDelayUs = 1000000ll;
-
-    sp<ANetworkSession> mNetSession;
-
-    bool mIsServer;
-    bool mConnected;
-    int32_t mUDPSession;
-    uint32_t mSeqNo;
-    double mTotalTimeUs;
-    int32_t mCount;
-    int64_t mSumOffsets;
-
-    int64_t mPendingT1;
-    int32_t mTimeoutGeneration;
-
-    void postSendPacket(int64_t delayUs = 0ll);
-
-    void postTimeout();
-    void cancelTimeout();
-
-    DISALLOW_EVIL_CONSTRUCTORS(TestHandler);
-};
-
-TestHandler::TestHandler(const sp<ANetworkSession> &netSession)
-    : mNetSession(netSession),
-      mIsServer(false),
-      mConnected(false),
-      mUDPSession(0),
-      mSeqNo(0),
-      mTotalTimeUs(0.0),
-      mCount(0),
-      mSumOffsets(0ll),
-      mPendingT1(0ll),
-      mTimeoutGeneration(0) {
-}
-
-TestHandler::~TestHandler() {
-}
-
-void TestHandler::startServer(unsigned localPort) {
-    sp<AMessage> msg = new AMessage(kWhatStartServer, id());
-    msg->setInt32("localPort", localPort);
-    msg->post();
-}
-
-void TestHandler::startClient(const char *remoteHost, unsigned remotePort) {
-    sp<AMessage> msg = new AMessage(kWhatStartClient, id());
-    msg->setString("remoteHost", remoteHost);
-    msg->setInt32("remotePort", remotePort);
-    msg->post();
-}
-
-void TestHandler::onMessageReceived(const sp<AMessage> &msg) {
-    switch (msg->what()) {
-        case kWhatStartClient:
-        {
-            AString remoteHost;
-            CHECK(msg->findString("remoteHost", &remoteHost));
-
-            int32_t remotePort;
-            CHECK(msg->findInt32("remotePort", &remotePort));
-
-            sp<AMessage> notify = new AMessage(kWhatUDPNotify, id());
-
-            CHECK_EQ((status_t)OK,
-                     mNetSession->createUDPSession(
-                         0 /* localPort */,
-                         remoteHost.c_str(),
-                         remotePort,
-                         notify,
-                         &mUDPSession));
-
-            postSendPacket();
-            break;
-        }
-
-        case kWhatStartServer:
-        {
-            mIsServer = true;
-
-            int32_t localPort;
-            CHECK(msg->findInt32("localPort", &localPort));
-
-            sp<AMessage> notify = new AMessage(kWhatUDPNotify, id());
-
-            CHECK_EQ((status_t)OK,
-                     mNetSession->createUDPSession(
-                         localPort, notify, &mUDPSession));
-
-            break;
-        }
-
-        case kWhatSendPacket:
-        {
-            TimeInfo ti;
-            memset(&ti, 0, sizeof(ti));
-
-            ti.mT1 = ALooper::GetNowUs();
-
-            CHECK_EQ((status_t)OK,
-                     mNetSession->sendRequest(
-                         mUDPSession, &ti, sizeof(ti)));
-
-            mPendingT1 = ti.mT1;
-            postTimeout();
-            break;
-        }
-
-        case kWhatTimedOut:
-        {
-            int32_t generation;
-            CHECK(msg->findInt32("generation", &generation));
-
-            if (generation != mTimeoutGeneration) {
-                break;
-            }
-
-            ALOGI("timed out, sending another request");
-            postSendPacket();
-            break;
-        }
-
-        case kWhatUDPNotify:
-        {
-            int32_t reason;
-            CHECK(msg->findInt32("reason", &reason));
-
-            switch (reason) {
-                case ANetworkSession::kWhatError:
-                {
-                    int32_t sessionID;
-                    CHECK(msg->findInt32("sessionID", &sessionID));
-
-                    int32_t err;
-                    CHECK(msg->findInt32("err", &err));
-
-                    AString detail;
-                    CHECK(msg->findString("detail", &detail));
-
-                    ALOGE("An error occurred in session %d (%d, '%s/%s').",
-                          sessionID,
-                          err,
-                          detail.c_str(),
-                          strerror(-err));
-
-                    mNetSession->destroySession(sessionID);
-
-                    cancelTimeout();
-                    looper()->stop();
-                    break;
-                }
-
-                case ANetworkSession::kWhatDatagram:
-                {
-                    int32_t sessionID;
-                    CHECK(msg->findInt32("sessionID", &sessionID));
-
-                    sp<ABuffer> packet;
-                    CHECK(msg->findBuffer("data", &packet));
-
-                    int64_t arrivalTimeUs;
-                    CHECK(packet->meta()->findInt64(
-                                "arrivalTimeUs", &arrivalTimeUs));
-
-                    CHECK_EQ(packet->size(), sizeof(TimeInfo));
-
-                    TimeInfo *ti = (TimeInfo *)packet->data();
-
-                    if (mIsServer) {
-                        if (!mConnected) {
-                            AString fromAddr;
-                            CHECK(msg->findString("fromAddr", &fromAddr));
-
-                            int32_t fromPort;
-                            CHECK(msg->findInt32("fromPort", &fromPort));
-
-                            CHECK_EQ((status_t)OK,
-                                     mNetSession->connectUDPSession(
-                                         mUDPSession, fromAddr.c_str(), fromPort));
-
-                            mConnected = true;
-                        }
-
-                        ti->mT2 = arrivalTimeUs;
-                        ti->mT3 = ALooper::GetNowUs();
-
-                        CHECK_EQ((status_t)OK,
-                                 mNetSession->sendRequest(
-                                     mUDPSession, ti, sizeof(*ti)));
-                    } else {
-                        if (ti->mT1 != mPendingT1) {
-                            break;
-                        }
-
-                        cancelTimeout();
-                        mPendingT1 = 0;
-
-                        ti->mT4 = arrivalTimeUs;
-
-                        // One way delay for a packet to travel from client
-                        // to server or back (assumed to be the same either way).
-                        int64_t delay =
-                            (ti->mT2 - ti->mT1 + ti->mT4 - ti->mT3) / 2;
-
-                        // Offset between the client clock (T1, T4) and the
-                        // server clock (T2, T3) timestamps.
-                        int64_t offset =
-                            (ti->mT2 - ti->mT1 - ti->mT4 + ti->mT3) / 2;
-
-                        mSumOffsets += offset;
-                        ++mCount;
-
-                        printf("delay = %lld us,\toffset %lld us\n",
-                               delay,
-                               offset);
-                        fflush(stdout);
-
-                        postSendPacket(1000000ll / 30);
-                    }
-                    break;
-                }
-
-                default:
-                    TRESPASS();
-            }
-
-            break;
-        }
-
-        default:
-            TRESPASS();
-    }
-}
-
-void TestHandler::postSendPacket(int64_t delayUs) {
-    (new AMessage(kWhatSendPacket, id()))->post(delayUs);
-}
-
-void TestHandler::postTimeout() {
-    sp<AMessage> msg = new AMessage(kWhatTimedOut, id());
-    msg->setInt32("generation", mTimeoutGeneration);
-    msg->post(kTimeoutDelayUs);
-}
-
-void TestHandler::cancelTimeout() {
-    ++mTimeoutGeneration;
-}
-
 }  // namespace android
 
 static void usage(const char *me) {
@@ -379,7 +100,7 @@ int main(int argc, char **argv) {
 
     sp<ALooper> looper = new ALooper;
 
-    sp<TestHandler> handler = new TestHandler(netSession);
+    sp<TimeSyncer> handler = new TimeSyncer(netSession, NULL /* notify */);
     looper->registerHandler(handler);
 
     if (localPort >= 0) {
index 3f4216a..0b18484 100644 (file)
@@ -321,7 +321,10 @@ int main(int argc, char **argv) {
     sp<ALooper> looper = new ALooper;
 
     sp<WifiDisplaySink> sink = new WifiDisplaySink(
-            session, surface->getIGraphicBufferProducer());
+            0 /* flags */,
+            session,
+            surface->getIGraphicBufferProducer());
+
     looper->registerHandler(sink);
 
     if (connectToPort >= 0) {