OSDN Git Service

add feature: GaugeMetricProducer now takes repeated list of fields
authorChenjie Yu <cjyu@google.com>
Tue, 12 Dec 2017 01:41:20 +0000 (17:41 -0800)
committerChenjie Yu <cjyu@google.com>
Sun, 17 Dec 2017 01:12:45 +0000 (17:12 -0800)
bug fix: GaugeMetricProducer now works better with pulled events.
unit test also includes GaugeMetricProducer_test

Test: unit test
Change-Id: Ic60f09342d14cfb107be2130d445b323a56909e0

12 files changed:
cmds/statsd/Android.mk
cmds/statsd/src/config/ConfigManager.cpp
cmds/statsd/src/logd/LogEvent.cpp
cmds/statsd/src/logd/LogEvent.h
cmds/statsd/src/metrics/GaugeMetricProducer.cpp
cmds/statsd/src/metrics/GaugeMetricProducer.h
cmds/statsd/src/metrics/metrics_manager_util.cpp
cmds/statsd/src/stats_log.proto
cmds/statsd/src/stats_util.cpp
cmds/statsd/src/stats_util.h
cmds/statsd/src/statsd_config.proto
cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp

index addba8c..3e517bb 100644 (file)
@@ -173,6 +173,7 @@ LOCAL_SRC_FILES := \
     tests/metrics/DurationMetricProducer_test.cpp \
     tests/metrics/EventMetricProducer_test.cpp \
     tests/metrics/ValueMetricProducer_test.cpp \
+    tests/metrics/GaugeMetricProducer_test.cpp \
     tests/guardrail/StatsdStats_test.cpp
 
 LOCAL_STATIC_LIBRARIES := \
index 164f88f..a28da5d 100644 (file)
@@ -369,7 +369,7 @@ StatsdConfig build_fake_config() {
     GaugeMetric* gaugeMetric = config.add_gauge_metric();
     gaugeMetric->set_name("METRIC_10");
     gaugeMetric->set_what("DEVICE_TEMPERATURE");
-    gaugeMetric->set_gauge_field(DEVICE_TEMPERATURE_KEY);
+    gaugeMetric->mutable_gauge_fields()->add_field_num(DEVICE_TEMPERATURE_KEY);
     gaugeMetric->mutable_bucket()->set_bucket_size_millis(60 * 1000L);
 
     // Event matchers............
index 1032138..01487f0 100644 (file)
@@ -69,6 +69,13 @@ bool LogEvent::write(uint32_t value) {
     return false;
 }
 
+bool LogEvent::write(int64_t value) {
+    if (mContext) {
+        return android_log_write_int64(mContext, value) >= 0;
+    }
+    return false;
+}
+
 bool LogEvent::write(uint64_t value) {
     if (mContext) {
         return android_log_write_int64(mContext, value) >= 0;
@@ -224,7 +231,7 @@ KeyValuePair LogEvent::GetKeyValueProto(size_t key) const {
     if (elem.type == EVENT_TYPE_INT) {
         pair.set_value_int(elem.data.int32);
     } else if (elem.type == EVENT_TYPE_LONG) {
-        pair.set_value_int(elem.data.int64);
+        pair.set_value_long(elem.data.int64);
     } else if (elem.type == EVENT_TYPE_STRING) {
         pair.set_value_str(elem.data.string);
     } else if (elem.type == EVENT_TYPE_FLOAT) {
index 176e16e..6ff6b87 100644 (file)
@@ -110,6 +110,10 @@ public:
      */
     void setTimestampNs(uint64_t timestampNs) {mTimestampNs = timestampNs;}
 
+    int size() const {
+        return mElements.size();
+    }
+
 private:
     /**
      * Don't copy, it's slower. If we really need this we can add it but let's try to
index ae9b86f..7ec57dc 100644 (file)
 
 #include "GaugeMetricProducer.h"
 #include "guardrail/StatsdStats.h"
-#include "stats_util.h"
 
 #include <cutils/log.h>
-#include <limits.h>
-#include <stdlib.h>
 
 using android::util::FIELD_COUNT_REPEATED;
 using android::util::FIELD_TYPE_BOOL;
@@ -37,6 +34,8 @@ using std::map;
 using std::string;
 using std::unordered_map;
 using std::vector;
+using std::make_shared;
+using std::shared_ptr;
 
 namespace android {
 namespace os {
@@ -61,21 +60,27 @@ const int FIELD_ID_VALUE_FLOAT = 5;
 // for GaugeBucketInfo
 const int FIELD_ID_START_BUCKET_NANOS = 1;
 const int FIELD_ID_END_BUCKET_NANOS = 2;
-const int FIELD_ID_GAUGE = 3;
+const int FIELD_ID_ATOM = 3;
 
 GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
                                          const int conditionIndex,
-                                         const sp<ConditionWizard>& wizard, const int pullTagId,
-                                         const int64_t startTimeNs)
+                                         const sp<ConditionWizard>& wizard, const int atomTagId,
+                                         const int pullTagId, const uint64_t startTimeNs,
+                                         shared_ptr<StatsPullerManager> statsPullerManager)
     : MetricProducer(metric.name(), key, startTimeNs, conditionIndex, wizard),
-      mGaugeField(metric.gauge_field()),
-      mPullTagId(pullTagId) {
+      mStatsPullerManager(statsPullerManager),
+      mPullTagId(pullTagId),
+      mAtomTagId(atomTagId) {
     if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) {
         mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000;
     } else {
         mBucketSizeNs = kDefaultGaugemBucketSizeNs;
     }
 
+    for (int i = 0; i < metric.gauge_fields().field_num_size(); i++) {
+        mGaugeFields.push_back(metric.gauge_fields().field_num(i));
+    }
+
     // TODO: use UidMap if uid->pkg_name is required
     mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end());
 
@@ -87,7 +92,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
 
     // Kicks off the puller immediately.
     if (mPullTagId != -1) {
-        mStatsPullerManager.RegisterReceiver(mPullTagId, this,
+        mStatsPullerManager->RegisterReceiver(mPullTagId, this,
                                              metric.bucket().bucket_size_millis());
     }
 
@@ -95,10 +100,19 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
 
+// for testing
+GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
+                                         const int conditionIndex,
+                                         const sp<ConditionWizard>& wizard, const int pullTagId,
+                                         const int atomTagId, const int64_t startTimeNs)
+    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs,
+                          make_shared<StatsPullerManager>()) {
+}
+
 GaugeMetricProducer::~GaugeMetricProducer() {
     VLOG("~GaugeMetricProducer() called");
     if (mPullTagId != -1) {
-        mStatsPullerManager.UnRegisterReceiver(mPullTagId, this);
+        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
     }
 }
 
@@ -149,10 +163,26 @@ void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
                                (long long)bucket.mBucketStartNs);
             protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
                                (long long)bucket.mBucketEndNs);
-            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
+            long long atomToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM);
+            long long eventToken = protoOutput->start(FIELD_TYPE_MESSAGE | mAtomTagId);
+            for (const auto& pair : bucket.mEvent->kv) {
+                if (pair.has_value_int()) {
+                    protoOutput->write(FIELD_TYPE_INT32 | pair.key(), pair.value_int());
+                } else if (pair.has_value_long()) {
+                    protoOutput->write(FIELD_TYPE_INT64 | pair.key(), pair.value_long());
+                } else if (pair.has_value_str()) {
+                    protoOutput->write(FIELD_TYPE_STRING | pair.key(), pair.value_str());
+                } else if (pair.has_value_long()) {
+                    protoOutput->write(FIELD_TYPE_FLOAT | pair.key(), pair.value_float());
+                } else if (pair.has_value_bool()) {
+                    protoOutput->write(FIELD_TYPE_BOOL | pair.key(), pair.value_bool());
+                }
+            }
+            protoOutput->end(eventToken);
+            protoOutput->end(atomToken);
             protoOutput->end(bucketInfoToken);
-            VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
-                 (long long)bucket.mBucketEndNs, (long long)bucket.mGauge);
+            VLOG("\t bucket [%lld - %lld] content: %s", (long long)bucket.mBucketStartNs,
+                 (long long)bucket.mBucketEndNs, bucket.mEvent->ToString().c_str());
         }
         protoOutput->end(wrapperToken);
     }
@@ -174,6 +204,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
     if (mPullTagId == -1) {
         return;
     }
+    // No need to pull again. Either scheduled pull or condition on true happened
     if (!mCondition) {
         return;
     }
@@ -182,7 +213,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
         return;
     }
     vector<std::shared_ptr<LogEvent>> allData;
-    if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
+    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
         ALOGE("Stats puller failed for tag: %d", mPullTagId);
         return;
     }
@@ -196,20 +227,25 @@ void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventT
     VLOG("Metric %s onSlicedConditionMayChange", mName.c_str());
 }
 
-int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
-    status_t err = NO_ERROR;
-    int64_t val = event.GetLong(mGaugeField, &err);
-    if (err == NO_ERROR) {
-        return val;
+shared_ptr<EventKV> GaugeMetricProducer::getGauge(const LogEvent& event) {
+    shared_ptr<EventKV> ret = make_shared<EventKV>();
+    if (mGaugeFields.size() == 0) {
+        for (int i = 1; i <= event.size(); i++) {
+            ret->kv.push_back(event.GetKeyValueProto(i));
+        }
     } else {
-        VLOG("Can't find value in message.");
-        return -1;
+        for (int i = 0; i < (int)mGaugeFields.size(); i++) {
+            ret->kv.push_back(event.GetKeyValueProto(mGaugeFields[i]));
+        }
     }
+    return ret;
 }
 
 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
     std::lock_guard<std::mutex> lock(mMutex);
-
+    if (allData.size() == 0) {
+        return;
+    }
     for (const auto& data : allData) {
         onMatchedLogEventLocked(0, *data);
     }
@@ -247,25 +283,48 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked(
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
-
-    // When the event happens in a new bucket, flush the old buckets.
-    if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
-        flushIfNeededLocked(eventTimeNs);
-    }
+    flushIfNeededLocked(eventTimeNs);
 
     // For gauge metric, we just simply use the first gauge in the given bucket.
-    if (!mCurrentSlicedBucket->empty()) {
+    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end()) {
         return;
     }
-    const long gauge = getGauge(event);
-    if (gauge >= 0) {
-        if (hitGuardRailLocked(eventKey)) {
-            return;
+    shared_ptr<EventKV> gauge = getGauge(event);
+    if (hitGuardRailLocked(eventKey)) {
+        return;
+    }
+    (*mCurrentSlicedBucket)[eventKey] = gauge;
+    // Anomaly detection on gauge metric only works when there is one numeric
+    // field specified.
+    if (mAnomalyTrackers.size() > 0) {
+        if (gauge->kv.size() == 1) {
+            KeyValuePair pair = gauge->kv[0];
+            long gaugeVal = 0;
+            if (pair.has_value_int()) {
+                gaugeVal = (long)pair.value_int();
+            } else if (pair.has_value_long()) {
+                gaugeVal = pair.value_long();
+            }
+            for (auto& tracker : mAnomalyTrackers) {
+                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
+                                                 gaugeVal);
+            }
         }
-        (*mCurrentSlicedBucket)[eventKey] = gauge;
     }
-    for (auto& tracker : mAnomalyTrackers) {
-        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
+}
+
+void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
+    mCurrentSlicedBucketForAnomaly->clear();
+    status_t err = NO_ERROR;
+    for (const auto& slice : *mCurrentSlicedBucket) {
+        KeyValuePair pair = slice.second->kv[0];
+        long gaugeVal = 0;
+        if (pair.has_value_int()) {
+            gaugeVal = (long)pair.value_int();
+        } else if (pair.has_value_long()) {
+            gaugeVal = pair.value_long();
+        }
+        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
     }
 }
 
@@ -276,6 +335,8 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked(
 // the GaugeMetricProducer while holding the lock.
 void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
     if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
+        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
+             (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
         return;
     }
 
@@ -285,19 +346,22 @@ void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
     info.mBucketNum = mCurrentBucketNum;
 
     for (const auto& slice : *mCurrentSlicedBucket) {
-        info.mGauge = slice.second;
+        info.mEvent = slice.second;
         auto& bucketList = mPastBuckets[slice.first];
         bucketList.push_back(info);
-        VLOG("gauge metric %s, dump key value: %s -> %lld", mName.c_str(), slice.first.c_str(),
-             (long long)slice.second);
+        VLOG("gauge metric %s, dump key value: %s -> %s", mName.c_str(),
+             slice.first.c_str(), slice.second->ToString().c_str());
     }
 
     // Reset counters
-    for (auto& tracker : mAnomalyTrackers) {
-        tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum);
+    if (mAnomalyTrackers.size() > 0) {
+        updateCurrentSlicedBucketForAnomaly();
+        for (auto& tracker : mAnomalyTrackers) {
+            tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
+        }
     }
 
-    mCurrentSlicedBucket = std::make_shared<DimToValMap>();
+    mCurrentSlicedBucket = std::make_shared<DimToEventKVMap>();
 
     // Adjusts the bucket start time
     int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
index 6e6f2bb..4a037ff 100644 (file)
@@ -26,7 +26,7 @@
 #include "../matchers/matcher_util.h"
 #include "MetricProducer.h"
 #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
-#include "stats_util.h"
+#include "../stats_util.h"
 
 namespace android {
 namespace os {
@@ -35,7 +35,7 @@ namespace statsd {
 struct GaugeBucket {
     int64_t mBucketStartNs;
     int64_t mBucketEndNs;
-    int64_t mGauge;
+    std::shared_ptr<EventKV> mEvent;
     uint64_t mBucketNum;
 };
 
@@ -49,7 +49,7 @@ public:
     // for all metrics.
     GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& countMetric,
                         const int conditionIndex, const sp<ConditionWizard>& wizard,
-                        const int pullTagId, const int64_t startTimeNs);
+                        const int pullTagId, const int atomTagId, const int64_t startTimeNs);
 
     virtual ~GaugeMetricProducer();
 
@@ -72,6 +72,12 @@ private:
     void onDumpReportLocked(const uint64_t dumpTimeNs,
                             android::util::ProtoOutputStream* protoOutput) override;
 
+    // for testing
+    GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& gaugeMetric,
+                        const int conditionIndex, const sp<ConditionWizard>& wizard,
+                        const int pullTagId, const int atomTagId, const uint64_t startTimeNs,
+                        std::shared_ptr<StatsPullerManager> statsPullerManager);
+
     // Internal interface to handle condition change.
     void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
 
@@ -84,12 +90,10 @@ private:
     // Util function to flush the old packet.
     void flushIfNeededLocked(const uint64_t& eventTime);
 
-    // The default bucket size for gauge metric is 1 second.
-    static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
+    // The default bucket size for gauge metric is 1 hr.
+    static const uint64_t kDefaultGaugemBucketSizeNs = 60ULL * 60 * 1000 * 1000 * 1000;
 
-    const int32_t mGaugeField;
-
-    StatsPullerManager mStatsPullerManager;
+    std::shared_ptr<StatsPullerManager> mStatsPullerManager;
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
@@ -98,9 +102,21 @@ private:
     std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
 
     // The current bucket.
-    std::shared_ptr<DimToValMap> mCurrentSlicedBucket = std::make_shared<DimToValMap>();
+    std::shared_ptr<DimToEventKVMap> mCurrentSlicedBucket = std::make_shared<DimToEventKVMap>();
+
+    // The current bucket for anomaly detection.
+    std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
+
+    // Translate Atom based bucket to single numeric value bucket for anomaly
+    void updateCurrentSlicedBucketForAnomaly();
+
+    int mAtomTagId;
+
+    // Whitelist of fields to report. Empty means all are reported.
+    std::vector<int> mGaugeFields;
 
-    int64_t getGauge(const LogEvent& event);
+    // apply a whitelist on the original input
+    std::shared_ptr<EventKV> getGauge(const LogEvent& event);
 
     // Util function to check whether the specified dimension hits the guardrail.
     bool hitGuardRailLocked(const HashableDimensionKey& newKey);
index 943becb..e20fb52 100644 (file)
@@ -370,15 +370,11 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
 
         sp<LogMatchingTracker> atomMatcher = allAtomMatchers.at(trackerIndex);
         // If it is pulled atom, it should be simple matcher with one tagId.
-        int pullTagId = -1;
-        for (int tagId : atomMatcher->getTagIds()) {
-            if (statsPullerManager.PullerForMatcherExists(tagId)) {
-                if (atomMatcher->getTagIds().size() != 1) {
-                    return false;
-                }
-                pullTagId = tagId;
-            }
+        if (atomMatcher->getTagIds().size() != 1) {
+            return false;
         }
+        int atomTagId = *(atomMatcher->getTagIds().begin());
+        int pullTagId = statsPullerManager.PullerForMatcherExists(atomTagId) ? atomTagId : -1;
 
         int conditionIndex = -1;
         if (metric.has_condition()) {
@@ -404,7 +400,17 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
     for (int i = 0; i < config.gauge_metric_size(); i++) {
         const GaugeMetric& metric = config.gauge_metric(i);
         if (!metric.has_what()) {
-            ALOGW("cannot find \"what\" in ValueMetric \"%s\"", metric.name().c_str());
+            ALOGW("cannot find \"what\" in GaugeMetric \"%s\"", metric.name().c_str());
+            return false;
+        }
+
+        if (((!metric.gauge_fields().has_include_all() ||
+              (metric.gauge_fields().has_include_all() &&
+               metric.gauge_fields().include_all() == false)) &&
+             metric.gauge_fields().field_num_size() == 0) ||
+            (metric.gauge_fields().has_include_all() && metric.gauge_fields().include_all() == true &&
+             metric.gauge_fields().field_num_size() > 0)) {
+            ALOGW("Incorrect field filter setting in GaugeMetric %s", metric.name().c_str());
             return false;
         }
 
@@ -419,15 +425,11 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
 
         sp<LogMatchingTracker> atomMatcher = allAtomMatchers.at(trackerIndex);
         // If it is pulled atom, it should be simple matcher with one tagId.
-        int pullTagId = -1;
-        for (int tagId : atomMatcher->getTagIds()) {
-            if (statsPullerManager.PullerForMatcherExists(tagId)) {
-                if (atomMatcher->getTagIds().size() != 1) {
-                    return false;
-                }
-                pullTagId = tagId;
-            }
+        if (atomMatcher->getTagIds().size() != 1) {
+            return false;
         }
+        int atomTagId = *(atomMatcher->getTagIds().begin());
+        int pullTagId = statsPullerManager.PullerForMatcherExists(atomTagId) ? atomTagId : -1;
 
         int conditionIndex = -1;
         if (metric.has_condition()) {
@@ -444,8 +446,8 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
             }
         }
 
-        sp<MetricProducer> gaugeProducer = new GaugeMetricProducer(key, metric, conditionIndex,
-                                                                   wizard, pullTagId, startTimeNs);
+        sp<MetricProducer> gaugeProducer = new GaugeMetricProducer(
+                key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs);
         allMetricProducers.push_back(gaugeProducer);
     }
     return true;
index 20d9d5c..3c85c57 100644 (file)
@@ -29,9 +29,10 @@ message KeyValuePair {
 
   oneof value {
     string value_str = 2;
-    int64 value_int = 3;
-    bool value_bool = 4;
-    float value_float = 5;
+    int32 value_int = 3;
+    int64 value_long = 4;
+    bool value_bool = 5;
+    float value_float = 6;
   }
 }
 
@@ -88,7 +89,7 @@ message GaugeBucketInfo {
 
   optional int64 end_bucket_nanos = 2;
 
-  optional int64 gauge = 3;
+  optional Atom atom = 3;
 }
 
 message GaugeMetricData {
index bfa3254..7527a64 100644 (file)
@@ -35,6 +35,9 @@ HashableDimensionKey getHashableKey(std::vector<KeyValuePair> keys) {
             case KeyValuePair::ValueCase::kValueInt:
                 flattened += std::to_string(pair.value_int());
                 break;
+            case KeyValuePair::ValueCase::kValueLong:
+                flattened += std::to_string(pair.value_long());
+                break;
             case KeyValuePair::ValueCase::kValueBool:
                 flattened += std::to_string(pair.value_bool());
                 break;
index 594561d..27d0d12 100644 (file)
@@ -17,6 +17,8 @@
 #pragma once
 
 #include "frameworks/base/cmds/statsd/src/stats_log.pb.h"
+#include <sstream>
+#include "logd/LogReader.h"
 
 #include <unordered_map>
 
@@ -32,6 +34,41 @@ typedef std::map<std::string, HashableDimensionKey> ConditionKey;
 
 typedef std::unordered_map<HashableDimensionKey, int64_t> DimToValMap;
 
+/*
+ * In memory rep for LogEvent. Uses much less memory than LogEvent
+ */
+typedef struct EventKV {
+    std::vector<KeyValuePair> kv;
+    string ToString() const {
+        std::ostringstream result;
+        result << "{ ";
+        const size_t N = kv.size();
+        for (size_t i = 0; i < N; i++) {
+            result << " ";
+            result << (i + 1);
+            result << "->";
+            const auto& pair = kv[i];
+            if (pair.has_value_int()) {
+                result << pair.value_int();
+            } else if (pair.has_value_long()) {
+                result << pair.value_long();
+            } else if (pair.has_value_float()) {
+                result << pair.value_float();
+            } else if (pair.has_value_str()) {
+                result << pair.value_str().c_str();
+            }
+        }
+        result << " }";
+        return result.str();
+    }
+} EventKV;
+
+typedef std::unordered_map<HashableDimensionKey, std::shared_ptr<EventKV>> DimToEventKVMap;
+
+EventMetricData parse(log_msg msg);
+
+int getTagId(log_msg msg);
+
 std::string getHashableKey(std::vector<KeyValuePair> key);
 
 }  // namespace statsd
index c9654af..a30b5f8 100644 (file)
@@ -120,6 +120,11 @@ message MetricConditionLink {
     repeated KeyMatcher key_in_condition = 3;
 }
 
+message FieldFilter {
+    optional bool include_all = 1;
+    repeated int32 field_num = 2;
+}
+
 message EventMetric {
     optional string name = 1;
 
@@ -170,7 +175,7 @@ message GaugeMetric {
 
     optional string what = 2;
 
-    optional int32 gauge_field = 3;
+    optional FieldFilter gauge_fields = 3;
 
     optional string condition = 4;
 
index 59475d2..68b7dcb 100644 (file)
@@ -26,6 +26,7 @@ using android::sp;
 using std::set;
 using std::unordered_map;
 using std::vector;
+using std::make_shared;
 
 #ifdef __ANDROID__
 
@@ -34,120 +35,142 @@ namespace os {
 namespace statsd {
 
 const ConfigKey kConfigKey(0, "test");
+const int tagId = 1;
+const string metricName = "test_metric";
+const int64_t bucketStartTimeNs = 10000000000;
+const int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
+const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
+const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
+const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
 
-TEST(GaugeMetricProducerTest, TestWithCondition) {
-    int64_t bucketStartTimeNs = 10000000000;
-    int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
-
+TEST(GaugeMetricProducerTest, TestNoCondition) {
     GaugeMetric metric;
-    metric.set_name("1");
+    metric.set_name(metricName);
     metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
-    metric.set_gauge_field(2);
+    metric.mutable_gauge_fields()->add_field_num(2);
 
     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
 
-    GaugeMetricProducer gaugeProducer(metric, 1 /*has condition*/, wizard, -1, bucketStartTimeNs);
+    // TODO: pending refactor of StatsPullerManager
+    // For now we still need this so that it doesn't do real pulling.
+    shared_ptr<MockStatsPullerManager> pullerManager =
+            make_shared<StrictMock<MockStatsPullerManager>>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
 
-    vector<std::shared_ptr<LogEvent>> allData;
-    std::shared_ptr<LogEvent> event1 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 1);
-    event1->write(1);
-    event1->write(13);
-    event1->init();
-    allData.push_back(event1);
+    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+                                      tagId, tagId, bucketStartTimeNs, pullerManager);
 
-    std::shared_ptr<LogEvent> event2 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 10);
-    event2->write(1);
-    event2->write(15);
-    event2->init();
-    allData.push_back(event2);
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(tagId);
+    event->write(11);
+    event->init();
+    allData.push_back(event);
 
     gaugeProducer.onDataPulled(allData);
-    gaugeProducer.flushIfNeededLocked(event2->GetTimestampNs() + 1);
-    EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
+    EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
+    EXPECT_EQ(11, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
     EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size());
 
-    gaugeProducer.onConditionChanged(true, bucketStartTimeNs + 11);
-    gaugeProducer.onConditionChanged(false, bucketStartTimeNs + 21);
-    gaugeProducer.onConditionChanged(true, bucketStartTimeNs + bucketSizeNs + 11);
-    std::shared_ptr<LogEvent> event3 =
-            std::make_shared<LogEvent>(1, bucketStartTimeNs + 2 * bucketSizeNs + 10);
-    event3->write(1);
-    event3->write(25);
-    event3->init();
-    allData.push_back(event3);
+    allData.clear();
+    std::shared_ptr<LogEvent> event2 =
+            std::make_shared<LogEvent>(tagId, bucket3StartTimeNs + 10);
+    event2->write(tagId);
+    event2->write(25);
+    event2->init();
+    allData.push_back(event2);
     gaugeProducer.onDataPulled(allData);
-    gaugeProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 10);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
-    EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
+    EXPECT_EQ(25, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
     // One dimension.
     EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.front().mGauge);
-    EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.front().mBucketNum);
-    EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs,
-              gaugeProducer.mPastBuckets.begin()->second.front().mBucketStartNs);
-}
+    EXPECT_EQ(11L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
+    EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
 
-TEST(GaugeMetricProducerTest, TestNoCondition) {
-    int64_t bucketStartTimeNs = 10000000000;
-    int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
+    gaugeProducer.flushIfNeededLocked(bucket4StartTimeNs);
+    EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
+    // One dimension.
+    EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
+    EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
+    EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
+}
 
+TEST(GaugeMetricProducerTest, TestWithCondition) {
     GaugeMetric metric;
-    metric.set_name("1");
+    metric.set_name(metricName);
     metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
-    metric.set_gauge_field(2);
+    metric.mutable_gauge_fields()->add_field_num(2);
+    metric.set_condition("SCREEN_ON");
 
     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
 
-    GaugeMetricProducer gaugeProducer(metric, -1 /*no condition*/, wizard, -1, bucketStartTimeNs);
-
-    vector<std::shared_ptr<LogEvent>> allData;
-    std::shared_ptr<LogEvent> event1 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 1);
-    event1->write(1);
-    event1->write(13);
-    event1->init();
-    allData.push_back(event1);
-
-    std::shared_ptr<LogEvent> event2 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 10);
-    event2->write(1);
-    event2->write(15);
-    event2->init();
-    allData.push_back(event2);
-
-    std::shared_ptr<LogEvent> event3 =
-            std::make_shared<LogEvent>(1, bucketStartTimeNs + 2 * bucketSizeNs + 10);
-    event3->write(1);
-    event3->write(25);
-    event3->init();
-    allData.push_back(event3);
+    shared_ptr<MockStatsPullerManager> pullerManager =
+            make_shared<StrictMock<MockStatsPullerManager>>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write(tagId);
+                event->write(100);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    GaugeMetricProducer gaugeProducer(kConfigKey, metric, 1, wizard, tagId, tagId,
+                                      bucketStartTimeNs, pullerManager);
+
+    gaugeProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+    EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
+    EXPECT_EQ(100, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
+    EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size());
 
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(1);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
     gaugeProducer.onDataPulled(allData);
-    // Has one slice
+
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
-    EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
+    EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
+    EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
+    EXPECT_EQ(100, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
+
+    gaugeProducer.onConditionChanged(false, bucket2StartTimeNs + 10);
+    gaugeProducer.flushIfNeededLocked(bucket3StartTimeNs + 10);
+    EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
     EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(13L, gaugeProducer.mPastBuckets.begin()->second.front().mGauge);
-    EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.begin()->second.front().mBucketNum);
-    EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.back().mGauge);
-    EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
-    EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs,
-              gaugeProducer.mPastBuckets.begin()->second.back().mBucketStartNs);
+    EXPECT_EQ(110L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
+    EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
 }
 
 TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
-    int64_t bucketStartTimeNs = 10000000000;
-    int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
 
+    shared_ptr<MockStatsPullerManager> pullerManager =
+            make_shared<StrictMock<MockStatsPullerManager>>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
+
     GaugeMetric metric;
-    metric.set_name("1");
+    metric.set_name(metricName);
     metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
-    metric.set_gauge_field(2);
-    GaugeMetricProducer gaugeProducer(metric, -1 /*no condition*/, wizard, -1, bucketStartTimeNs);
+    metric.mutable_gauge_fields()->add_field_num(2);
+    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+                                      tagId, tagId, bucketStartTimeNs, pullerManager);
 
     Alert alert;
     alert.set_name("alert");
-    alert.set_metric_name("1");
+    alert.set_metric_name(metricName);
     alert.set_trigger_if_sum_gt(25);
     alert.set_number_of_buckets(2);
     sp<AnomalyTracker> anomalyTracker = new AnomalyTracker(alert, kConfigKey);
@@ -160,7 +183,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
 
     gaugeProducer.onDataPulled({event1});
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
-    EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
+    EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL);
 
     std::shared_ptr<LogEvent> event2 =
@@ -171,7 +194,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
 
     gaugeProducer.onDataPulled({event2});
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
-    EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
+    EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event2->GetTimestampNs());
 
     std::shared_ptr<LogEvent> event3 =
@@ -182,7 +205,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
 
     gaugeProducer.onDataPulled({event3});
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
-    EXPECT_EQ(24L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
+    EXPECT_EQ(24L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event3->GetTimestampNs());
 
     // The event4 does not have the gauge field. Thus the current bucket value is 0.
@@ -191,7 +214,8 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
     event4->write(1);
     event4->init();
     gaugeProducer.onDataPulled({event4});
-    EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
+    EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
+    EXPECT_EQ(0, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
     EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event3->GetTimestampNs());
 }