OSDN Git Service

use running sum for ValueMetricProducer bucket
authorChenjie Yu <cjyu@google.com>
Sun, 10 Dec 2017 16:31:05 +0000 (08:31 -0800)
committerChenjie Yu <cjyu@google.com>
Thu, 14 Dec 2017 06:32:23 +0000 (22:32 -0800)
simplify ValueMetricProducer logic for pulled data

Test: unit test
Change-Id: Ic0a21a543166cc5c34c1fa505dba08d1fc2f510a

18 files changed:
cmds/statsd/src/logd/LogEvent.h
cmds/statsd/src/metrics/CountMetricProducer.cpp
cmds/statsd/src/metrics/CountMetricProducer.h
cmds/statsd/src/metrics/DurationMetricProducer.cpp
cmds/statsd/src/metrics/DurationMetricProducer.h
cmds/statsd/src/metrics/EventMetricProducer.cpp
cmds/statsd/src/metrics/EventMetricProducer.h
cmds/statsd/src/metrics/GaugeMetricProducer.cpp
cmds/statsd/src/metrics/GaugeMetricProducer.h
cmds/statsd/src/metrics/MetricProducer.cpp
cmds/statsd/src/metrics/MetricProducer.h
cmds/statsd/src/metrics/MetricsManager.cpp
cmds/statsd/src/metrics/ValueMetricProducer.cpp
cmds/statsd/src/metrics/ValueMetricProducer.h
cmds/statsd/tests/metrics/CountMetricProducer_test.cpp
cmds/statsd/tests/metrics/DurationMetricProducer_test.cpp
cmds/statsd/tests/metrics/EventMetricProducer_test.cpp
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp

index 7e8a96b..176e16e 100644 (file)
@@ -105,6 +105,11 @@ public:
      */
     void init();
 
+    /**
+     * Set timestamp if the original timestamp is missing.
+     */
+    void setTimestampNs(uint64_t timestampNs) {mTimestampNs = timestampNs;}
+
 private:
     /**
      * Don't copy, it's slower. If we really need this we can add it but let's try to
index fc12013..ae297d9 100644 (file)
@@ -188,7 +188,7 @@ bool CountMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey)
 void CountMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
-        const LogEvent& event, bool scheduledPull) {
+        const LogEvent& event) {
     uint64_t eventTimeNs = event.GetTimestampNs();
 
     flushIfNeededLocked(eventTimeNs);
index 21bd9d6..8a17169 100644 (file)
@@ -58,7 +58,7 @@ protected:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const HashableDimensionKey& eventKey,
             const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
-            const LogEvent& event, bool scheduledPull) override;
+            const LogEvent& event) override;
 
 private:
     void onDumpReportLocked(const uint64_t dumpTimeNs,
index 9920f65..c268798 100644 (file)
@@ -251,7 +251,7 @@ bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newK
 void DurationMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKeys, bool condition,
-        const LogEvent& event, bool scheduledPull) {
+        const LogEvent& event) {
     flushIfNeededLocked(event.GetTimestampNs());
 
     if (matcherIndex == mStopAllIndex) {
index e509af4..14504c1 100644 (file)
@@ -57,7 +57,7 @@ protected:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const HashableDimensionKey& eventKey,
             const std::map<std::string, HashableDimensionKey>& conditionKeys, bool condition,
-            const LogEvent& event, bool scheduledPull) override;
+            const LogEvent& event) override;
 
 private:
     void onDumpReportLocked(const uint64_t dumpTimeNs,
index 217aff0..bcecf16 100644 (file)
@@ -122,7 +122,7 @@ void EventMetricProducer::onConditionChangedLocked(const bool conditionMet,
 void EventMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
-        const LogEvent& event, bool scheduledPull) {
+        const LogEvent& event) {
     if (!condition) {
         return;
     }
index 75ccf47..49ba9d8 100644 (file)
@@ -53,7 +53,7 @@ private:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const HashableDimensionKey& eventKey,
             const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
-            const LogEvent& event, bool scheduledPull) override;
+            const LogEvent& event) override;
 
     void onDumpReportLocked(const uint64_t dumpTimeNs,
                             android::util::ProtoOutputStream* protoOutput) override;
index 55d84e0..fffb2bf 100644 (file)
@@ -187,7 +187,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
         return;
     }
     for (const auto& data : allData) {
-        onMatchedLogEventLocked(0, *data, false /*scheduledPull*/);
+        onMatchedLogEventLocked(0, *data);
     }
     flushIfNeededLocked(eventTime);
 }
@@ -211,7 +211,7 @@ void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
     std::lock_guard<std::mutex> lock(mMutex);
 
     for (const auto& data : allData) {
-        onMatchedLogEventLocked(0, *data, true /*scheduledPull*/);
+        onMatchedLogEventLocked(0, *data);
     }
 }
 
@@ -238,7 +238,7 @@ bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey)
 void GaugeMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
-        const LogEvent& event, bool scheduledPull) {
+        const LogEvent& event) {
     if (condition == false) {
         return;
     }
index e4bda02..ee4f40c 100644 (file)
@@ -66,7 +66,7 @@ protected:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const HashableDimensionKey& eventKey,
             const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
-            const LogEvent& event, bool scheduledPull) override;
+            const LogEvent& event) override;
 
 private:
     void onDumpReportLocked(const uint64_t dumpTimeNs,
index 5a0a7c7..f38f3df 100644 (file)
@@ -21,8 +21,7 @@ namespace statsd {
 
 using std::map;
 
-void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
-                                             bool scheduledPull) {
+void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event) {
     uint64_t eventTimeNs = event.GetTimestampNs();
     // this is old event, maybe statsd restarted?
     if (eventTimeNs < mStartTimeNs) {
@@ -60,8 +59,7 @@ void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const Lo
         condition = mCondition;
     }
 
-    onMatchedLogEventInternalLocked(matcherIndex, eventKey, conditionKeys, condition, event,
-                                    scheduledPull);
+    onMatchedLogEventInternalLocked(matcherIndex, eventKey, conditionKeys, condition, event);
 }
 
 }  // namespace statsd
index ef2ef29..269bd43 100644 (file)
@@ -54,9 +54,9 @@ public:
     virtual ~MetricProducer(){};
 
     // Consume the parsed stats log entry that already matched the "what" of the metric.
-    void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull) {
+    void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event) {
         std::lock_guard<std::mutex> lock(mMutex);
-        onMatchedLogEventLocked(matcherIndex, event, scheduledPull);
+        onMatchedLogEventLocked(matcherIndex, event);
     }
 
     void onConditionChanged(const bool condition, const uint64_t eventTime) {
@@ -155,11 +155,10 @@ protected:
     virtual void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const HashableDimensionKey& eventKey,
             const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
-            const LogEvent& event, bool scheduledPull) = 0;
+            const LogEvent& event) = 0;
 
     // Consume the parsed stats log entry that already matched the "what" of the metric.
-    void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
-                                 bool scheduledPull);
+    void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event);
 
     mutable std::mutex mMutex;
 };
index 0510fff..b0f0135 100644 (file)
@@ -162,8 +162,7 @@ void MetricsManager::onLogEvent(const LogEvent& event) {
                 auto& metricList = pair->second;
                 for (const int metricIndex : metricList) {
                     // pushed metrics are never scheduled pulls
-                    mAllMetricProducers[metricIndex]->onMatchedLogEvent(i, event,
-                                                                        false /* schedulePull */);
+                    mAllMetricProducers[metricIndex]->onMatchedLogEvent(i, event);
                 }
             }
         }
index c20c302..aabe5af 100644 (file)
@@ -185,9 +185,13 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, const u
     mCondition = condition;
 
     if (eventTime < mCurrentBucketStartTimeNs) {
+        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTime,
+             (long long)mCurrentBucketStartTimeNs);
         return;
     }
 
+    flushIfNeededLocked(eventTime);
+
     if (mPullTagId != -1) {
         if (mCondition == true) {
             mStatsPullerManager->RegisterReceiver(mPullTagId, this,
@@ -202,9 +206,8 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, const u
                 return;
             }
             for (const auto& data : allData) {
-                onMatchedLogEventLocked(0, *data, false);
+                onMatchedLogEventLocked(0, *data);
             }
-            flushIfNeededLocked(eventTime);
         }
         return;
     }
@@ -217,15 +220,22 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
         if (allData.size() == 0) {
             return;
         }
-        uint64_t eventTime = allData.at(0)->GetTimestampNs();
-        // alarm is not accurate and might drift.
-        if (eventTime > mCurrentBucketStartTimeNs + mBucketSizeNs * 3 / 2) {
-            flushIfNeededLocked(eventTime);
+        // For scheduled pulled data, the effective event time is snap to the nearest
+        // bucket boundary to make bucket finalize.
+        uint64_t realEventTime = allData.at(0)->GetTimestampNs();
+        uint64_t eventTime = mStartTimeNs + ((realEventTime - mStartTimeNs)/mBucketSizeNs) * mBucketSizeNs;
+
+        mCondition = false;
+        for (const auto& data : allData) {
+            data->setTimestampNs(eventTime-1);
+            onMatchedLogEventLocked(0, *data);
         }
+
+        mCondition = true;
         for (const auto& data : allData) {
-            onMatchedLogEventLocked(0, *data, true);
+            data->setTimestampNs(eventTime);
+            onMatchedLogEventLocked(0, *data);
         }
-        flushIfNeededLocked(eventTime);
     }
 }
 
@@ -253,7 +263,7 @@ bool ValueMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey)
 void ValueMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
-        const LogEvent& event, bool scheduledPull) {
+        const LogEvent& event) {
     uint64_t eventTimeNs = event.GetTimestampNs();
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
@@ -261,6 +271,8 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(
         return;
     }
 
+    flushIfNeededLocked(eventTimeNs);
+
     if (hitGuardRailLocked(eventKey)) {
         return;
     }
@@ -268,36 +280,21 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(
 
     long value = get_value(event);
 
-    if (mPullTagId != -1) {
-        if (scheduledPull) {
-            // scheduled pull always sets beginning of current bucket and end
-            // of next bucket
-            if (interval.raw.size() > 0) {
-                interval.raw.back().second = value;
-            } else {
-                interval.raw.push_back(make_pair(value, value));
-            }
-            Interval& nextInterval = mNextSlicedBucket[eventKey];
-            if (nextInterval.raw.size() == 0) {
-                nextInterval.raw.push_back(make_pair(value, 0));
-            } else {
-                nextInterval.raw.front().first = value;
-            }
+    if (mPullTagId != -1) { // for pulled events
+        if (mCondition == true) {
+            interval.start = value;
+            interval.startUpdated = true;
         } else {
-            if (mCondition == true) {
-                interval.raw.push_back(make_pair(value, 0));
+            if (interval.startUpdated) {
+                interval.sum += (value - interval.start);
+                interval.startUpdated = false;
             } else {
-                if (interval.raw.size() != 0) {
-                    interval.raw.back().second = value;
-                } else {
-                    interval.tainted = true;
-                    VLOG("Data on condition true missing!");
-                }
+                VLOG("No start for matching end %ld", value);
+                interval.tainted += 1;
             }
         }
-    } else {
-        flushIfNeededLocked(eventTimeNs);
-        interval.raw.push_back(make_pair(value, 0));
+    } else {    // for pushed events
+        interval.sum += value;
     }
 }
 
@@ -327,27 +324,16 @@ void ValueMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
 
     int tainted = 0;
     for (const auto& slice : mCurrentSlicedBucket) {
-        long value = 0;
-        if (mPullTagId != -1) {
-            for (const auto& pair : slice.second.raw) {
-                value += (pair.second - pair.first);
-            }
-        } else {
-            for (const auto& pair : slice.second.raw) {
-                value += pair.first;
-            }
-        }
         tainted += slice.second.tainted;
-        info.mValue = value;
-        VLOG(" %s, %ld, %d", slice.first.c_str(), value, tainted);
+        info.mValue = slice.second.sum;
         // it will auto create new vector of ValuebucketInfo if the key is not found.
         auto& bucketList = mPastBuckets[slice.first];
         bucketList.push_back(info);
     }
+    VLOG("%d tainted pairs in the bucket", tainted);
 
     // Reset counters
-    mCurrentSlicedBucket.swap(mNextSlicedBucket);
-    mNextSlicedBucket.clear();
+    mCurrentSlicedBucket.clear();
 
     int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
     mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
index 8d60ff6..4c49927 100644 (file)
@@ -56,7 +56,7 @@ protected:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const HashableDimensionKey& eventKey,
             const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
-            const LogEvent& event, bool scheduledPull) override;
+            const LogEvent& event) override;
 
 private:
     void onDumpReportLocked(const uint64_t dumpTimeNs,
@@ -89,14 +89,19 @@ private:
 
     // internal state of a bucket.
     typedef struct {
-        std::vector<std::pair<long, long>> raw;
-        bool tainted;
+        // Pulled data always come in pair of <start, end>. This holds the value
+        // for start. The diff (end - start) is added to sum.
+        long start;
+        // Whether the start data point is updated
+        bool startUpdated;
+        // If end data point comes before the start, record this pair as tainted
+        // and the value is not added to the running sum.
+        int tainted;
+        // Running sum of known pairs in this bucket
+        long sum;
     } Interval;
 
     std::unordered_map<HashableDimensionKey, Interval> mCurrentSlicedBucket;
-    // If condition is true and pulling on schedule, the previous bucket value needs to be carried
-    // over to the next bucket.
-    std::unordered_map<HashableDimensionKey, Interval> mNextSlicedBucket;
 
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
     // TODO: Add a lock to mPastBuckets.
index d973ba1..51eabd5 100644 (file)
@@ -54,8 +54,8 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) {
                                       bucketStartTimeNs);
 
     // 2 events in bucket 1.
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false);
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
 
     // Flushes at event #2.
     countProducer.flushIfNeededLocked(bucketStartTimeNs + 2);
@@ -74,7 +74,7 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) {
 
     // 1 matched event happens in bucket 2.
     LogEvent event3(tagId, bucketStartTimeNs + bucketSizeNs + 2);
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3);
     countProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
     EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
     EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
@@ -111,12 +111,12 @@ TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition) {
     CountMetricProducer countProducer(kConfigKey, metric, 1, wizard, bucketStartTimeNs);
 
     countProducer.onConditionChanged(true, bucketStartTimeNs);
-    countProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
+    countProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
     EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
 
     countProducer.onConditionChanged(false /*new condition*/, bucketStartTimeNs + 2);
     // Upon this match event, the matched event1 is flushed.
-    countProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
+    countProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
     EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
 
     countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
@@ -166,11 +166,11 @@ TEST(CountMetricProducerTest, TestEventsWithSlicedCondition) {
     CountMetricProducer countProducer(kConfigKey, metric, 1 /*condition tracker index*/, wizard,
                                       bucketStartTimeNs);
 
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
     countProducer.flushIfNeededLocked(bucketStartTimeNs + 1);
     EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
 
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
     countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
     EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
     EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
@@ -217,29 +217,29 @@ TEST(CountMetricProducerTest, TestAnomalyDetection) {
     LogEvent event7(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 3 + NS_PER_SEC);
 
     // Two events in bucket #0.
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false);
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
 
     EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
     EXPECT_EQ(2L, countProducer.mCurrentSlicedCounter->begin()->second);
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL);
 
     // One event in bucket #2. No alarm as bucket #0 is trashed out.
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3);
     EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
     EXPECT_EQ(1L, countProducer.mCurrentSlicedCounter->begin()->second);
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL);
 
     // Two events in bucket #3.
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event4, false);
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event5, false);
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event6, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event4);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event5);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event6);
     EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
     EXPECT_EQ(3L, countProducer.mCurrentSlicedCounter->begin()->second);
     // Anomaly at event 6 is within refractory period. The alarm is at event 5 timestamp not event 6
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event5.GetTimestampNs());
 
-    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event7, false);
+    countProducer.onMatchedLogEvent(1 /*log matcher index*/, event7);
     EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
     EXPECT_EQ(4L, countProducer.mCurrentSlicedCounter->begin()->second);
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event7.GetTimestampNs());
index 3158c27..58a4ac6 100644 (file)
@@ -56,8 +56,8 @@ TEST(DurationMetricTrackerTest, TestNoCondition) {
             kConfigKey, metric, -1 /*no condition*/, 1 /* start index */, 2 /* stop index */,
             3 /* stop_all index */, false /*nesting*/, wizard, {}, bucketStartTimeNs);
 
-    durationProducer.onMatchedLogEvent(1 /* start index*/, event1, false /* scheduledPull */);
-    durationProducer.onMatchedLogEvent(2 /* stop index*/, event2, false /* scheduledPull */);
+    durationProducer.onMatchedLogEvent(1 /* start index*/, event1);
+    durationProducer.onMatchedLogEvent(2 /* stop index*/, event2);
     durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
     EXPECT_EQ(1UL, durationProducer.mPastBuckets.size());
     EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
@@ -94,14 +94,14 @@ TEST(DurationMetricTrackerTest, TestNonSlicedCondition) {
     EXPECT_FALSE(durationProducer.mCondition);
     EXPECT_FALSE(durationProducer.isConditionSliced());
 
-    durationProducer.onMatchedLogEvent(1 /* start index*/, event1, false /* scheduledPull */);
-    durationProducer.onMatchedLogEvent(2 /* stop index*/, event2, false /* scheduledPull */);
+    durationProducer.onMatchedLogEvent(1 /* start index*/, event1);
+    durationProducer.onMatchedLogEvent(2 /* stop index*/, event2);
     durationProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
     EXPECT_EQ(0UL, durationProducer.mPastBuckets.size());
 
-    durationProducer.onMatchedLogEvent(1 /* start index*/, event3, false /* scheduledPull */);
+    durationProducer.onMatchedLogEvent(1 /* start index*/, event3);
     durationProducer.onConditionChanged(true /* condition */, bucketStartTimeNs + bucketSizeNs + 2);
-    durationProducer.onMatchedLogEvent(2 /* stop index*/, event4, false /* scheduledPull */);
+    durationProducer.onMatchedLogEvent(2 /* stop index*/, event4);
     durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
     EXPECT_EQ(1UL, durationProducer.mPastBuckets.size());
     EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
index f3302fd..e4fc67f 100644 (file)
@@ -50,8 +50,8 @@ TEST(EventMetricProducerTest, TestNoCondition) {
     EventMetricProducer eventProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
                                       bucketStartTimeNs);
 
-    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
-    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
+    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
+    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
 
     // TODO: get the report and check the content after the ProtoOutputStream change is done.
     // eventProducer.onDumpReport();
@@ -74,11 +74,11 @@ TEST(EventMetricProducerTest, TestEventsWithNonSlicedCondition) {
     EventMetricProducer eventProducer(kConfigKey, metric, 1, wizard, bucketStartTimeNs);
 
     eventProducer.onConditionChanged(true /*condition*/, bucketStartTimeNs);
-    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
+    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
 
     eventProducer.onConditionChanged(false /*condition*/, bucketStartTimeNs + 2);
 
-    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
+    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
 
     // TODO: get the report and check the content after the ProtoOutputStream change is done.
     // eventProducer.onDumpReport();
@@ -115,8 +115,8 @@ TEST(EventMetricProducerTest, TestEventsWithSlicedCondition) {
 
     EventMetricProducer eventProducer(kConfigKey, metric, 1, wizard, bucketStartTimeNs);
 
-    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
-    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
+    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
+    eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
 
     // TODO: get the report and check the content after the ProtoOutputStream change is done.
     // eventProducer.onDumpReport();
index d320697..146a19d 100644 (file)
@@ -35,23 +35,23 @@ namespace os {
 namespace statsd {
 
 const ConfigKey kConfigKey(0, "test");
+const int tagId = 1;
+const string metricName = "test_metric";
+const int64_t bucketStartTimeNs = 10000000000;
+const int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
+const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
+const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
+const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
+
 /*
  * Tests pulled atoms with no conditions
  */
 TEST(ValueMetricProducerTest, TestNonDimensionalEvents) {
-    int64_t bucketStartTimeNs = 10000000000;
-    int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
-
-    int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
-    int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
-
     ValueMetric metric;
-    metric.set_name("1");
+    metric.set_name(metricName);
     metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
     metric.set_value_field(2);
 
-    int tagId = 1;
-
     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
     // TODO: pending refactor of StatsPullerManager
     // For now we still need this so that it doesn't do real pulling.
@@ -65,8 +65,8 @@ TEST(ValueMetricProducerTest, TestNonDimensionalEvents) {
 
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
-    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
-    event->write(1);
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(tagId);
     event->write(11);
     event->init();
     allData.push_back(event);
@@ -75,76 +75,60 @@ TEST(ValueMetricProducerTest, TestNonDimensionalEvents) {
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(1UL, curInterval.raw.size());
-    // value is 11, 11
-    EXPECT_EQ(11, curInterval.raw.front().first);
-    EXPECT_EQ(11, curInterval.raw.front().second);
-    ValueMetricProducer::Interval nextInterval = valueProducer.mNextSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(1UL, nextInterval.raw.size());
-    // value is 11, 0
-    EXPECT_EQ(11, nextInterval.raw.front().first);
-    EXPECT_EQ(0, nextInterval.raw.front().second);
-    EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+    // startUpdated:true tainted:0 sum:0 start:11
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second.back().mValue);
 
     allData.clear();
-    event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
-    event->write(1);
-    event->write(22);
+    event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
+    event->write(tagId);
+    event->write(23);
     event->init();
     allData.push_back(event);
     valueProducer.onDataPulled(allData);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(1UL, curInterval.raw.size());
-    // value is 22, 0
-    EXPECT_EQ(22, curInterval.raw.front().first);
-    EXPECT_EQ(0, curInterval.raw.front().second);
-    EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
+    // tartUpdated:false tainted:0 sum:12
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
-    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(11, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
 
     allData.clear();
-    event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
-    event->write(1);
-    event->write(33);
+    event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
+    event->write(tagId);
+    event->write(36);
     event->init();
     allData.push_back(event);
     valueProducer.onDataPulled(allData);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(1UL, curInterval.raw.size());
-    // value is 33, 0
-    EXPECT_EQ(33, curInterval.raw.front().first);
-    EXPECT_EQ(0, curInterval.raw.front().second);
-    EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
+    // startUpdated:false tainted:0 sum:12
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
-    EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(11, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(3UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValue);
 }
 
 /*
  * Test pulled event with non sliced condition.
  */
 TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
-    int64_t bucketStartTimeNs = 10000000000;
-    int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
-
-    int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
-    int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
-
     ValueMetric metric;
-    metric.set_name("1");
+    metric.set_name(metricName);
     metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
     metric.set_value_field(2);
     metric.set_condition("SCREEN_ON");
 
-    int tagId = 1;
-
     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
     shared_ptr<MockStatsPullerManager> pullerManager =
             make_shared<StrictMock<MockStatsPullerManager>>();
@@ -153,28 +137,18 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
 
     EXPECT_CALL(*pullerManager, Pull(tagId, _))
             .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
-                int64_t bucketStartTimeNs = 10000000000;
-                int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
-
-                int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
-                int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
                 data->clear();
                 shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
-                event->write(1);
+                event->write(tagId);
                 event->write(100);
                 event->init();
                 data->push_back(event);
                 return true;
             }))
             .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
-                int64_t bucketStartTimeNs = 10000000000;
-                int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
-
-                int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
-                int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
                 data->clear();
                 shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10);
-                event->write(1);
+                event->write(tagId);
                 event->write(120);
                 event->init();
                 data->push_back(event);
@@ -184,17 +158,16 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
     ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
                                       pullerManager);
 
-    valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(1UL, curInterval.raw.size());
-    // value is 100, 0
-    EXPECT_EQ(100, curInterval.raw.front().first);
-    EXPECT_EQ(0, curInterval.raw.front().second);
-    EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
+    // startUpdated:false tainted:0 sum:0 start:100
+    EXPECT_EQ(100, curInterval.start);
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     vector<shared_ptr<LogEvent>> allData;
@@ -209,11 +182,8 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(1UL, curInterval.raw.size());
-    // value is 110, 0
-    EXPECT_EQ(110, curInterval.raw.front().first);
-    EXPECT_EQ(0, curInterval.raw.front().second);
+    // startUpdated:false tainted:0 sum:0 start:110
+    EXPECT_EQ(110, curInterval.start);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
     EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValue);
@@ -223,27 +193,17 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(1UL, curInterval.raw.size());
-    // value is 110, 120
-    EXPECT_EQ(110, curInterval.raw.front().first);
-    EXPECT_EQ(120, curInterval.raw.front().second);
+    // startUpdated:false tainted:0 sum:0 start:110
+    EXPECT_EQ(10, curInterval.sum);
+    EXPECT_EQ(false, curInterval.startUpdated);
 }
 
 TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
-    int64_t bucketStartTimeNs = 10000000000;
-    int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
-
-    int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
-    int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
-
     ValueMetric metric;
-    metric.set_name("1");
+    metric.set_name(metricName);
     metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
     metric.set_value_field(2);
 
-    int tagId = 1;
-
     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
     shared_ptr<MockStatsPullerManager> pullerManager =
             make_shared<StrictMock<MockStatsPullerManager>>();
@@ -255,32 +215,22 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
     event1->write(1);
     event1->write(10);
     event1->init();
-    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
     event2->write(1);
     event2->write(20);
     event2->init();
-    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1, false);
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(1UL, curInterval.raw.size());
-    // value is 10, 0
-    EXPECT_EQ(10, curInterval.raw.front().first);
-    EXPECT_EQ(0, curInterval.raw.front().second);
-    EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
+    EXPECT_EQ(10, curInterval.sum);
 
-    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2, false);
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // has one raw pair
-    EXPECT_EQ(2UL, curInterval.raw.size());
-    // value is 10, 20
-    EXPECT_EQ(10, curInterval.raw.front().first);
-    EXPECT_EQ(20, curInterval.raw.back().first);
-    EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
+    EXPECT_EQ(30, curInterval.sum);
 
     valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());