diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index b5a1283d26..887e1beb92 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -28,6 +28,7 @@ class DefaultAggregation { case InstrumentType::kCounter: case InstrumentType::kUpDownCounter: + case InstrumentType::kObservableCounter: case InstrumentType::kObservableUpDownCounter: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) ? std::move(std::unique_ptr(new LongSumAggregation())) @@ -90,6 +91,53 @@ class DefaultAggregation return DefaultAggregation::CreateAggregation(instrument_descriptor); } } + + static std::unique_ptr CloneAggregation(AggregationType aggregation_type, + InstrumentDescriptor instrument_descriptor, + const Aggregation &to_copy) + { + const PointType point_data = to_copy.ToPoint(); + switch (aggregation_type) + { + case AggregationType::kDrop: + return std::unique_ptr(new DropAggregation()); + case AggregationType::kHistogram: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr( + new LongHistogramAggregation(nostd::get(point_data))); + } + else + { + return std::unique_ptr( + new DoubleHistogramAggregation(nostd::get(point_data))); + } + case AggregationType::kLastValue: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr( + new LongLastValueAggregation(nostd::get(point_data))); + } + else + { + return std::unique_ptr( + new DoubleLastValueAggregation(nostd::get(point_data))); + } + case AggregationType::kSum: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr( + new LongSumAggregation(nostd::get(point_data))); + } + else + { + return std::unique_ptr( + new DoubleSumAggregation(nostd::get(point_data))); + } + default: + return DefaultAggregation::CreateAggregation(instrument_descriptor); + } + } }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h index 4e29fa2e46..6c3d89d247 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h @@ -23,16 +23,18 @@ class DropAggregation : public Aggregation public: DropAggregation() = default; + DropAggregation(const DropPointData &) {} + void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - std::unique_ptr Merge(const Aggregation &delta) const noexcept override + std::unique_ptr Merge(const Aggregation &) const noexcept override { return std::unique_ptr(new DropAggregation()); } - std::unique_ptr Diff(const Aggregation &next) const noexcept override + std::unique_ptr Diff(const Aggregation &) const noexcept override { return std::unique_ptr(new DropAggregation()); } diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h index b5cc2c349e..e2a55fba58 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h @@ -19,6 +19,7 @@ class LongHistogramAggregation : public Aggregation public: LongHistogramAggregation(); LongHistogramAggregation(HistogramPointData &&); + LongHistogramAggregation(const HistogramPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; @@ -26,14 +27,14 @@ class LongHistogramAggregation : public Aggregation /* Returns the result of merge of the existing aggregation with delta aggregation with same * boundaries */ - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; /* Returns the new delta aggregation by comparing existing aggregation with next aggregation with * same boundaries. Data points for `next` aggregation (sum , bucket-counts) should be more than * the current aggregation - which is the normal scenario as measurements values are monotonic * increasing. */ - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; @@ -47,6 +48,7 @@ class DoubleHistogramAggregation : public Aggregation public: DoubleHistogramAggregation(); DoubleHistogramAggregation(HistogramPointData &&); + DoubleHistogramAggregation(const HistogramPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} @@ -54,14 +56,14 @@ class DoubleHistogramAggregation : public Aggregation /* Returns the result of merge of the existing aggregation with delta aggregation with same * boundaries */ - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; /* Returns the new delta aggregation by comparing existing aggregation with next aggregation with * same boundaries. Data points for `next` aggregation (sum , bucket-counts) should be more than * the current aggregation - which is the normal scenario as measurements values are monotonic * increasing. */ - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h index 7f185d51a1..3b2c08f8ce 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h @@ -18,14 +18,15 @@ class LongLastValueAggregation : public Aggregation public: LongLastValueAggregation(); LongLastValueAggregation(LastValuePointData &&); + LongLastValueAggregation(const LastValuePointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; @@ -39,6 +40,7 @@ class DoubleLastValueAggregation : public Aggregation public: DoubleLastValueAggregation(); DoubleLastValueAggregation(LastValuePointData &&); + DoubleLastValueAggregation(const LastValuePointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h index b0f0169b24..14f13bd727 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h @@ -19,14 +19,15 @@ class LongSumAggregation : public Aggregation public: LongSumAggregation(); LongSumAggregation(SumPointData &&); + LongSumAggregation(const SumPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; @@ -40,14 +41,15 @@ class DoubleSumAggregation : public Aggregation public: DoubleSumAggregation(); DoubleSumAggregation(SumPointData &&); + DoubleSumAggregation(const SumPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override; - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index cfbf521538..e4c20e4010 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -10,6 +10,7 @@ # include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" # include "opentelemetry/sdk/metrics/state/metric_collector.h" # include "opentelemetry/sdk/metrics/state/metric_storage.h" +# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h" # include "opentelemetry/sdk/metrics/view/attributes_processor.h" # include @@ -32,7 +33,8 @@ class AsyncMetricStorage : public MetricStorage aggregation_type_{aggregation_type}, measurement_collection_callback_{measurement_callback}, attributes_processor_{attributes_processor}, - active_attributes_hashmap_(new AttributesHashMap()) + cumulative_hash_map_(new AttributesHashMap()), + temporal_metric_storage_(instrument_descriptor) {} bool Collect(CollectorHandle *collector, @@ -45,22 +47,33 @@ class AsyncMetricStorage : public MetricStorage // read the measurement using configured callback measurement_collection_callback_(ob_res); - + std::shared_ptr delta_hash_map(new AttributesHashMap()); // process the read measurements - aggregate and store in hashmap for (auto &measurement : ob_res.GetMeasurements()) { - auto agg = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); - agg->Aggregate(measurement.second); - active_attributes_hashmap_->Set(measurement.first, std::move(agg)); + auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); + aggr->Aggregate(measurement.second); + auto prev = cumulative_hash_map_->Get(measurement.first); + if (prev) + { + auto delta = prev->Diff(*aggr); + cumulative_hash_map_->Set(measurement.first, + DefaultAggregation::CloneAggregation( + aggregation_type_, instrument_descriptor_, *delta)); + delta_hash_map->Set(measurement.first, std::move(delta)); + } + else + { + cumulative_hash_map_->Set( + measurement.first, + DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr)); + delta_hash_map->Set(measurement.first, std::move(aggr)); + } } - // TBD -> read aggregation from hashmap, and perform metric collection - MetricData metric_data; - if (metric_collection_callback(std::move(metric_data))) - { - return true; - } - return false; + return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, + std::move(delta_hash_map), + metric_collection_callback); } private: @@ -68,7 +81,8 @@ class AsyncMetricStorage : public MetricStorage AggregationType aggregation_type_; void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult &); const AttributesProcessor *attributes_processor_; - std::unique_ptr active_attributes_hashmap_; + std::unique_ptr cumulative_hash_map_; + TemporalMetricStorage temporal_metric_storage_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h new file mode 100644 index 0000000000..16659c14f5 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +# include "opentelemetry/sdk/metrics/state/metric_collector.h" + +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +struct LastReportedMetrics +{ + std::unique_ptr attributes_map; + opentelemetry::common::SystemTimestamp collection_ts; +}; + +class TemporalMetricStorage +{ +public: + TemporalMetricStorage(InstrumentDescriptor instrument_descriptor); + + bool buildMetrics(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + std::shared_ptr delta_metrics, + nostd::function_ref callback) noexcept; + +private: + InstrumentDescriptor instrument_descriptor_; + + // unreported metrics stash for all the collectors + std::unordered_map>> + unreported_metrics_; + // last reported metrics stash for all the collectors. + std::unordered_map last_reported_metrics_; + + // Lock while building metrics + mutable opentelemetry::common::SpinLockMutex lock_; +}; +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index b6656b5bf8..77a371a80c 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -7,6 +7,7 @@ add_library( export/periodic_exporting_metric_reader.cc state/metric_collector.cc state/sync_metric_storage.cc + state/temporal_metric_storage.cc aggregation/histogram_aggregation.cc aggregation/lastvalue_aggregation.cc aggregation/sum_aggregation.cc diff --git a/sdk/src/metrics/aggregation/histogram_aggregation.cc b/sdk/src/metrics/aggregation/histogram_aggregation.cc index 27405999c9..aa2be74713 100644 --- a/sdk/src/metrics/aggregation/histogram_aggregation.cc +++ b/sdk/src/metrics/aggregation/histogram_aggregation.cc @@ -25,6 +25,10 @@ LongHistogramAggregation::LongHistogramAggregation(HistogramPointData &&data) : point_data_{std::move(data)} {} +LongHistogramAggregation::LongHistogramAggregation(const HistogramPointData &data) + : point_data_{data} +{} + void LongHistogramAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); @@ -83,6 +87,10 @@ DoubleHistogramAggregation::DoubleHistogramAggregation(HistogramPointData &&data : point_data_{std::move(data)} {} +DoubleHistogramAggregation::DoubleHistogramAggregation(const HistogramPointData &data) + : point_data_{data} +{} + void DoubleHistogramAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/src/metrics/aggregation/lastvalue_aggregation.cc b/sdk/src/metrics/aggregation/lastvalue_aggregation.cc index 9c0252be31..a125005335 100644 --- a/sdk/src/metrics/aggregation/lastvalue_aggregation.cc +++ b/sdk/src/metrics/aggregation/lastvalue_aggregation.cc @@ -19,10 +19,15 @@ LongLastValueAggregation::LongLastValueAggregation() point_data_.is_lastvalue_valid_ = false; point_data_.value_ = 0l; } + LongLastValueAggregation::LongLastValueAggregation(LastValuePointData &&data) : point_data_{std::move(data)} {} +LongLastValueAggregation::LongLastValueAggregation(const LastValuePointData &data) + : point_data_{data} +{} + void LongLastValueAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); @@ -71,10 +76,15 @@ DoubleLastValueAggregation::DoubleLastValueAggregation() point_data_.is_lastvalue_valid_ = false; point_data_.value_ = 0.0; } + DoubleLastValueAggregation::DoubleLastValueAggregation(LastValuePointData &&data) : point_data_{std::move(data)} {} +DoubleLastValueAggregation::DoubleLastValueAggregation(const LastValuePointData &data) + : point_data_{data} +{} + void DoubleLastValueAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/src/metrics/aggregation/sum_aggregation.cc b/sdk/src/metrics/aggregation/sum_aggregation.cc index 94b871cd34..5ca786496e 100644 --- a/sdk/src/metrics/aggregation/sum_aggregation.cc +++ b/sdk/src/metrics/aggregation/sum_aggregation.cc @@ -22,6 +22,8 @@ LongSumAggregation::LongSumAggregation() LongSumAggregation::LongSumAggregation(SumPointData &&data) : point_data_{std::move(data)} {} +LongSumAggregation::LongSumAggregation(const SumPointData &data) : point_data_{data} {} + void LongSumAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); @@ -64,6 +66,8 @@ DoubleSumAggregation::DoubleSumAggregation() DoubleSumAggregation::DoubleSumAggregation(SumPointData &&data) : point_data_(std::move(data)) {} +DoubleSumAggregation::DoubleSumAggregation(const SumPointData &data) : point_data_(data) {} + void DoubleSumAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc new file mode 100644 index 0000000000..55e93e3d46 --- /dev/null +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h" +# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor) + : instrument_descriptor_(instrument_descriptor) +{} + +bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + std::shared_ptr delta_metrics, + nostd::function_ref callback) noexcept +{ + std::lock_guard guard(lock_); + opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts; + auto aggregation_temporarily = collector->GetAggregationTemporality(); + for (auto &col : collectors) + { + unreported_metrics_[col.get()].push_back(delta_metrics); + } + + // Get the unreported metrics for the `collector` from `unreported metrics stash` + // since last collection, this will also cleanup the unreported metrics for `collector` + // from the stash. + auto present = unreported_metrics_.find(collector); + if (present == unreported_metrics_.end()) + { + // no unreported metrics for the collector, return. + return true; + } + auto unreported_list = std::move(present->second); + // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` + std::unique_ptr merged_metrics(new AttributesHashMap); + for (auto &agg_hashmap : unreported_list) + { + agg_hashmap->GetAllEnteries( + [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, agg->Merge(aggregation)); + } + else + { + merged_metrics->Set( + attributes, + DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation)); + merged_metrics->GetAllEnteries( + [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); + } + return true; + }); + } + // Get the last reported metrics for the `collector` from `last reported metrics` stash + // - If the aggregation_temporarily for the collector is cumulative + // - Merge the last reported metrics with unreported metrics (which is in merged_metrics), + // Final result of merge would be in merged_metrics. + // - Move the final merge to the `last reported metrics` stash. + // - If the aggregation_temporarily is delta + // - Store the unreported metrics for `collector` (which is in merged_mtrics) to + // `last reported metrics` stash. + + auto reported = last_reported_metrics_.find(collector); + if (reported != last_reported_metrics_.end()) + { + last_collection_ts = last_reported_metrics_[collector].collection_ts; + auto last_aggr_hashmap = std::move(last_reported_metrics_[collector].attributes_map); + if (aggregation_temporarily == AggregationTemporality::kCumulative) + { + // merge current delta to previous cumulative + last_aggr_hashmap->GetAllEnteries( + [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, agg->Merge(aggregation)); + } + else + { + merged_metrics->Set(attributes, + DefaultAggregation::CreateAggregation(instrument_descriptor_)); + } + return true; + }); + } + last_reported_metrics_[collector] = + LastReportedMetrics{std::move(merged_metrics), collection_ts}; + } + else + { + merged_metrics->GetAllEnteries( + [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); + last_reported_metrics_.insert( + std::make_pair(collector, LastReportedMetrics{std::move(merged_metrics), collection_ts})); + } + + // Generate the MetricData from the final merged_metrics, and invoke callback over it. + + AttributesHashMap *result_to_export = (last_reported_metrics_[collector]).attributes_map.get(); + MetricData metric_data; + metric_data.instrument_descriptor = instrument_descriptor_; + metric_data.start_ts = last_collection_ts; + metric_data.end_ts = collection_ts; + result_to_export->GetAllEnteries( + [&metric_data](const MetricAttributes &attributes, Aggregation &aggregation) { + PointDataAttributes point_data_attr; + point_data_attr.point_data = aggregation.ToPoint(); + point_data_attr.attributes = attributes; + metric_data.point_data_attr_.push_back(point_data_attr); + return true; + }); + return callback(metric_data); +} + +} // namespace metrics + +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index f939035fb2..a4decaaa79 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -18,45 +18,114 @@ using namespace opentelemetry::sdk::metrics; using namespace opentelemetry::sdk::instrumentationlibrary; using namespace opentelemetry::sdk::resource; -class MockMetricReader : public MetricReader +using namespace opentelemetry::sdk::metrics; +using namespace opentelemetry::common; +using M = std::map; + +class MockCollectorHandle : public CollectorHandle { public: - MockMetricReader(AggregationTemporality aggr_temporality) : MetricReader(aggr_temporality) {} + MockCollectorHandle(AggregationTemporality temp) : temporality(temp) {} - virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } + AggregationTemporality GetAggregationTemporality() noexcept override { return temporality; } - virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } - - virtual void OnInitialized() noexcept override {} +private: + AggregationTemporality temporality; }; -void measurement_fetch(opentelemetry::metrics::ObserverResult &observer_result) +class WritableMetricStorageTestFixture : public ::testing::TestWithParam +{}; + +class MeasurementFetcher { - observer_result.Observe(20l); - observer_result.Observe(10l); -} +public: + static void Fetcher(opentelemetry::metrics::ObserverResult &observer_result) + { + fetch_count++; + if (fetch_count == 1) + { + observer_result.Observe(20l, {{"RequestType", "GET"}}); + observer_result.Observe(10l, {{"RequestType", "PUT"}}); + number_of_get += 20l; + number_of_put += 10l; + } + else if (fetch_count == 2) + { + observer_result.Observe(40l, {{"RequestType", "GET"}}); + observer_result.Observe(20l, {{"RequestType", "PUT"}}); + number_of_get += 40l; + number_of_put += 20l; + } + } + + static void init_values() + { + fetch_count = 0; + number_of_get = 0; + number_of_put = 0; + } -TEST(AsyncMetricStorageTest, BasicTests) + static size_t fetch_count; + static long number_of_get; + static long number_of_put; + static const size_t number_of_attributes = 2; // GET , PUT +}; + +size_t MeasurementFetcher::fetch_count; +long MeasurementFetcher::number_of_get; +long MeasurementFetcher::number_of_put; +const size_t MeasurementFetcher::number_of_attributes; + +TEST_P(WritableMetricStorageTestFixture, TestAggregation) { - auto metric_callback = [](MetricData &&metric_data) { return true; }; - InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, + MeasurementFetcher::init_values(); + AggregationTemporality temporality = GetParam(); + + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableCounter, InstrumentValueType::kLong}; auto sdk_start_ts = std::chrono::system_clock::now(); // Some computation here auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5); - std::shared_ptr meter_context(new MeterContext()); - std::unique_ptr metric_reader(new MockMetricReader(AggregationTemporality::kDelta)); + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + size_t count_attributes = 0; + long value = 0; - std::shared_ptr collector = std::shared_ptr( - new MetricCollector(std::move(meter_context), std::move(metric_reader))); + MeasurementFetcher measurement_fetcher; + opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum, + MeasurementFetcher::Fetcher, + new DefaultAttributesProcessor()); - std::vector> collectors{collector}; - - opentelemetry::sdk::metrics::AsyncMetricStorage storage( - instr_desc, AggregationType::kSum, &measurement_fetch, new DefaultAttributesProcessor()); - EXPECT_NO_THROW( - storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, metric_callback)); + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), + MeasurementFetcher::number_of_get); + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), + MeasurementFetcher::number_of_put); + } + count_attributes++; + } + return true; + }); + EXPECT_EQ(MeasurementFetcher::number_of_attributes, count_attributes); } + +INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, + WritableMetricStorageTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + #endif \ No newline at end of file