onSwapped(msg);
break;
}
+
+ case kWhatCheckSwitchDown:
+ {
+ onCheckSwitchDown();
+ break;
+ }
+
+ case kWhatSwitchDown:
+ {
+ onSwitchDown();
+ break;
+ }
+
default:
TRESPASS();
break;
// (finishDisconnect, onFinishDisconnect2)
cancelBandwidthSwitch();
+ // cancel switch down monitor
+ mSwitchDownMonitor.clear();
+
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
}
tryToFinishBandwidthSwitch();
}
+void LiveSession::onCheckSwitchDown() {
+ if (mSwitchDownMonitor == NULL) {
+ return;
+ }
+
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ int32_t targetDuration;
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
+ sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
+
+ if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
+ int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
+ int64_t targetDurationUs = targetDuration * 1000000ll;
+
+ if (bufferedDurationUs < targetDurationUs / 3) {
+ (new AMessage(kWhatSwitchDown, id()))->post();
+ break;
+ }
+ }
+ }
+
+ mSwitchDownMonitor->post(1000000ll);
+}
+
+void LiveSession::onSwitchDown() {
+ if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
+ return;
+ }
+
+ ssize_t bandwidthIndex = getBandwidthIndex();
+ if (bandwidthIndex < mCurBandwidthIndex) {
+ changeConfiguration(-1, bandwidthIndex, false);
+ return;
+ }
+
+ changeConfiguration(-1, mCurBandwidthIndex - 1, false);
+}
+
// Mark switch done when:
// 1. all old buffers are swapped out
void LiveSession::tryToFinishBandwidthSwitch() {
notify->post();
mInPreparationPhase = false;
+
+ mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
+ mSwitchDownMonitor->post();
}
} // namespace android
kWhatChangeConfiguration3 = 'chC3',
kWhatFinishDisconnect2 = 'fin2',
kWhatSwapped = 'swap',
+ kWhatCheckSwitchDown = 'ckSD',
+ kWhatSwitchDown = 'sDwn',
};
struct BandwidthItem {
bool mFirstTimeUsValid;
int64_t mFirstTimeUs;
int64_t mLastSeekTimeUs;
+ sp<AMessage> mSwitchDownMonitor;
KeyedVector<size_t, int64_t> mDiscontinuityAbsStartTimesUs;
KeyedVector<size_t, int64_t> mDiscontinuityOffsetTimesUs;
void onChangeConfiguration2(const sp<AMessage> &msg);
void onChangeConfiguration3(const sp<AMessage> &msg);
void onSwapped(const sp<AMessage> &msg);
+ void onCheckSwitchDown();
+ void onSwitchDown();
void tryToFinishBandwidthSwitch();
void scheduleCheckBandwidthEvent();
mLastQueuedTimeUs(0),
mEOSResult(OK),
mLatestEnqueuedMeta(NULL),
- mLatestDequeuedMeta(NULL) {
+ mLatestDequeuedMeta(NULL),
+ mQueuedDiscontinuityCount(0) {
setFormat(meta);
}
mFormat.clear();
}
+ --mQueuedDiscontinuityCount;
return INFO_DISCONTINUITY;
}
mBuffers.push_back(buffer);
mCondition.signal();
+ int32_t discontinuity;
+ if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
+ ++mQueuedDiscontinuityCount;
+ }
+
if (mLatestEnqueuedMeta == NULL) {
mLatestEnqueuedMeta = buffer->meta();
} else {
mBuffers.clear();
mEOSResult = OK;
+ mQueuedDiscontinuityCount = 0;
mFormat = NULL;
mLatestEnqueuedMeta = NULL;
mEOSResult = OK;
mLastQueuedTimeUs = 0;
mLatestEnqueuedMeta = NULL;
+ ++mQueuedDiscontinuityCount;
sp<ABuffer> buffer = new ABuffer(0);
buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
Mutex::Autolock autoLock(mLock);
+ return getBufferedDurationUs_l(finalResult);
+}
+int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) {
*finalResult = mEOSResult;
if (mBuffers.empty()) {
int64_t time1 = -1;
int64_t time2 = -1;
+ int64_t durationUs = 0;
List<sp<ABuffer> >::iterator it = mBuffers.begin();
while (it != mBuffers.end()) {
int64_t timeUs;
if (buffer->meta()->findInt64("timeUs", &timeUs)) {
- if (time1 < 0) {
+ if (time1 < 0 || timeUs < time1) {
time1 = timeUs;
}
- time2 = timeUs;
+ if (time2 < 0 || timeUs > time2) {
+ time2 = timeUs;
+ }
} else {
// This is a discontinuity, reset everything.
+ durationUs += time2 - time1;
time1 = time2 = -1;
}
++it;
}
- return time2 - time1;
+ return durationUs + (time2 - time1);
+}
+
+// A cheaper but less precise version of getBufferedDurationUs that we would like to use in
+// LiveSession::dequeueAccessUnit to trigger downwards adaptation.
+int64_t AnotherPacketSource::getEstimatedDurationUs() {
+ Mutex::Autolock autoLock(mLock);
+ if (mBuffers.empty()) {
+ return 0;
+ }
+
+ if (mQueuedDiscontinuityCount > 0) {
+ status_t finalResult;
+ return getBufferedDurationUs_l(&finalResult);
+ }
+
+ List<sp<ABuffer> >::iterator it = mBuffers.begin();
+ sp<ABuffer> buffer = *it;
+
+ int64_t startTimeUs;
+ buffer->meta()->findInt64("timeUs", &startTimeUs);
+ if (startTimeUs < 0) {
+ return 0;
+ }
+
+ it = mBuffers.end();
+ --it;
+ buffer = *it;
+
+ int64_t endTimeUs;
+ buffer->meta()->findInt64("timeUs", &endTimeUs);
+ if (endTimeUs < 0) {
+ return 0;
+ }
+
+ int64_t diffUs;
+ if (endTimeUs > startTimeUs) {
+ diffUs = endTimeUs - startTimeUs;
+ } else {
+ diffUs = startTimeUs - endTimeUs;
+ }
+ return diffUs;
}
status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
// presentation timestamps since the last discontinuity (if any).
int64_t getBufferedDurationUs(status_t *finalResult);
+ int64_t getEstimatedDurationUs();
+
status_t nextBufferTime(int64_t *timeUs);
void queueAccessUnit(const sp<ABuffer> &buffer);
sp<AMessage> mLatestEnqueuedMeta;
sp<AMessage> mLatestDequeuedMeta;
+ size_t mQueuedDiscontinuityCount;
+
bool wasFormatChange(int32_t discontinuityType) const;
+ int64_t getBufferedDurationUs_l(status_t *finalResult);
DISALLOW_EVIL_CONSTRUCTORS(AnotherPacketSource);
};