OSDN Git Service

RTPReceiver can now track packet loss, account for late arrivals
authorAndreas Huber <andih@google.com>
Thu, 4 Apr 2013 18:17:05 +0000 (11:17 -0700)
committerAndreas Huber <andih@google.com>
Thu, 4 Apr 2013 18:17:05 +0000 (11:17 -0700)
it also uses timers to trigger retransmission and packet loss declaration

Change-Id: If1f9324783b3bef950076c2edf321f7c33ff9fea

media/libstagefright/wifi-display/rtp/RTPReceiver.cpp
media/libstagefright/wifi-display/rtp/RTPReceiver.h

index 238fb82..8fa1dae 100644 (file)
 #include <media/stagefright/MediaErrors.h>
 #include <media/stagefright/Utils.h>
 
+#define TRACK_PACKET_LOSS       0
+
 namespace android {
 
 ////////////////////////////////////////////////////////////////////////////////
 
-struct RTPReceiver::Source : public RefBase {
+struct RTPReceiver::Source : public AHandler {
     Source(RTPReceiver *receiver, uint32_t ssrc);
 
     void onPacketReceived(uint16_t seq, const sp<ABuffer> &buffer);
@@ -44,7 +46,14 @@ struct RTPReceiver::Source : public RefBase {
 protected:
     virtual ~Source();
 
+    virtual void onMessageReceived(const sp<AMessage> &msg);
+
 private:
+    enum {
+        kWhatRetransmit,
+        kWhatDeclareLost,
+    };
+
     static const uint32_t kMinSequential = 2;
     static const uint32_t kMaxDropout = 3000;
     static const uint32_t kMaxMisorder = 100;
@@ -67,6 +76,17 @@ private:
     // Ordered by extended seq number.
     List<sp<ABuffer> > mPackets;
 
+    enum StatusBits {
+        STATUS_DECLARED_LOST            = 1,
+        STATUS_REQUESTED_RETRANSMISSION = 2,
+        STATUS_ARRIVED_LATE             = 4,
+    };
+#if TRACK_PACKET_LOSS
+    KeyedVector<int32_t, uint32_t> mLostPackets;
+#endif
+
+    void modifyPacketStatus(int32_t extSeqNo, uint32_t mask);
+
     int32_t mAwaitingExtSeqNo;
     bool mRequestedRetransmission;
 
@@ -78,12 +98,20 @@ private:
     int32_t mNumDeclaredLost;
     int32_t mNumDeclaredLostPrior;
 
+    int32_t mRetransmitGeneration;
+    int32_t mDeclareLostGeneration;
+    bool mDeclareLostTimerPending;
+
     void queuePacket(const sp<ABuffer> &packet);
     void dequeueMore();
 
     sp<ABuffer> getNextPacket();
     void resync();
 
+    void postRetransmitTimer(int64_t delayUs);
+    void postDeclareLostTimer(int64_t delayUs);
+    void cancelTimers();
+
     DISALLOW_EVIL_CONSTRUCTORS(Source);
 };
 
@@ -106,12 +134,71 @@ RTPReceiver::Source::Source(RTPReceiver *receiver, uint32_t ssrc)
       mActivePacketType(-1),
       mNextReportTimeUs(-1ll),
       mNumDeclaredLost(0),
-      mNumDeclaredLostPrior(0) {
+      mNumDeclaredLostPrior(0),
+      mRetransmitGeneration(0),
+      mDeclareLostGeneration(0),
+      mDeclareLostTimerPending(false) {
 }
 
 RTPReceiver::Source::~Source() {
 }
 
+void RTPReceiver::Source::onMessageReceived(const sp<AMessage> &msg) {
+    switch (msg->what()) {
+        case kWhatRetransmit:
+        {
+            int32_t generation;
+            CHECK(msg->findInt32("generation", &generation));
+
+            if (generation != mRetransmitGeneration) {
+                break;
+            }
+
+            mRequestedRetransmission = true;
+            mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo);
+
+            modifyPacketStatus(
+                    mAwaitingExtSeqNo, STATUS_REQUESTED_RETRANSMISSION);
+            break;
+        }
+
+        case kWhatDeclareLost:
+        {
+            int32_t generation;
+            CHECK(msg->findInt32("generation", &generation));
+
+            if (generation != mDeclareLostGeneration) {
+                break;
+            }
+
+            cancelTimers();
+
+            ALOGV("Lost packet extSeqNo %d %s",
+                  mAwaitingExtSeqNo,
+                  mRequestedRetransmission ? "*" : "");
+
+            mRequestedRetransmission = false;
+            if (mActiveAssembler != NULL) {
+                mActiveAssembler->signalDiscontinuity();
+            }
+
+            modifyPacketStatus(mAwaitingExtSeqNo, STATUS_DECLARED_LOST);
+
+            // resync();
+            ++mAwaitingExtSeqNo;
+            ++mNumDeclaredLost;
+
+            mReceiver->notifyPacketLost();
+
+            dequeueMore();
+            break;
+        }
+
+        default:
+            TRESPASS();
+    }
+}
+
 void RTPReceiver::Source::onPacketReceived(
         uint16_t seq, const sp<ABuffer> &buffer) {
     if (mFirst) {
@@ -164,6 +251,8 @@ void RTPReceiver::Source::queuePacket(const sp<ABuffer> &packet) {
     if (mAwaitingExtSeqNo >= 0 && newExtendedSeqNo < mAwaitingExtSeqNo) {
         // We're no longer interested in these. They're old.
         ALOGV("dropping stale extSeqNo %d", newExtendedSeqNo);
+
+        modifyPacketStatus(newExtendedSeqNo, STATUS_ARRIVED_LATE);
         return;
     }
 
@@ -230,85 +319,89 @@ void RTPReceiver::Source::dequeueMore() {
         }
 
         mNextReportTimeUs = nowUs + kReportIntervalUs;
-    }
 
-    for (;;) {
-        sp<ABuffer> packet = getNextPacket();
+#if TRACK_PACKET_LOSS
+        for (size_t i = 0; i < mLostPackets.size(); ++i) {
+            int32_t key = mLostPackets.keyAt(i);
+            uint32_t value = mLostPackets.valueAt(i);
 
-        if (packet == NULL) {
-            if (mPackets.empty()) {
-                break;
+            AString status;
+            if (value & STATUS_REQUESTED_RETRANSMISSION) {
+                status.append("retrans ");
+            }
+            if (value & STATUS_ARRIVED_LATE) {
+                status.append("arrived-late ");
             }
+            ALOGI("Packet %d declared lost %s", key, status.c_str());
+        }
+#endif
+    }
+
+    sp<ABuffer> packet;
+    while ((packet = getNextPacket()) != NULL) {
+        if (mDeclareLostTimerPending) {
+            cancelTimers();
+        }
+
+        CHECK_GE(mAwaitingExtSeqNo, 0);
+#if TRACK_PACKET_LOSS
+        mLostPackets.removeItem(mAwaitingExtSeqNo);
+#endif
 
-            CHECK_GE(mAwaitingExtSeqNo, 0);
+        int32_t packetType;
+        CHECK(packet->meta()->findInt32("PT", &packetType));
 
-            const sp<ABuffer> &firstPacket = *mPackets.begin();
+        if (packetType != mActivePacketType) {
+            mActiveAssembler = mReceiver->makeAssembler(packetType);
+            mActivePacketType = packetType;
+        }
 
-            uint32_t rtpTime;
-            CHECK(firstPacket->meta()->findInt32(
-                        "rtp-time", (int32_t *)&rtpTime));
+        if (mActiveAssembler != NULL) {
+            status_t err = mActiveAssembler->processPacket(packet);
+            if (err != OK) {
+                ALOGV("assembler returned error %d", err);
+            }
+        }
 
+        ++mAwaitingExtSeqNo;
+    }
 
-            int64_t rtpUs = (rtpTime * 100ll) / 9ll;
+    if (mDeclareLostTimerPending) {
+        return;
+    }
 
-            int64_t maxArrivalTimeUs =
-                mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs;
+    if (mPackets.empty()) {
+        return;
+    }
 
-            int64_t nowUs = ALooper::GetNowUs();
+    CHECK_GE(mAwaitingExtSeqNo, 0);
 
-            CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data());
+    const sp<ABuffer> &firstPacket = *mPackets.begin();
 
-            ALOGV("waiting for %d, comparing against %d, %lld us left",
-                  mAwaitingExtSeqNo,
-                  firstPacket->int32Data(),
-                  maxArrivalTimeUs - nowUs);
+    uint32_t rtpTime;
+    CHECK(firstPacket->meta()->findInt32(
+                "rtp-time", (int32_t *)&rtpTime));
 
-            if (maxArrivalTimeUs + kPacketLostAfterUs <= nowUs) {
-                ALOGV("Lost packet extSeqNo %d %s",
-                      mAwaitingExtSeqNo,
-                      mRequestedRetransmission ? "*" : "");
 
-                mRequestedRetransmission = false;
-                if (mActiveAssembler != NULL) {
-                    mActiveAssembler->signalDiscontinuity();
-                }
+    int64_t rtpUs = (rtpTime * 100ll) / 9ll;
 
-                // resync();
-                ++mAwaitingExtSeqNo;
-                ++mNumDeclaredLost;
-
-                mReceiver->notifyPacketLost();
-                continue;
-            } else if (kRequestRetransmissionAfterUs > 0
-                    && maxArrivalTimeUs + kRequestRetransmissionAfterUs <= nowUs
-                    && !mRequestedRetransmission
-                    && mAwaitingExtSeqNo >= 0) {
-                mRequestedRetransmission = true;
-                mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo);
-                break;
-            } else {
-                break;
-            }
-        }
+    int64_t maxArrivalTimeUs =
+        mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs;
 
-        mRequestedRetransmission = false;
+    nowUs = ALooper::GetNowUs();
 
-        int32_t packetType;
-        CHECK(packet->meta()->findInt32("PT", &packetType));
+    CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data());
 
-        if (packetType != mActivePacketType) {
-            mActiveAssembler = mReceiver->makeAssembler(packetType);
-            mActivePacketType = packetType;
-        }
+    ALOGV("waiting for %d, comparing against %d, %lld us left",
+          mAwaitingExtSeqNo,
+          firstPacket->int32Data(),
+          maxArrivalTimeUs - nowUs);
 
-        if (mActiveAssembler == NULL) {
-            continue;
-        }
+    postDeclareLostTimer(maxArrivalTimeUs + kPacketLostAfterUs);
 
-        status_t err = mActiveAssembler->processPacket(packet);
-        if (err != OK) {
-            ALOGV("assembler returned error %d", err);
-        }
+    if (kRequestRetransmissionAfterUs > 0ll) {
+        postRetransmitTimer(
+                maxArrivalTimeUs + kRequestRetransmissionAfterUs);
     }
 }
 
@@ -328,8 +421,6 @@ sp<ABuffer> RTPReceiver::Source::getNextPacket() {
     sp<ABuffer> packet = *mPackets.begin();
     mPackets.erase(mPackets.begin());
 
-    ++mAwaitingExtSeqNo;
-
     return packet;
 }
 
@@ -404,9 +495,11 @@ void RTPReceiver::Source::addReportBlock(
 
 RTPReceiver::RTPReceiver(
         const sp<ANetworkSession> &netSession,
-        const sp<AMessage> &notify)
+        const sp<AMessage> &notify,
+        uint32_t flags)
     : mNetSession(netSession),
       mNotify(notify),
+      mFlags(flags),
       mRTPMode(TRANSPORT_UNDEFINED),
       mRTCPMode(TRANSPORT_UNDEFINED),
       mRTPSessionID(0),
@@ -693,6 +786,20 @@ void RTPReceiver::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
             CHECK(msg->findBuffer("data", &data));
 
             if (isRTP) {
+                if (mFlags & FLAG_AUTO_CONNECT) {
+                    AString fromAddr;
+                    CHECK(msg->findString("fromAddr", &fromAddr));
+
+                    int32_t fromPort;
+                    CHECK(msg->findInt32("fromPort", &fromPort));
+
+                    CHECK_EQ((status_t)OK,
+                             connect(
+                                 fromAddr.c_str(), fromPort, fromPort + 1));
+
+                    mFlags &= ~FLAG_AUTO_CONNECT;
+                }
+
                 onRTPData(data);
             } else {
                 onRTCPData(data);
@@ -835,6 +942,8 @@ status_t RTPReceiver::onRTPData(const sp<ABuffer> &buffer) {
     sp<Source> source;
     if (index < 0) {
         source = new Source(this, srcId);
+        looper()->registerHandler(source);
+
         mSources.add(srcId, source);
     } else {
         source = mSources.valueAt(index);
@@ -965,6 +1074,7 @@ sp<RTPReceiver::Assembler> RTPReceiver::makeAssembler(uint8_t packetType) {
     PacketizationMode mode = mPacketTypes.valueAt(index);
 
     switch (mode) {
+        case PACKETIZATION_NONE:
         case PACKETIZATION_TRANSPORT_STREAM:
             return new TSAssembler(mNotify);
 
@@ -1005,5 +1115,39 @@ void RTPReceiver::requestRetransmission(uint32_t senderSSRC, int32_t extSeqNo) {
      mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size());
 }
 
+void RTPReceiver::Source::modifyPacketStatus(int32_t extSeqNo, uint32_t mask) {
+#if TRACK_PACKET_LOSS
+    ssize_t index = mLostPackets.indexOfKey(extSeqNo);
+    if (index < 0) {
+        mLostPackets.add(extSeqNo, mask);
+    } else {
+        mLostPackets.editValueAt(index) |= mask;
+    }
+#endif
+}
+
+void RTPReceiver::Source::postRetransmitTimer(int64_t timeUs) {
+    int64_t delayUs = timeUs - ALooper::GetNowUs();
+    sp<AMessage> msg = new AMessage(kWhatRetransmit, id());
+    msg->setInt32("generation", mRetransmitGeneration);
+    msg->post(delayUs);
+}
+
+void RTPReceiver::Source::postDeclareLostTimer(int64_t timeUs) {
+    CHECK(!mDeclareLostTimerPending);
+    mDeclareLostTimerPending = true;
+
+    int64_t delayUs = timeUs - ALooper::GetNowUs();
+    sp<AMessage> msg = new AMessage(kWhatDeclareLost, id());
+    msg->setInt32("generation", mDeclareLostGeneration);
+    msg->post(delayUs);
+}
+
+void RTPReceiver::Source::cancelTimers() {
+    ++mRetransmitGeneration;
+    ++mDeclareLostGeneration;
+    mDeclareLostTimerPending = false;
+}
+
 }  // namespace android
 
index 630bce9..240ab2e 100644 (file)
@@ -39,9 +39,14 @@ struct RTPReceiver : public RTPBase, public AHandler {
         kWhatAccessUnit,
         kWhatPacketLost,
     };
+
+    enum Flags {
+        FLAG_AUTO_CONNECT = 1,
+    };
     RTPReceiver(
             const sp<ANetworkSession> &netSession,
-            const sp<AMessage> &notify);
+            const sp<AMessage> &notify,
+            uint32_t flags = 0);
 
     status_t registerPacketType(
             uint8_t packetType, PacketizationMode mode);
@@ -82,6 +87,7 @@ private:
 
     sp<ANetworkSession> mNetSession;
     sp<AMessage> mNotify;
+    uint32_t mFlags;
     TransportMode mRTPMode;
     TransportMode mRTCPMode;
     int32_t mRTPSessionID;