Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement periodic exporting metric reader #1286

Merged
merged 21 commits into from
Apr 1, 2022
Merged
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 30 additions & 29 deletions exporters/ostream/BUILD
Original file line number Diff line number Diff line change
@@ -43,36 +43,37 @@ cc_library(
],
)

cc_library(
name = "ostream_metric_exporter",
srcs = [
"src/metric_exporter.cc",
],
hdrs = [
"include/opentelemetry/exporters/ostream/metric_exporter.h",
],
strip_include_prefix = "include",
tags = [
"metrics",
"ostream",
],
deps = [
"//sdk/src/metrics",
],
)
# TODO - Uncomment once MetricData interface is finalised
#cc_library(
# name = "ostream_metric_exporter",
# srcs = [
# "src/metric_exporter.cc",
# ],
# hdrs = [
# "include/opentelemetry/exporters/ostream/metric_exporter.h",
# ],
# strip_include_prefix = "include",
# tags = [
# "metrics",
# "ostream",
# ],
# deps = [
# "//sdk/src/metrics",
# ],
#)

cc_test(
name = "ostream_metric_test",
srcs = ["test/ostream_metric_test.cc"],
tags = [
"ostream",
"test",
],
deps = [
":ostream_metric_exporter",
"@com_google_googletest//:gtest_main",
],
)
#cc_test(
# name = "ostream_metric_test",
# srcs = ["test/ostream_metric_test.cc"],
# tags = [
# "ostream",
# "test",
# ],
# deps = [
# ":ostream_metric_exporter",
# "@com_google_googletest//:gtest_main",
# ],
#)

cc_test(
name = "ostream_metrics_test_deprecated",
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW

# include "opentelemetry/sdk/metrics/metric_reader.h"
# include "opentelemetry/version.h"

# include <atomic>
# include <chrono>
# include <condition_variable>
# include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

class MetricExporter;
/**
* Struct to hold PeriodicExortingMetricReader options.
*/

constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000);
;
constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000);
struct PeriodicExportingMetricReaderOptions
{

/* The time interval between two consecutive exports. */
std::chrono::milliseconds export_interval_millis =
std::chrono::milliseconds(kExportIntervalMillis);

/* how long the export can run before it is cancelled. */
std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis);
};

class PeriodicExportingMetricReader : public MetricReader
{

public:
PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative);

private:
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;

bool OnShutDown(std::chrono::microseconds timeout) noexcept override;

void OnInitialized() noexcept override;

std::unique_ptr<MetricExporter> exporter_;
std::chrono::milliseconds export_interval_millis_;
std::chrono::milliseconds export_timeout_millis_;

void DoBackgroundWork();

/* The background worker thread */
std::thread worker_thread_;

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
7 changes: 1 addition & 6 deletions sdk/include/opentelemetry/sdk/metrics/metric_exporter.h
Original file line number Diff line number Diff line change
@@ -32,9 +32,7 @@ class MetricExporter
* concurrently for the same exporter instance.
* @param spans a span of unique pointers to metrics data
*/
virtual opentelemetry::sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::metrics::MetricData>>
&records) noexcept = 0;
virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept = 0;

/**
* Force flush the exporter.
@@ -49,9 +47,6 @@ class MetricExporter
*/
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;

private:
AggregationTemporality aggregation_temporality_;
};
} // namespace metrics
} // namespace sdk
4 changes: 2 additions & 2 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
@@ -26,8 +26,7 @@ class MetricProducer;
class MetricReader
{
public:
MetricReader(
AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative);
MetricReader(AggregationTemporality aggr_temp = AggregationTemporality::kCummulative);

void SetMetricProducer(MetricProducer *metric_producer);

@@ -58,6 +57,7 @@ class MetricReader

virtual void OnInitialized() noexcept {}

protected:
bool IsShutdown() const noexcept;

private:
1 change: 1 addition & 0 deletions sdk/src/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ add_library(
meter.cc
meter_context.cc
metric_reader.cc
export/periodic_exporting_metric_reader.cc
state/metric_collector.cc
aggregation/histogram_aggregation.cc
aggregation/lastvalue_aggregation.cc
101 changes: 101 additions & 0 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
# include "opentelemetry/sdk/common/global_log_handler.h"
# include "opentelemetry/sdk/metrics/metric_exporter.h"

# include <chrono>
# include <future>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

PeriodicExportingMetricReader::PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality)
: MetricReader(aggregation_temporality),
exporter_{std::move(exporter)},
export_interval_millis_{option.export_interval_millis},
export_timeout_millis_{option.export_timeout_millis}
{
if (export_interval_millis_ <= export_timeout_millis_)
{
OTEL_INTERNAL_LOG_WARN(
"[Periodic Exporting Metric Reader] Invalid configuration: "
"export_interval_millis_ should be less than export_timeout_millis_, using default values");
export_interval_millis_ = kExportIntervalMillis;
export_timeout_millis_ = kExportTimeOutMillis;
}
}

void PeriodicExportingMetricReader::OnInitialized() noexcept
{
worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this);
}

void PeriodicExportingMetricReader::DoBackgroundWork()
{
std::unique_lock<std::mutex> lk(cv_m_);
do
{
if (IsShutdown())
{
break;
}
std::atomic<bool> cancel_export_for_timeout{false};
auto start = std::chrono::steady_clock::now();
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](MetricData data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(data);
return true;
});
});
std::future_status status;
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
cv_.wait_for(lk, remaining_wait_interval_ms);
} while (true);
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
{
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
}

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
9 changes: 7 additions & 2 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporality aggregation_temporality)
void MetricReader::SetMetricProducer(MetricProducer *metric_producer)
{
metric_producer_ = metric_producer;
OnInitialized();
}

AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept
@@ -46,18 +47,22 @@ bool MetricReader::Collect(nostd::function_ref<bool(MetricData)> callback) noexc
bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
{
bool status = true;

if (IsShutdown())
{
OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!");
}

{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
}

if (!OnShutDown(timeout))
{
status = false;
OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!");
}
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
return status;
}

3 changes: 2 additions & 1 deletion sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -11,7 +11,8 @@ foreach(
observer_result_test
sync_instruments_test
async_instruments_test
metric_reader_test)
metric_reader_test
periodic_exporting_metric_reader_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
3 changes: 1 addition & 2 deletions sdk/test/metrics/meter_provider_sdk_test.cc
Original file line number Diff line number Diff line change
@@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter

public:
MockMetricExporter() = default;
opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::nostd::span<std::unique_ptr<MetricData>> &records) noexcept override
opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept override
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}
Loading