From d9dfda76e0aa8b8aa4d2562a7ff4259f4500e355 Mon Sep 17 00:00:00 2001 From: Chenjie Yu Date: Mon, 11 Dec 2017 17:41:20 -0800 Subject: [PATCH] add feature: GaugeMetricProducer now takes repeated list of fields bug fix: GaugeMetricProducer now works better with pulled events. unit test also includes GaugeMetricProducer_test Test: unit test Change-Id: Ic60f09342d14cfb107be2130d445b323a56909e0 --- cmds/statsd/Android.mk | 1 + cmds/statsd/src/config/ConfigManager.cpp | 2 +- cmds/statsd/src/logd/LogEvent.cpp | 9 +- cmds/statsd/src/logd/LogEvent.h | 4 + cmds/statsd/src/metrics/GaugeMetricProducer.cpp | 146 +++++++++++----- cmds/statsd/src/metrics/GaugeMetricProducer.h | 36 ++-- cmds/statsd/src/metrics/metrics_manager_util.cpp | 40 ++--- cmds/statsd/src/stats_log.proto | 9 +- cmds/statsd/src/stats_util.cpp | 3 + cmds/statsd/src/stats_util.h | 37 +++++ cmds/statsd/src/statsd_config.proto | 7 +- .../tests/metrics/GaugeMetricProducer_test.cpp | 184 ++++++++++++--------- 12 files changed, 321 insertions(+), 157 deletions(-) diff --git a/cmds/statsd/Android.mk b/cmds/statsd/Android.mk index addba8c3a085..3e517bb0a029 100644 --- a/cmds/statsd/Android.mk +++ b/cmds/statsd/Android.mk @@ -173,6 +173,7 @@ LOCAL_SRC_FILES := \ tests/metrics/DurationMetricProducer_test.cpp \ tests/metrics/EventMetricProducer_test.cpp \ tests/metrics/ValueMetricProducer_test.cpp \ + tests/metrics/GaugeMetricProducer_test.cpp \ tests/guardrail/StatsdStats_test.cpp LOCAL_STATIC_LIBRARIES := \ diff --git a/cmds/statsd/src/config/ConfigManager.cpp b/cmds/statsd/src/config/ConfigManager.cpp index 164f88f3df59..a28da5dc433f 100644 --- a/cmds/statsd/src/config/ConfigManager.cpp +++ b/cmds/statsd/src/config/ConfigManager.cpp @@ -369,7 +369,7 @@ StatsdConfig build_fake_config() { GaugeMetric* gaugeMetric = config.add_gauge_metric(); gaugeMetric->set_name("METRIC_10"); gaugeMetric->set_what("DEVICE_TEMPERATURE"); - gaugeMetric->set_gauge_field(DEVICE_TEMPERATURE_KEY); + gaugeMetric->mutable_gauge_fields()->add_field_num(DEVICE_TEMPERATURE_KEY); gaugeMetric->mutable_bucket()->set_bucket_size_millis(60 * 1000L); // Event matchers............ diff --git a/cmds/statsd/src/logd/LogEvent.cpp b/cmds/statsd/src/logd/LogEvent.cpp index 103213830914..01487f040dbb 100644 --- a/cmds/statsd/src/logd/LogEvent.cpp +++ b/cmds/statsd/src/logd/LogEvent.cpp @@ -69,6 +69,13 @@ bool LogEvent::write(uint32_t value) { return false; } +bool LogEvent::write(int64_t value) { + if (mContext) { + return android_log_write_int64(mContext, value) >= 0; + } + return false; +} + bool LogEvent::write(uint64_t value) { if (mContext) { return android_log_write_int64(mContext, value) >= 0; @@ -224,7 +231,7 @@ KeyValuePair LogEvent::GetKeyValueProto(size_t key) const { if (elem.type == EVENT_TYPE_INT) { pair.set_value_int(elem.data.int32); } else if (elem.type == EVENT_TYPE_LONG) { - pair.set_value_int(elem.data.int64); + pair.set_value_long(elem.data.int64); } else if (elem.type == EVENT_TYPE_STRING) { pair.set_value_str(elem.data.string); } else if (elem.type == EVENT_TYPE_FLOAT) { diff --git a/cmds/statsd/src/logd/LogEvent.h b/cmds/statsd/src/logd/LogEvent.h index 176e16e3113a..6ff6b87beca6 100644 --- a/cmds/statsd/src/logd/LogEvent.h +++ b/cmds/statsd/src/logd/LogEvent.h @@ -110,6 +110,10 @@ public: */ void setTimestampNs(uint64_t timestampNs) {mTimestampNs = timestampNs;} + int size() const { + return mElements.size(); + } + private: /** * Don't copy, it's slower. If we really need this we can add it but let's try to diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index ae9b86fc3c9d..7ec57dcaee88 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -19,11 +19,8 @@ #include "GaugeMetricProducer.h" #include "guardrail/StatsdStats.h" -#include "stats_util.h" #include -#include -#include using android::util::FIELD_COUNT_REPEATED; using android::util::FIELD_TYPE_BOOL; @@ -37,6 +34,8 @@ using std::map; using std::string; using std::unordered_map; using std::vector; +using std::make_shared; +using std::shared_ptr; namespace android { namespace os { @@ -61,21 +60,27 @@ const int FIELD_ID_VALUE_FLOAT = 5; // for GaugeBucketInfo const int FIELD_ID_START_BUCKET_NANOS = 1; const int FIELD_ID_END_BUCKET_NANOS = 2; -const int FIELD_ID_GAUGE = 3; +const int FIELD_ID_ATOM = 3; GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex, - const sp& wizard, const int pullTagId, - const int64_t startTimeNs) + const sp& wizard, const int atomTagId, + const int pullTagId, const uint64_t startTimeNs, + shared_ptr statsPullerManager) : MetricProducer(metric.name(), key, startTimeNs, conditionIndex, wizard), - mGaugeField(metric.gauge_field()), - mPullTagId(pullTagId) { + mStatsPullerManager(statsPullerManager), + mPullTagId(pullTagId), + mAtomTagId(atomTagId) { if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) { mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000; } else { mBucketSizeNs = kDefaultGaugemBucketSizeNs; } + for (int i = 0; i < metric.gauge_fields().field_num_size(); i++) { + mGaugeFields.push_back(metric.gauge_fields().field_num(i)); + } + // TODO: use UidMap if uid->pkg_name is required mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end()); @@ -87,7 +92,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric // Kicks off the puller immediately. if (mPullTagId != -1) { - mStatsPullerManager.RegisterReceiver(mPullTagId, this, + mStatsPullerManager->RegisterReceiver(mPullTagId, this, metric.bucket().bucket_size_millis()); } @@ -95,10 +100,19 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric (long long)mBucketSizeNs, (long long)mStartTimeNs); } +// for testing +GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, + const int conditionIndex, + const sp& wizard, const int pullTagId, + const int atomTagId, const int64_t startTimeNs) + : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs, + make_shared()) { +} + GaugeMetricProducer::~GaugeMetricProducer() { VLOG("~GaugeMetricProducer() called"); if (mPullTagId != -1) { - mStatsPullerManager.UnRegisterReceiver(mPullTagId, this); + mStatsPullerManager->UnRegisterReceiver(mPullTagId, this); } } @@ -149,10 +163,26 @@ void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, (long long)bucket.mBucketStartNs); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, (long long)bucket.mBucketEndNs); - protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge); + long long atomToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM); + long long eventToken = protoOutput->start(FIELD_TYPE_MESSAGE | mAtomTagId); + for (const auto& pair : bucket.mEvent->kv) { + if (pair.has_value_int()) { + protoOutput->write(FIELD_TYPE_INT32 | pair.key(), pair.value_int()); + } else if (pair.has_value_long()) { + protoOutput->write(FIELD_TYPE_INT64 | pair.key(), pair.value_long()); + } else if (pair.has_value_str()) { + protoOutput->write(FIELD_TYPE_STRING | pair.key(), pair.value_str()); + } else if (pair.has_value_long()) { + protoOutput->write(FIELD_TYPE_FLOAT | pair.key(), pair.value_float()); + } else if (pair.has_value_bool()) { + protoOutput->write(FIELD_TYPE_BOOL | pair.key(), pair.value_bool()); + } + } + protoOutput->end(eventToken); + protoOutput->end(atomToken); protoOutput->end(bucketInfoToken); - VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, - (long long)bucket.mBucketEndNs, (long long)bucket.mGauge); + VLOG("\t bucket [%lld - %lld] content: %s", (long long)bucket.mBucketStartNs, + (long long)bucket.mBucketEndNs, bucket.mEvent->ToString().c_str()); } protoOutput->end(wrapperToken); } @@ -174,6 +204,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet, if (mPullTagId == -1) { return; } + // No need to pull again. Either scheduled pull or condition on true happened if (!mCondition) { return; } @@ -182,7 +213,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet, return; } vector> allData; - if (!mStatsPullerManager.Pull(mPullTagId, &allData)) { + if (!mStatsPullerManager->Pull(mPullTagId, &allData)) { ALOGE("Stats puller failed for tag: %d", mPullTagId); return; } @@ -196,20 +227,25 @@ void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventT VLOG("Metric %s onSlicedConditionMayChange", mName.c_str()); } -int64_t GaugeMetricProducer::getGauge(const LogEvent& event) { - status_t err = NO_ERROR; - int64_t val = event.GetLong(mGaugeField, &err); - if (err == NO_ERROR) { - return val; +shared_ptr GaugeMetricProducer::getGauge(const LogEvent& event) { + shared_ptr ret = make_shared(); + if (mGaugeFields.size() == 0) { + for (int i = 1; i <= event.size(); i++) { + ret->kv.push_back(event.GetKeyValueProto(i)); + } } else { - VLOG("Can't find value in message."); - return -1; + for (int i = 0; i < (int)mGaugeFields.size(); i++) { + ret->kv.push_back(event.GetKeyValueProto(mGaugeFields[i])); + } } + return ret; } void GaugeMetricProducer::onDataPulled(const std::vector>& allData) { std::lock_guard lock(mMutex); - + if (allData.size() == 0) { + return; + } for (const auto& data : allData) { onMatchedLogEventLocked(0, *data); } @@ -247,25 +283,48 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked( (long long)mCurrentBucketStartTimeNs); return; } - - // When the event happens in a new bucket, flush the old buckets. - if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) { - flushIfNeededLocked(eventTimeNs); - } + flushIfNeededLocked(eventTimeNs); // For gauge metric, we just simply use the first gauge in the given bucket. - if (!mCurrentSlicedBucket->empty()) { + if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end()) { return; } - const long gauge = getGauge(event); - if (gauge >= 0) { - if (hitGuardRailLocked(eventKey)) { - return; + shared_ptr gauge = getGauge(event); + if (hitGuardRailLocked(eventKey)) { + return; + } + (*mCurrentSlicedBucket)[eventKey] = gauge; + // Anomaly detection on gauge metric only works when there is one numeric + // field specified. + if (mAnomalyTrackers.size() > 0) { + if (gauge->kv.size() == 1) { + KeyValuePair pair = gauge->kv[0]; + long gaugeVal = 0; + if (pair.has_value_int()) { + gaugeVal = (long)pair.value_int(); + } else if (pair.has_value_long()) { + gaugeVal = pair.value_long(); + } + for (auto& tracker : mAnomalyTrackers) { + tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, + gaugeVal); + } } - (*mCurrentSlicedBucket)[eventKey] = gauge; } - for (auto& tracker : mAnomalyTrackers) { - tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge); +} + +void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() { + mCurrentSlicedBucketForAnomaly->clear(); + status_t err = NO_ERROR; + for (const auto& slice : *mCurrentSlicedBucket) { + KeyValuePair pair = slice.second->kv[0]; + long gaugeVal = 0; + if (pair.has_value_int()) { + gaugeVal = (long)pair.value_int(); + } else if (pair.has_value_long()) { + gaugeVal = pair.value_long(); + } + (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal; } } @@ -276,6 +335,8 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked( // the GaugeMetricProducer while holding the lock. void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { + VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs, + (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs)); return; } @@ -285,19 +346,22 @@ void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { info.mBucketNum = mCurrentBucketNum; for (const auto& slice : *mCurrentSlicedBucket) { - info.mGauge = slice.second; + info.mEvent = slice.second; auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); - VLOG("gauge metric %s, dump key value: %s -> %lld", mName.c_str(), slice.first.c_str(), - (long long)slice.second); + VLOG("gauge metric %s, dump key value: %s -> %s", mName.c_str(), + slice.first.c_str(), slice.second->ToString().c_str()); } // Reset counters - for (auto& tracker : mAnomalyTrackers) { - tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum); + if (mAnomalyTrackers.size() > 0) { + updateCurrentSlicedBucketForAnomaly(); + for (auto& tracker : mAnomalyTrackers) { + tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum); + } } - mCurrentSlicedBucket = std::make_shared(); + mCurrentSlicedBucket = std::make_shared(); // Adjusts the bucket start time int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs; diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h index 6e6f2bbdf945..4a037ffb1c32 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.h +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h @@ -26,7 +26,7 @@ #include "../matchers/matcher_util.h" #include "MetricProducer.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" -#include "stats_util.h" +#include "../stats_util.h" namespace android { namespace os { @@ -35,7 +35,7 @@ namespace statsd { struct GaugeBucket { int64_t mBucketStartNs; int64_t mBucketEndNs; - int64_t mGauge; + std::shared_ptr mEvent; uint64_t mBucketNum; }; @@ -49,7 +49,7 @@ public: // for all metrics. GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& countMetric, const int conditionIndex, const sp& wizard, - const int pullTagId, const int64_t startTimeNs); + const int pullTagId, const int atomTagId, const int64_t startTimeNs); virtual ~GaugeMetricProducer(); @@ -72,6 +72,12 @@ private: void onDumpReportLocked(const uint64_t dumpTimeNs, android::util::ProtoOutputStream* protoOutput) override; + // for testing + GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& gaugeMetric, + const int conditionIndex, const sp& wizard, + const int pullTagId, const int atomTagId, const uint64_t startTimeNs, + std::shared_ptr statsPullerManager); + // Internal interface to handle condition change. void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; @@ -84,12 +90,10 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const uint64_t& eventTime); - // The default bucket size for gauge metric is 1 second. - static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000; + // The default bucket size for gauge metric is 1 hr. + static const uint64_t kDefaultGaugemBucketSizeNs = 60ULL * 60 * 1000 * 1000 * 1000; - const int32_t mGaugeField; - - StatsPullerManager mStatsPullerManager; + std::shared_ptr mStatsPullerManager; // tagId for pulled data. -1 if this is not pulled const int mPullTagId; @@ -98,9 +102,21 @@ private: std::unordered_map> mPastBuckets; // The current bucket. - std::shared_ptr mCurrentSlicedBucket = std::make_shared(); + std::shared_ptr mCurrentSlicedBucket = std::make_shared(); + + // The current bucket for anomaly detection. + std::shared_ptr mCurrentSlicedBucketForAnomaly = std::make_shared(); + + // Translate Atom based bucket to single numeric value bucket for anomaly + void updateCurrentSlicedBucketForAnomaly(); + + int mAtomTagId; + + // Whitelist of fields to report. Empty means all are reported. + std::vector mGaugeFields; - int64_t getGauge(const LogEvent& event); + // apply a whitelist on the original input + std::shared_ptr getGauge(const LogEvent& event); // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const HashableDimensionKey& newKey); diff --git a/cmds/statsd/src/metrics/metrics_manager_util.cpp b/cmds/statsd/src/metrics/metrics_manager_util.cpp index 943becb6fb3c..e20fb52c8e21 100644 --- a/cmds/statsd/src/metrics/metrics_manager_util.cpp +++ b/cmds/statsd/src/metrics/metrics_manager_util.cpp @@ -370,15 +370,11 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config, sp atomMatcher = allAtomMatchers.at(trackerIndex); // If it is pulled atom, it should be simple matcher with one tagId. - int pullTagId = -1; - for (int tagId : atomMatcher->getTagIds()) { - if (statsPullerManager.PullerForMatcherExists(tagId)) { - if (atomMatcher->getTagIds().size() != 1) { - return false; - } - pullTagId = tagId; - } + if (atomMatcher->getTagIds().size() != 1) { + return false; } + int atomTagId = *(atomMatcher->getTagIds().begin()); + int pullTagId = statsPullerManager.PullerForMatcherExists(atomTagId) ? atomTagId : -1; int conditionIndex = -1; if (metric.has_condition()) { @@ -404,7 +400,17 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config, for (int i = 0; i < config.gauge_metric_size(); i++) { const GaugeMetric& metric = config.gauge_metric(i); if (!metric.has_what()) { - ALOGW("cannot find \"what\" in ValueMetric \"%s\"", metric.name().c_str()); + ALOGW("cannot find \"what\" in GaugeMetric \"%s\"", metric.name().c_str()); + return false; + } + + if (((!metric.gauge_fields().has_include_all() || + (metric.gauge_fields().has_include_all() && + metric.gauge_fields().include_all() == false)) && + metric.gauge_fields().field_num_size() == 0) || + (metric.gauge_fields().has_include_all() && metric.gauge_fields().include_all() == true && + metric.gauge_fields().field_num_size() > 0)) { + ALOGW("Incorrect field filter setting in GaugeMetric %s", metric.name().c_str()); return false; } @@ -419,15 +425,11 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config, sp atomMatcher = allAtomMatchers.at(trackerIndex); // If it is pulled atom, it should be simple matcher with one tagId. - int pullTagId = -1; - for (int tagId : atomMatcher->getTagIds()) { - if (statsPullerManager.PullerForMatcherExists(tagId)) { - if (atomMatcher->getTagIds().size() != 1) { - return false; - } - pullTagId = tagId; - } + if (atomMatcher->getTagIds().size() != 1) { + return false; } + int atomTagId = *(atomMatcher->getTagIds().begin()); + int pullTagId = statsPullerManager.PullerForMatcherExists(atomTagId) ? atomTagId : -1; int conditionIndex = -1; if (metric.has_condition()) { @@ -444,8 +446,8 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config, } } - sp gaugeProducer = new GaugeMetricProducer(key, metric, conditionIndex, - wizard, pullTagId, startTimeNs); + sp gaugeProducer = new GaugeMetricProducer( + key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs); allMetricProducers.push_back(gaugeProducer); } return true; diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto index 20d9d5c15f1b..3c85c57ab037 100644 --- a/cmds/statsd/src/stats_log.proto +++ b/cmds/statsd/src/stats_log.proto @@ -29,9 +29,10 @@ message KeyValuePair { oneof value { string value_str = 2; - int64 value_int = 3; - bool value_bool = 4; - float value_float = 5; + int32 value_int = 3; + int64 value_long = 4; + bool value_bool = 5; + float value_float = 6; } } @@ -88,7 +89,7 @@ message GaugeBucketInfo { optional int64 end_bucket_nanos = 2; - optional int64 gauge = 3; + optional Atom atom = 3; } message GaugeMetricData { diff --git a/cmds/statsd/src/stats_util.cpp b/cmds/statsd/src/stats_util.cpp index bfa3254d5519..7527a6455b05 100644 --- a/cmds/statsd/src/stats_util.cpp +++ b/cmds/statsd/src/stats_util.cpp @@ -35,6 +35,9 @@ HashableDimensionKey getHashableKey(std::vector keys) { case KeyValuePair::ValueCase::kValueInt: flattened += std::to_string(pair.value_int()); break; + case KeyValuePair::ValueCase::kValueLong: + flattened += std::to_string(pair.value_long()); + break; case KeyValuePair::ValueCase::kValueBool: flattened += std::to_string(pair.value_bool()); break; diff --git a/cmds/statsd/src/stats_util.h b/cmds/statsd/src/stats_util.h index 594561d3d294..27d0d12ec5be 100644 --- a/cmds/statsd/src/stats_util.h +++ b/cmds/statsd/src/stats_util.h @@ -17,6 +17,8 @@ #pragma once #include "frameworks/base/cmds/statsd/src/stats_log.pb.h" +#include +#include "logd/LogReader.h" #include @@ -32,6 +34,41 @@ typedef std::map ConditionKey; typedef std::unordered_map DimToValMap; +/* + * In memory rep for LogEvent. Uses much less memory than LogEvent + */ +typedef struct EventKV { + std::vector kv; + string ToString() const { + std::ostringstream result; + result << "{ "; + const size_t N = kv.size(); + for (size_t i = 0; i < N; i++) { + result << " "; + result << (i + 1); + result << "->"; + const auto& pair = kv[i]; + if (pair.has_value_int()) { + result << pair.value_int(); + } else if (pair.has_value_long()) { + result << pair.value_long(); + } else if (pair.has_value_float()) { + result << pair.value_float(); + } else if (pair.has_value_str()) { + result << pair.value_str().c_str(); + } + } + result << " }"; + return result.str(); + } +} EventKV; + +typedef std::unordered_map> DimToEventKVMap; + +EventMetricData parse(log_msg msg); + +int getTagId(log_msg msg); + std::string getHashableKey(std::vector key); } // namespace statsd diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto index c9654af13137..a30b5f8c06e6 100644 --- a/cmds/statsd/src/statsd_config.proto +++ b/cmds/statsd/src/statsd_config.proto @@ -120,6 +120,11 @@ message MetricConditionLink { repeated KeyMatcher key_in_condition = 3; } +message FieldFilter { + optional bool include_all = 1; + repeated int32 field_num = 2; +} + message EventMetric { optional string name = 1; @@ -170,7 +175,7 @@ message GaugeMetric { optional string what = 2; - optional int32 gauge_field = 3; + optional FieldFilter gauge_fields = 3; optional string condition = 4; diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp index 59475d266217..68b7dcb64100 100644 --- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp @@ -26,6 +26,7 @@ using android::sp; using std::set; using std::unordered_map; using std::vector; +using std::make_shared; #ifdef __ANDROID__ @@ -34,120 +35,142 @@ namespace os { namespace statsd { const ConfigKey kConfigKey(0, "test"); +const int tagId = 1; +const string metricName = "test_metric"; +const int64_t bucketStartTimeNs = 10000000000; +const int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL; +const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs; +const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs; +const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs; -TEST(GaugeMetricProducerTest, TestWithCondition) { - int64_t bucketStartTimeNs = 10000000000; - int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL; - +TEST(GaugeMetricProducerTest, TestNoCondition) { GaugeMetric metric; - metric.set_name("1"); + metric.set_name(metricName); metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000); - metric.set_gauge_field(2); + metric.mutable_gauge_fields()->add_field_num(2); sp wizard = new NaggyMock(); - GaugeMetricProducer gaugeProducer(metric, 1 /*has condition*/, wizard, -1, bucketStartTimeNs); + // TODO: pending refactor of StatsPullerManager + // For now we still need this so that it doesn't do real pulling. + shared_ptr pullerManager = + make_shared>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); - vector> allData; - std::shared_ptr event1 = std::make_shared(1, bucketStartTimeNs + 1); - event1->write(1); - event1->write(13); - event1->init(); - allData.push_back(event1); + GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, + tagId, tagId, bucketStartTimeNs, pullerManager); - std::shared_ptr event2 = std::make_shared(1, bucketStartTimeNs + 10); - event2->write(1); - event2->write(15); - event2->init(); - allData.push_back(event2); + vector> allData; + allData.clear(); + shared_ptr event = make_shared(tagId, bucket2StartTimeNs + 1); + event->write(tagId); + event->write(11); + event->init(); + allData.push_back(event); gaugeProducer.onDataPulled(allData); - gaugeProducer.flushIfNeededLocked(event2->GetTimestampNs() + 1); - EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size()); + EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); + EXPECT_EQ(11, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size()); - gaugeProducer.onConditionChanged(true, bucketStartTimeNs + 11); - gaugeProducer.onConditionChanged(false, bucketStartTimeNs + 21); - gaugeProducer.onConditionChanged(true, bucketStartTimeNs + bucketSizeNs + 11); - std::shared_ptr event3 = - std::make_shared(1, bucketStartTimeNs + 2 * bucketSizeNs + 10); - event3->write(1); - event3->write(25); - event3->init(); - allData.push_back(event3); + allData.clear(); + std::shared_ptr event2 = + std::make_shared(tagId, bucket3StartTimeNs + 10); + event2->write(tagId); + event2->write(25); + event2->init(); + allData.push_back(event2); gaugeProducer.onDataPulled(allData); - gaugeProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 10); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); - EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second); + EXPECT_EQ(25, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); // One dimension. EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size()); EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.front().mGauge); - EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.front().mBucketNum); - EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs, - gaugeProducer.mPastBuckets.begin()->second.front().mBucketStartNs); -} + EXPECT_EQ(11L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int()); + EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum); -TEST(GaugeMetricProducerTest, TestNoCondition) { - int64_t bucketStartTimeNs = 10000000000; - int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL; + gaugeProducer.flushIfNeededLocked(bucket4StartTimeNs); + EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size()); + // One dimension. + EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size()); + EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size()); + EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int()); + EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum); +} +TEST(GaugeMetricProducerTest, TestWithCondition) { GaugeMetric metric; - metric.set_name("1"); + metric.set_name(metricName); metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000); - metric.set_gauge_field(2); + metric.mutable_gauge_fields()->add_field_num(2); + metric.set_condition("SCREEN_ON"); sp wizard = new NaggyMock(); - GaugeMetricProducer gaugeProducer(metric, -1 /*no condition*/, wizard, -1, bucketStartTimeNs); - - vector> allData; - std::shared_ptr event1 = std::make_shared(1, bucketStartTimeNs + 1); - event1->write(1); - event1->write(13); - event1->init(); - allData.push_back(event1); - - std::shared_ptr event2 = std::make_shared(1, bucketStartTimeNs + 10); - event2->write(1); - event2->write(15); - event2->init(); - allData.push_back(event2); - - std::shared_ptr event3 = - std::make_shared(1, bucketStartTimeNs + 2 * bucketSizeNs + 10); - event3->write(1); - event3->write(25); - event3->init(); - allData.push_back(event3); + shared_ptr pullerManager = + make_shared>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + .WillOnce(Invoke([](int tagId, vector>* data) { + data->clear(); + shared_ptr event = make_shared(tagId, bucketStartTimeNs + 10); + event->write(tagId); + event->write(100); + event->init(); + data->push_back(event); + return true; + })); + + GaugeMetricProducer gaugeProducer(kConfigKey, metric, 1, wizard, tagId, tagId, + bucketStartTimeNs, pullerManager); + + gaugeProducer.onConditionChanged(true, bucketStartTimeNs + 8); + EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); + EXPECT_EQ(100, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); + EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size()); + vector> allData; + allData.clear(); + shared_ptr event = make_shared(tagId, bucket2StartTimeNs + 1); + event->write(1); + event->write(110); + event->init(); + allData.push_back(event); gaugeProducer.onDataPulled(allData); - // Has one slice + EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); - EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second); + EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); + EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size()); + EXPECT_EQ(100, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int()); + + gaugeProducer.onConditionChanged(false, bucket2StartTimeNs + 10); + gaugeProducer.flushIfNeededLocked(bucket3StartTimeNs + 10); + EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size()); EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(13L, gaugeProducer.mPastBuckets.begin()->second.front().mGauge); - EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.begin()->second.front().mBucketNum); - EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.back().mGauge); - EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum); - EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs, - gaugeProducer.mPastBuckets.begin()->second.back().mBucketStartNs); + EXPECT_EQ(110L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int()); + EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum); } TEST(GaugeMetricProducerTest, TestAnomalyDetection) { - int64_t bucketStartTimeNs = 10000000000; - int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL; sp wizard = new NaggyMock(); + shared_ptr pullerManager = + make_shared>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); + GaugeMetric metric; - metric.set_name("1"); + metric.set_name(metricName); metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000); - metric.set_gauge_field(2); - GaugeMetricProducer gaugeProducer(metric, -1 /*no condition*/, wizard, -1, bucketStartTimeNs); + metric.mutable_gauge_fields()->add_field_num(2); + GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, + tagId, tagId, bucketStartTimeNs, pullerManager); Alert alert; alert.set_name("alert"); - alert.set_metric_name("1"); + alert.set_metric_name(metricName); alert.set_trigger_if_sum_gt(25); alert.set_number_of_buckets(2); sp anomalyTracker = new AnomalyTracker(alert, kConfigKey); @@ -160,7 +183,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) { gaugeProducer.onDataPulled({event1}); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); - EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()->second); + EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL); std::shared_ptr event2 = @@ -171,7 +194,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) { gaugeProducer.onDataPulled({event2}); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); - EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()->second); + EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event2->GetTimestampNs()); std::shared_ptr event3 = @@ -182,7 +205,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) { gaugeProducer.onDataPulled({event3}); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); - EXPECT_EQ(24L, gaugeProducer.mCurrentSlicedBucket->begin()->second); + EXPECT_EQ(24L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event3->GetTimestampNs()); // The event4 does not have the gauge field. Thus the current bucket value is 0. @@ -191,7 +214,8 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) { event4->write(1); event4->init(); gaugeProducer.onDataPulled({event4}); - EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size()); + EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); + EXPECT_EQ(0, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int()); EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event3->GetTimestampNs()); } -- 2.11.0