Skip to content

Commit

Permalink
agents: add OTLP metrics exporter to OTLPAgent
Browse files Browse the repository at this point in the history
Initial implementation covering for the moment just `Sum` and `Gauge`
metric types. This cover most of our metrics. The `Summary` metrics (the
percentiles) will be added in a follow-up PR as they require additions
to the `opentelemetry-cpp` library.
  • Loading branch information
santigimeno committed Apr 4, 2024
1 parent 43af3d1 commit 57239bc
Show file tree
Hide file tree
Showing 21 changed files with 1,148 additions and 179 deletions.
9 changes: 4 additions & 5 deletions agents/otlp/src/datadog_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void DatadogMetrics::got_proc_metrics(const ProcessMetricsStor& stor,
const ProcessMetricsStor& prev_stor) {
json series = json::array();

#define V(CType, CName, JSName, MType) \
#define V(CType, CName, JSName, MType, Unit) \
{ \
auto it = std::find(discarded_metrics.begin(), \
discarded_metrics.end(), \
Expand Down Expand Up @@ -85,11 +85,10 @@ void DatadogMetrics::got_proc_metrics(const ProcessMetricsStor& stor,

/*virtual*/
void DatadogMetrics::got_thr_metrics(
const std::vector<std::pair<ThreadMetricsStor,
ThreadMetricsStor>>& thr_metrics) {
const std::vector<MetricsExporter::ThrMetricsStor>& thr_metrics) {
json series = json::array();
for (const auto& tm : thr_metrics) {
got_thr_metrics(tm.first, tm.second, series);
got_thr_metrics(tm.stor, tm.prev_stor, series);
}

if (series.size() > 0) {
Expand All @@ -106,7 +105,7 @@ void DatadogMetrics::got_thr_metrics(
void DatadogMetrics::got_thr_metrics(const ThreadMetricsStor& stor,
const ThreadMetricsStor& prev_stor,
nlohmann::json& series) {
#define V(CType, CName, JSName, MType) \
#define V(CType, CName, JSName, MType, Unit) \
{ \
auto it = std::find(discarded_metrics.begin(), \
discarded_metrics.end(), \
Expand Down
4 changes: 1 addition & 3 deletions agents/otlp/src/datadog_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ class DatadogMetrics final: public MetricsExporter {
virtual void got_proc_metrics(const ProcessMetrics::MetricsStor& stor,
const ProcessMetrics::MetricsStor& prev_stor);

void got_thr_metrics(
const std::vector<std::pair<ThreadMetrics::MetricsStor,
ThreadMetrics::MetricsStor>>&);
void got_thr_metrics(const std::vector<MetricsExporter::ThrMetricsStor>&);

private:
void got_thr_metrics(const ThreadMetrics::MetricsStor& stor,
Expand Down
9 changes: 4 additions & 5 deletions agents/otlp/src/dynatrace_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void DynatraceMetrics::got_proc_metrics(const ProcessMetricsStor& stor,
const ProcessMetricsStor& prev_stor) {
std::string metrics;

#define V(CType, CName, JSName, MType) \
#define V(CType, CName, JSName, MType, Unit) \
{ \
auto it = std::find(discarded_metrics.begin(), \
discarded_metrics.end(), \
Expand Down Expand Up @@ -71,11 +71,10 @@ void DynatraceMetrics::got_proc_metrics(const ProcessMetricsStor& stor,

/*virtual*/
void DynatraceMetrics::got_thr_metrics(
const std::vector<std::pair<ThreadMetricsStor,
ThreadMetricsStor>>& thr_metrics) {
const std::vector<MetricsExporter::ThrMetricsStor>& thr_metrics) {
std::string metrics;
for (const auto& tm : thr_metrics) {
metrics += got_thr_metrics(tm.first, tm.second);
metrics += got_thr_metrics(tm.stor, tm.prev_stor);
}

if (metrics.size() > 0) {
Expand All @@ -89,7 +88,7 @@ std::string DynatraceMetrics::got_thr_metrics(
const ThreadMetricsStor& stor,
const ThreadMetricsStor& prev_stor) {
std::string metrics;
#define V(CType, CName, JSName, MType) \
#define V(CType, CName, JSName, MType, Unit) \
{ \
auto it = std::find(discarded_metrics.begin(), \
discarded_metrics.end(), \
Expand Down
5 changes: 2 additions & 3 deletions agents/otlp/src/dynatrace_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ class DynatraceMetrics final: public MetricsExporter {
virtual void got_proc_metrics(const ProcessMetrics::MetricsStor& stor,
const ProcessMetrics::MetricsStor& prev_stor);

void got_thr_metrics(
const std::vector<std::pair<ThreadMetrics::MetricsStor,
ThreadMetrics::MetricsStor>>&);
virtual void got_thr_metrics(
const std::vector<MetricsExporter::ThrMetricsStor>&);


private:
Expand Down
16 changes: 8 additions & 8 deletions agents/otlp/src/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ OTLPHttpClient::OTLPHttpClient(uv_loop_t* loop): HttpClient(loop) {
}

int OTLPHttpClient::post_datadog_metrics(const std::string& url,
const std::string& key,
const std::string& metrics) {
const std::string& key,
const std::string& metrics) {
static std::string auth = "DD-API-KEY: ";
struct curl_slist* header_list = nullptr;
header_list = curl_slist_append(header_list, std::string(auth + key).c_str());
Expand All @@ -26,8 +26,8 @@ int OTLPHttpClient::post_datadog_metrics(const std::string& url,
}

int OTLPHttpClient::post_dynatrace_metrics(const std::string& url,
const std::string& auth_header,
const std::string& metrics) {
const std::string& auth_header,
const std::string& metrics) {
static std::string auth = "Authorization: ";
struct curl_slist* header_list = nullptr;
header_list = curl_slist_append(header_list,
Expand All @@ -37,8 +37,8 @@ int OTLPHttpClient::post_dynatrace_metrics(const std::string& url,
}

int OTLPHttpClient::post_newrelic_metrics(const std::string& url,
const std::string& key,
const std::string& metrics) {
const std::string& key,
const std::string& metrics) {
static std::string auth = "api-key: ";
struct curl_slist* header_list = nullptr;
header_list = curl_slist_append(header_list, std::string(auth + key).c_str());
Expand All @@ -48,8 +48,8 @@ int OTLPHttpClient::post_newrelic_metrics(const std::string& url,
}

int OTLPHttpClient::post_metrics(const std::string& url,
const std::string& metrics,
struct curl_slist* header_list) {
const std::string& metrics,
struct curl_slist* header_list) {
Debug("Posting Metrics to '%s'\n", url.c_str());
CURL* handle = curl_easy_init();
ASSERT_NOT_NULL(handle);
Expand Down
13 changes: 9 additions & 4 deletions agents/otlp/src/metrics_exporter.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
#ifndef AGENTS_OTLP_SRC_METRICS_EXPORTER_H_
#define AGENTS_OTLP_SRC_METRICS_EXPORTER_H_

#include <nsolid/nsolid_api.h>
#include <utility>
#include <vector>

#include "nsolid.h"

namespace node {
namespace nsolid {
namespace otlp {

class MetricsExporter {
public:
struct ThrMetricsStor {
ThreadMetrics::MetricsStor stor;
ThreadMetrics::MetricsStor prev_stor;
uint64_t loop_start;
};

virtual ~MetricsExporter() {}

virtual void got_proc_metrics(
const ProcessMetrics::MetricsStor& stor,
const ProcessMetrics::MetricsStor& prev_stor) = 0;

virtual void got_thr_metrics(
const std::vector<std::pair<ThreadMetrics::MetricsStor,
ThreadMetrics::MetricsStor>>&) = 0;
virtual void got_thr_metrics(const std::vector<ThrMetricsStor>&) = 0;
};

} // namespace otlp
Expand Down
9 changes: 4 additions & 5 deletions agents/otlp/src/newrelic_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void NewRelicMetrics::got_proc_metrics(
const ProcessMetricsStor& prev_stor) {
json metrics = json::array();

#define V(CType, CName, JSName, MType) \
#define V(CType, CName, JSName, MType, Unit) \
{ \
auto it = std::find(discarded_metrics.begin(), \
discarded_metrics.end(), \
Expand Down Expand Up @@ -88,11 +88,10 @@ void NewRelicMetrics::got_proc_metrics(

/*virtual*/
void NewRelicMetrics::got_thr_metrics(
const std::vector<std::pair<ThreadMetricsStor,
ThreadMetricsStor>>& thr_metrics) {
const std::vector<MetricsExporter::ThrMetricsStor>& thr_metrics) {
json body = json::array();
for (const auto& tm : thr_metrics) {
body.push_back(got_thr_metrics(tm.first, tm.second));
body.push_back(got_thr_metrics(tm.stor, tm.prev_stor));
}

if (body.size() > 0) {
Expand All @@ -105,7 +104,7 @@ void NewRelicMetrics::got_thr_metrics(
json NewRelicMetrics::got_thr_metrics(const ThreadMetricsStor& stor,
const ThreadMetricsStor& prev_stor) {
json metrics = json::array();
#define V(CType, CName, JSName, MType) \
#define V(CType, CName, JSName, MType, Unit) \
{ \
auto it = std::find(discarded_metrics.begin(), \
discarded_metrics.end(), \
Expand Down
3 changes: 1 addition & 2 deletions agents/otlp/src/newrelic_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ class NewRelicMetrics final: public MetricsExporter {
const ProcessMetrics::MetricsStor& prev_stor);

void got_thr_metrics(
const std::vector<std::pair<ThreadMetrics::MetricsStor,
ThreadMetrics::MetricsStor>>&);
const std::vector<MetricsExporter::ThrMetricsStor>&);

private:
nlohmann::json got_thr_metrics(const ThreadMetrics::MetricsStor& stor,
Expand Down
78 changes: 52 additions & 26 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#include "otlp_agent.h"
#include "nsolid/nsolid_api.h"
#include "env-inl.h"
#include "debug_utils-inl.h"
#include "datadog_metrics.h"
#include "dynatrace_metrics.h"
#include "newrelic_metrics.h"
#include "opentelemetry/sdk/resource/resource.h"
#include "nsolid/nsolid_util.h"
#include "otlp_metrics.h"
#include "opentelemetry/sdk/resource/semantic_conventions.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/exporters/otlp/otlp_http_exporter.h"
#include "opentelemetry/ext/http/client/curl/http_client_curl.h"
Expand All @@ -18,14 +22,17 @@ namespace exporter = OPENTELEMETRY_NAMESPACE::exporter;
namespace ext = OPENTELEMETRY_NAMESPACE::ext;
namespace trace = OPENTELEMETRY_NAMESPACE::trace;
namespace resource = sdk::resource;
namespace instrumentationscope = sdk::instrumentationscope;
namespace detail = trace::propagation::detail;
using resource::SemanticConventions::kServiceName;
using resource::SemanticConventions::kServiceInstanceId;
using resource::SemanticConventions::kServiceVersion;

static const size_t kTraceIdSize = 32;
static const size_t kSpanIdSize = 16;

static std::atomic<bool> is_running_ = { false };
nsuv::ns_rwlock exit_lock_;
static resource::ResourceAttributes resource_attributes_;

namespace node {
namespace nsolid {
Expand All @@ -44,6 +51,15 @@ inline void DebugJSON(const char* str, const json& msg) {
}
}

JSThreadMetrics::JSThreadMetrics(SharedEnvInst envinst):
metrics_(ThreadMetrics::Create(envinst)),
prev_() {
const AliasedFloat64Array& ps =
envinst->env()->performance_state()->milestones;
loop_start_ = performance::performance_process_start_timestamp +
ps[performance::NODE_PERFORMANCE_MILESTONE_LOOP_START] / 1e3;
}


OTLPAgent* OTLPAgent::Inst() {
// Make sure the HttpCurlGlobalInitializer static instance is created before
Expand All @@ -64,6 +80,7 @@ OTLPAgent::OTLPAgent(): ready_(false),
hooks_init_(false),
trace_flags_(0),
otlp_http_exporter_(nullptr),
resource_(create_resource()),
metrics_interval_(0),
proc_metrics_(),
proc_prev_stor_(),
Expand All @@ -73,6 +90,8 @@ OTLPAgent::OTLPAgent(): ready_(false),
ASSERT_EQ(0, uv_mutex_init(&start_lock_));
ASSERT_EQ(0, exit_lock_.init(true));
is_running_ = true;
scope_ = instrumentationscope::InstrumentationScope::Create(
"nsolid", NODE_VERSION "+ns" NSOLID_VERSION);
}


Expand Down Expand Up @@ -249,15 +268,18 @@ int OTLPAgent::config(const nlohmann::json& config) {
std::tuple<SharedEnvInst, bool> tup;
while (agent->env_msg_q_.dequeue(tup)) {
SharedEnvInst envinst = std::get<0>(tup);
bool creation = std::get<1>(tup);
if (creation) {
auto pair = agent->env_metrics_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(GetThreadId(envinst)),
std::forward_as_tuple(envinst));
ASSERT(pair.second);
} else {
agent->env_metrics_map_.erase(GetThreadId(envinst));
EnvInst::Scope scp(envinst);
if (scp.Success()) {
bool creation = std::get<1>(tup);
if (creation) {
auto pair = agent->env_metrics_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(GetThreadId(envinst)),
std::forward_as_tuple(envinst));
ASSERT(pair.second);
} else {
agent->env_metrics_map_.erase(GetThreadId(envinst));
}
}
}
}
Expand Down Expand Up @@ -296,15 +318,14 @@ void OTLPAgent::do_start() {
ASSERT_EQ(0, metrics_timer_.init(&loop_));
ASSERT_EQ(0, config_msg_.init(&loop_, config_msg_cb_, this));

ready_ = true;
if (!hooks_init_) {
ASSERT_EQ(0, OnConfigurationHook(config_agent_cb_, this));
ASSERT_EQ(0, ThreadAddedHook(on_thread_add_, this));
ASSERT_EQ(0, ThreadRemovedHook(on_thread_remove_, this));
hooks_init_ = true;
}

ready_ = true;

uv_cond_signal(&start_cond_);
uv_mutex_unlock(&start_lock_);
}
Expand Down Expand Up @@ -346,7 +367,6 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
using time_point = std::chrono::system_clock::time_point;
using std::chrono::milliseconds;
using std::chrono::nanoseconds;
auto resource = resource::Resource::Create(resource_attributes_);
std::vector<std::unique_ptr<sdk::trace::Recordable>> recordables;
Tracer::SpanStor s;
while (agent->span_msg_q_.dequeue(s)) {
Expand Down Expand Up @@ -397,7 +417,7 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {

recordable->SetAttribute("thread.id", s.thread_id);

recordable->SetResource(resource);
recordable->SetResource(agent->resource_);

recordables.push_back(std::move(recordable));
}
Expand Down Expand Up @@ -432,14 +452,13 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
return;
}

std::vector<std::pair<ThreadMetricsStor,
ThreadMetricsStor>> thr_metrics_vector;
std::vector<MetricsExporter::ThrMetricsStor> thr_metrics_vector;
ThreadMetricsStor stor;
while (agent->thr_metrics_msg_q_.dequeue(stor)) {
auto it = agent->env_metrics_map_.find(stor.thread_id);
if (it != agent->env_metrics_map_.end()) {
auto& metrics = it->second;
thr_metrics_vector.emplace_back(stor, metrics.prev_);
thr_metrics_vector.push_back({stor, metrics.prev_, metrics.loop_start_ });
metrics.prev_ = stor;
}
}
Expand Down Expand Up @@ -564,7 +583,6 @@ void OTLPAgent::config_otlp_agent(const json& config) {
ASSERT(it != config.end());
const nlohmann::json& otlp = it.value();
config_endpoint(type, otlp);
config_service(config);
} else {
Debug("No otlp agent configuration. Stopping the agent\n");
do_stop();
Expand All @@ -576,21 +594,29 @@ void OTLPAgent::config_otlp_endpoint(const json& config) {
auto it = config.find("url");
ASSERT(it != config.end());
exporter::otlp::OtlpHttpExporterOptions opts;
opts.url = it->get<std::string>() + "/v1/traces";
const std::string url = it->get<std::string>();
opts.url = url + "/v1/traces";
setup_trace_otlp_exporter(opts);
metrics_exporter_.reset(new OTLPMetrics(&loop_, url, "", *this));
}


void OTLPAgent::config_service(const json& config) {
resource::Resource OTLPAgent::create_resource() const {
json config = json::parse(nsolid::GetConfig(), nullptr, false);
// assert because the runtime should never send me an invalid JSON config
ASSERT(!config.is_discarded());
auto it = config.find("app");
ASSERT(it != config.end());
resource_attributes_.SetAttribute("service.name", it->get<std::string>());
resource_attributes_.SetAttribute("service.version", "1.0.0");
resource::ResourceAttributes attrs({
{kServiceName, it->get<std::string>()},
{kServiceInstanceId, nsolid::GetAgentId()}
});

it = config.find("appVersion");
if (it != config.end()) {
resource_attributes_.SetAttribute("service.version",
it->get<std::string>());
attrs.SetAttribute(kServiceVersion, it->get<std::string>());
}

return resource::Resource::Create(attrs);
}


Expand Down
Loading

0 comments on commit 57239bc

Please sign in to comment.