OSDN Git Service

1. Enable FIELD_TYPE IDs defined in ProtoOutputStream
authoryro <yro@google.com>
Wed, 1 Nov 2017 06:06:53 +0000 (23:06 -0700)
committeryro <yro@google.com>
Thu, 2 Nov 2017 18:13:18 +0000 (11:13 -0700)
2. Migrate CountMetricProducer to use ProtoOutputStream

Test: statsd, statsd_test
Change-Id: I33a1ea77a49b045818a48923b2263cb594ab0013

cmds/statsd/src/logd/LogEvent.cpp
cmds/statsd/src/metrics/CountMetricProducer.cpp
cmds/statsd/src/metrics/CountMetricProducer.h
cmds/statsd/src/metrics/EventMetricProducer.cpp
cmds/statsd/src/stats_util.h

index 451d26d..1a075d8 100644 (file)
@@ -24,6 +24,7 @@ namespace android {
 namespace os {
 namespace statsd {
 
+using namespace android::util;
 using std::ostringstream;
 using std::string;
 using android::util::ProtoOutputStream;
@@ -207,20 +208,20 @@ string LogEvent::ToString() const {
 }
 
 void LogEvent::ToProto(ProtoOutputStream& proto) const {
-    long long atomToken = proto.start(TYPE_MESSAGE + mTagId);
+    long long atomToken = proto.start(FIELD_TYPE_MESSAGE | mTagId);
     const size_t N = mElements.size();
     for (size_t i=0; i<N; i++) {
         const int key = i + 1;
 
         const android_log_list_element& elem = mElements[i];
         if (elem.type == EVENT_TYPE_INT) {
-            proto.write(TYPE_INT32 + key, elem.data.int32);
+            proto.write(FIELD_TYPE_INT32 | key, elem.data.int32);
         } else if (elem.type == EVENT_TYPE_LONG) {
-            proto.write(TYPE_INT64 + key, (long long)elem.data.int64);
+            proto.write(FIELD_TYPE_INT64 | key, (long long)elem.data.int64);
         } else if (elem.type == EVENT_TYPE_FLOAT) {
-            proto.write(TYPE_FLOAT + key, elem.data.float32);
+            proto.write(FIELD_TYPE_FLOAT | key, elem.data.float32);
         } else if (elem.type == EVENT_TYPE_STRING) {
-            proto.write(TYPE_STRING + key, elem.data.string);
+            proto.write(FIELD_TYPE_STRING | key, elem.data.string);
         }
     }
     proto.end(atomToken);
index 9f8558d..71cb771 100644 (file)
@@ -24,6 +24,8 @@
 #include <limits.h>
 #include <stdlib.h>
 
+using namespace android::util;
+using android::util::ProtoOutputStream;
 using std::map;
 using std::string;
 using std::unordered_map;
@@ -33,6 +35,27 @@ namespace android {
 namespace os {
 namespace statsd {
 
+// for StatsLogReport
+const int FIELD_ID_METRIC_ID = 1;
+const int FIELD_ID_START_REPORT_NANOS = 2;
+const int FIELD_ID_END_REPORT_NANOS = 3;
+const int FIELD_ID_COUNT_METRICS = 5;
+// for CountMetricDataWrapper
+const int FIELD_ID_DATA = 1;
+// for CountMetricData
+const int FIELD_ID_DIMENSION = 1;
+const int FIELD_ID_BUCKET_INFO = 2;
+// for KeyValuePair
+const int FIELD_ID_KEY = 1;
+const int FIELD_ID_VALUE_STR = 2;
+const int FIELD_ID_VALUE_INT = 3;
+const int FIELD_ID_VALUE_BOOL = 4;
+const int FIELD_ID_VALUE_FLOAT = 5;
+// for CountBucketInfo
+const int FIELD_ID_START_BUCKET_NANOS = 1;
+const int FIELD_ID_END_BUCKET_NANOS = 2;
+const int FIELD_ID_COUNT = 3;
+
 // TODO: add back AnomalyTracker.
 CountMetricProducer::CountMetricProducer(const CountMetric& metric, const int conditionIndex,
                                          const sp<ConditionWizard>& wizard)
@@ -66,6 +89,8 @@ CountMetricProducer::CountMetricProducer(const CountMetric& metric, const int co
         mConditionSliced = true;
     }
 
+    startNewProtoOutputStream(mStartTimeNs);
+
     VLOG("metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -74,23 +99,14 @@ CountMetricProducer::~CountMetricProducer() {
     VLOG("~CountMetricProducer() called");
 }
 
-void CountMetricProducer::finish() {
-    // TODO: write the StatsLogReport to dropbox using
-    // DropboxWriter.
+void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
+    mProto = std::make_unique<ProtoOutputStream>();
+    mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
+    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);
 }
 
-static void addSlicedCounterToReport(StatsLogReport_CountMetricDataWrapper& wrapper,
-                                     const vector<KeyValuePair>& key,
-                                     const vector<CountBucketInfo>& buckets) {
-    CountMetricData* data = wrapper.add_data();
-    for (const auto& kv : key) {
-        data->add_dimension()->CopyFrom(kv);
-    }
-    for (const auto& bucket : buckets) {
-        data->add_bucket_info()->CopyFrom(bucket);
-        VLOG("\t bucket [%lld - %lld] count: %lld", bucket.start_bucket_nanos(),
-             bucket.end_bucket_nanos(), bucket.count());
-    }
+void CountMetricProducer::finish() {
 }
 
 void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
@@ -98,33 +114,81 @@ void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
 }
 
 StatsLogReport CountMetricProducer::onDumpReport() {
-    VLOG("metric %lld dump report now...", mMetric.metric_id());
-
-    StatsLogReport report;
-    report.set_metric_id(mMetric.metric_id());
-    report.set_start_report_nanos(mStartTimeNs);
+    long long endTime = time(nullptr) * NANO_SECONDS_IN_A_SECOND;
 
     // Dump current bucket if it's stale.
     // If current bucket is still on-going, don't force dump current bucket.
     // In finish(), We can force dump current bucket.
-    flushCounterIfNeeded(time(nullptr) * NANO_SECONDS_IN_A_SECOND);
-    report.set_end_report_nanos(mCurrentBucketStartTimeNs);
-
-    StatsLogReport_CountMetricDataWrapper* wrapper = report.mutable_count_metrics();
+    flushCounterIfNeeded(endTime);
 
-    for (const auto& pair : mPastBuckets) {
-        const HashableDimensionKey& hashableKey = pair.first;
+    for (const auto& counter : mPastBucketProtos) {
+        const HashableDimensionKey& hashableKey = counter.first;
         auto it = mDimensionKeyMap.find(hashableKey);
         if (it == mDimensionKeyMap.end()) {
             ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str());
             continue;
         }
+        long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);
+
+        // First fill dimension (KeyValuePairs).
+        for (const auto& kv : it->second) {
+            long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION);
+            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            if (kv.has_value_str()) {
+                mProto->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_STR, kv.value_str());
+            } else if (kv.has_value_int()) {
+                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+            } else if (kv.has_value_bool()) {
+                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+            } else if (kv.has_value_float()) {
+                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+            }
+            mProto->end(dimensionToken);
+        }
 
-        VLOG("  dimension key %s", hashableKey.c_str());
-        addSlicedCounterToReport(*wrapper, it->second, pair.second);
+        // Then fill bucket_info (CountBucketInfo).
+        for (const auto& proto : counter.second) {
+            size_t bufferSize = proto->size();
+            char* buffer(new char[bufferSize]);
+            size_t pos = 0;
+            auto it = proto->data();
+            while (it.readBuffer() != NULL) {
+                size_t toRead = it.currentToRead();
+                std::memcpy(&buffer[pos], it.readBuffer(), toRead);
+                pos += toRead;
+                it.rp()->move(toRead);
+            }
+            mProto->write(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION, buffer, bufferSize);
+        }
+
+        mProto->end(wrapperToken);
+    }
+
+    mProto->end(mProtoToken);
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
+                  (long long)mCurrentBucketStartTimeNs);
+
+    size_t bufferSize = mProto->size();
+    VLOG("metric %lld dump report now...", mMetric.metric_id());
+    std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
+    size_t pos = 0;
+    auto it = mProto->data();
+    while (it.readBuffer() != NULL) {
+        size_t toRead = it.currentToRead();
+        std::memcpy(&buffer[pos], it.readBuffer(), toRead);
+        pos += toRead;
+        it.rp()->move(toRead);
     }
-    return report;
-    // TODO: Clear mPastBuckets, mDimensionKeyMap once the report is dumped.
+
+    startNewProtoOutputStream(endTime);
+    mPastBucketProtos.clear();
+    mByteSize = 0;
+
+    // TODO: Once we migrate all MetricProducers to use ProtoOutputStream, we should return this:
+    // return std::move(buffer);
+    return StatsLogReport();
+
+    // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
@@ -175,15 +239,17 @@ void CountMetricProducer::flushCounterIfNeeded(const uint64_t eventTimeNs) {
     // adjust the bucket start time
     int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
 
-    CountBucketInfo info;
-    info.set_start_bucket_nanos(mCurrentBucketStartTimeNs);
-    info.set_end_bucket_nanos(mCurrentBucketStartTimeNs + mBucketSizeNs);
-
     for (const auto& counter : mCurrentSlicedCounter) {
-        info.set_count(counter.second);
-        // it will auto create new vector of CountbucketInfo if the key is not found.
-        auto& bucketList = mPastBuckets[counter.first];
-        bucketList.push_back(info);
+        unique_ptr<ProtoOutputStream> proto = make_unique<ProtoOutputStream>();
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                     (long long)mCurrentBucketStartTimeNs);
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                      (long long)mCurrentBucketStartTimeNs + mBucketSizeNs);
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)counter.second);
+
+        auto& bucketList = mPastBucketProtos[counter.first];
+        bucketList.push_back(std::move(proto));
+        mByteSize += proto->size();
 
         VLOG("metric %lld, dump key value: %s -> %d", mMetric.metric_id(), counter.first.c_str(),
              counter.second);
@@ -202,11 +268,11 @@ void CountMetricProducer::flushCounterIfNeeded(const uint64_t eventTimeNs) {
          (long long)mCurrentBucketStartTimeNs);
 }
 
+// Rough estimate of CountMetricProducer buffer stored. This number will be
+// greater than actual data size as it contains each dimension of
+// CountMetricData is  duplicated.
 size_t CountMetricProducer::byteSize() {
-// TODO: return actual proto size when ProtoOutputStream is ready for use for
-// CountMetricsProducer.
-//    return mProto->size();
-    return 0;
+    return mByteSize;
 }
 
 }  // namespace statsd
index 80e80d9..473a4ba 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <unordered_map>
 
+#include <android/util/ProtoOutputStream.h>
 #include "../condition/ConditionTracker.h"
 #include "../matchers/matcher_util.h"
 #include "CountAnomalyTracker.h"
@@ -65,8 +66,10 @@ protected:
 private:
     const CountMetric mMetric;
 
-    // Save the past buckets and we can clear when the StatsLogReport is dumped.
-    std::unordered_map<HashableDimensionKey, std::vector<CountBucketInfo>> mPastBuckets;
+    std::unordered_map<HashableDimensionKey,
+        std::vector<unique_ptr<android::util::ProtoOutputStream>>> mPastBucketProtos;
+
+    size_t mByteSize;
 
     // The current bucket.
     std::unordered_map<HashableDimensionKey, int> mCurrentSlicedCounter;
@@ -74,6 +77,12 @@ private:
     vector<unique_ptr<CountAnomalyTracker>> mAnomalyTrackers;
 
     void flushCounterIfNeeded(const uint64_t newEventTime);
+
+    std::unique_ptr<android::util::ProtoOutputStream> mProto;
+
+    long long mProtoToken;
+
+    void startNewProtoOutputStream(long long timestamp);
 };
 
 }  // namespace statsd
index d714179..cbae1d3 100644 (file)
@@ -23,6 +23,7 @@
 #include <limits.h>
 #include <stdlib.h>
 
+using namespace android::util;
 using android::util::ProtoOutputStream;
 using std::map;
 using std::string;
@@ -36,13 +37,13 @@ namespace statsd {
 // for StatsLogReport
 const int FIELD_ID_METRIC_ID = 1;
 const int FIELD_ID_START_REPORT_NANOS = 2;
-const int FIELD_ID_END_REPORT_NANOS = 2;
+const int FIELD_ID_END_REPORT_NANOS = 3;
 const int FIELD_ID_EVENT_METRICS = 4;
+// for EventMetricDataWrapper
+const int FIELD_ID_DATA = 1;
 // for EventMetricData
 const int FIELD_ID_TIMESTAMP_NANOS = 1;
 const int FIELD_ID_STATS_EVENTS = 2;
-// for CountMetricDataWrapper
-const int FIELD_ID_DATA = 1;
 
 EventMetricProducer::EventMetricProducer(const EventMetric& metric, const int conditionIndex,
                                          const sp<ConditionWizard>& wizard)
@@ -69,9 +70,9 @@ void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
     mProto = std::make_unique<ProtoOutputStream>();
     // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
     // and StatsEvent.
-    mProto->write(TYPE_INT32 + FIELD_ID_METRIC_ID, mMetric.metric_id());
-    mProto->write(TYPE_INT64 + FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(TYPE_MESSAGE + FIELD_ID_EVENT_METRICS);
+    mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
+    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_EVENT_METRICS);
 }
 
 void EventMetricProducer::finish() {
@@ -83,7 +84,7 @@ void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
 StatsLogReport EventMetricProducer::onDumpReport() {
     long long endTime = time(nullptr) * NANO_SECONDS_IN_A_SECOND;
     mProto->end(mProtoToken);
-    mProto->write(TYPE_INT64 + FIELD_ID_END_REPORT_NANOS, endTime);
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
 
     size_t bufferSize = mProto->size();
     VLOG("metric %lld dump report now... proto size: %zu ", mMetric.metric_id(), bufferSize);
@@ -118,9 +119,9 @@ void EventMetricProducer::onMatchedLogEventInternal(
         return;
     }
 
-    long long wrapperToken = mProto->start(TYPE_MESSAGE + FIELD_ID_DATA);
-    mProto->write(TYPE_INT64 + FIELD_ID_TIMESTAMP_NANOS, (long long)event.GetTimestampNs());
-    long long eventToken = mProto->start(TYPE_MESSAGE + FIELD_ID_STATS_EVENTS);
+    long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_TIMESTAMP_NANOS, (long long)event.GetTimestampNs());
+    long long eventToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_STATS_EVENTS);
     event.ToProto(*mProto);
     mProto->end(eventToken);
     mProto->end(wrapperToken);
index a428752..d3d7e37 100644 (file)
@@ -30,14 +30,6 @@ namespace statsd {
 #define MATCHER_NOT_FOUND -2
 #define NANO_SECONDS_IN_A_SECOND (1000 * 1000 * 1000)
 
-// TODO: Remove the following constants once they are exposed in ProtOutputStream.h
-const uint64_t FIELD_TYPE_SHIFT = 32;
-const uint64_t TYPE_MESSAGE = 11ULL << FIELD_TYPE_SHIFT;
-const uint64_t TYPE_INT64 = 3ULL << FIELD_TYPE_SHIFT;
-const uint64_t TYPE_INT32 = 5ULL << FIELD_TYPE_SHIFT;
-const uint64_t TYPE_FLOAT = 2ULL << FIELD_TYPE_SHIFT;
-const uint64_t TYPE_STRING = 9ULL << FIELD_TYPE_SHIFT;
-
 typedef std::string HashableDimensionKey;
 
 typedef std::map<std::string, HashableDimensionKey> ConditionKey;