const int64_t version) override {
std::lock_guard<std::mutex> lock(mMutex);
- if (mPullTagId != -1) {
+ if (mPullTagId != -1 && (mCondition == true || mConditionTrackerIndex < 0) ) {
vector<shared_ptr<LogEvent>> allData;
mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData);
if (allData.size() == 0) {
data->setElapsedTimestampNs(eventTimeNs);
onMatchedLogEventLocked(0, *data);
}
- } else { // For pushed value metric, we simply flush and reset the current bucket start.
+ } else {
+ // For pushed value metric or pulled metric where condition is not true,
+ // we simply flush and reset the current bucket start.
flushCurrentBucketLocked(eventTimeNs);
mCurrentBucketStartTimeNs = eventTimeNs;
}
FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade);
FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade);
+ FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse);
FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValue);
}
+TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ shared_ptr<MockStatsPullerManager> pullerManager =
+ make_shared<StrictMock<MockStatsPullerManager>>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+ .WillOnce(Invoke([](int tagId, int64_t timeNs,
+ vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ event->write(tagId);
+ event->write(100);
+ event->init();
+ data->push_back(event);
+ return true;
+ }))
+ .WillOnce(Invoke([](int tagId, int64_t timeNs,
+ vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ event->write(tagId);
+ event->write(120);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+ valueProducer.setBucketSize(60 * NS_PER_SEC);
+ valueProducer.onConditionChanged(true, bucketStartTimeNs + 1);
+
+ valueProducer.onConditionChanged(false, bucket2StartTimeNs-100);
+ EXPECT_FALSE(valueProducer.mCondition);
+
+ valueProducer.notifyAppUpgrade(bucket2StartTimeNs-50, "ANY.APP", 1, 1);
+ // Expect one full buckets already done and starting a partial bucket.
+ EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
+ EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
+ EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValue);
+ EXPECT_FALSE(valueProducer.mCondition);
+}
+
TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
ValueMetric metric;
metric.set_id(metricId);