Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agents: add metrics transform API to agent #123

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions agents/statsd/src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,6 @@ static void Send(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(StatsDAgent::Inst()->send(sv, full_size));
}

// JS binding: starts the StatsDAgent returned by StatsDAgent::Inst() and
// hands the integer result of start() back to the caller as the return value.
static void Start(const FunctionCallbackInfo<Value>& args) {
  const int start_result = StatsDAgent::Inst()->start();
  args.GetReturnValue().Set(start_result);
}

// JS binding: stops the StatsDAgent returned by StatsDAgent::Inst() and
// hands the integer result of stop() back to the caller as the return value.
static void Stop(const FunctionCallbackInfo<Value>& args) {
  const int stop_result = StatsDAgent::Inst()->stop();
  args.GetReturnValue().Set(stop_result);
}

static void Tags(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
std::string tags = StatsDAgent::Inst()->tags();
Expand Down Expand Up @@ -156,9 +148,7 @@ static void RegisterStatusCb(const FunctionCallbackInfo<Value>& args) {
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Bucket);
registry->Register(Send);
registry->Register(Start);
registry->Register(Status);
registry->Register(Stop);
registry->Register(Tags);
registry->Register(TcpIp);
registry->Register(UdpIp);
Expand All @@ -178,8 +168,6 @@ void InitStatsDAgent(Local<Object> exports,

NODE_SET_METHOD(exports, "bucket", Bucket);
NODE_SET_METHOD(exports, "send", Send);
NODE_SET_METHOD(exports, "start", Start);
NODE_SET_METHOD(exports, "stop", Stop);
NODE_SET_METHOD(exports, "tags", Tags);
NODE_SET_METHOD(exports, "tcpIp", TcpIp);
NODE_SET_METHOD(exports, "udpIp", UdpIp);
Expand Down
72 changes: 52 additions & 20 deletions agents/statsd/src/statsd_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ using nlohmann::json;
using string_vector = std::vector<std::string>;
using udp_req_data_tup = std::tuple<string_vector*, bool>;


template <typename... Args>
inline void Debug(Args&&... args) {
per_process::Debug(DebugCategory::NSOLID_STATSD_AGENT,
Expand Down Expand Up @@ -455,6 +456,17 @@ SharedStatsDAgent StatsDAgent::Inst() {
return agent;
}

// Builds a fresh StatsDAgent owned by a shared pointer, starts it, and returns
// it to the caller.
SharedStatsDAgent StatsDAgent::Create() {
  // NOTE(review): the explicit deleter is presumably needed because the
  // destructor is not publicly accessible - confirm against the class
  // declaration.
  SharedStatsDAgent instance(new StatsDAgent(), [](StatsDAgent* raw) {
    delete raw;
  });

  // TODO(trevnorris): For now we'll assume that a StatsDAgent instance will
  // be alive for the entire life of the process. So not going to worry about
  // cleaning up the hooks that are added for this instance.
  instance->start();

  return instance;
}

StatsDAgent::StatsDAgent(): hooks_init_(false),
proc_metrics_(),
config_(default_agent_config),
Expand All @@ -472,7 +484,16 @@ StatsDAgent::StatsDAgent(): hooks_init_(false),

StatsDAgent::~StatsDAgent() {
int r;
ASSERT_EQ(0, stop());

if (status_ != Unconfigured) {
if (utils::are_threads_equal(thread_.base(), uv_thread_self())) {
do_stop();
} else {
ASSERT_EQ(0, shutdown_.send());
ASSERT_EQ(0, thread_.join());
}
}

uv_mutex_destroy(&start_lock_);
uv_cond_destroy(&start_cond_);
// The destructor will be called from the main thread, being StatsDAgent a
Expand Down Expand Up @@ -527,7 +548,10 @@ void StatsDAgent::do_start() {
status(Initializing);

if (hooks_init_ == false) {
ASSERT_EQ(0, OnConfigurationHook(config_agent_cb, weak_from_this()));
// Only add the OnConfigurationHook if this is the global instance.
if (Inst().get() == this) {
ASSERT_EQ(0, OnConfigurationHook(config_agent_cb, weak_from_this()));
}
Comment on lines +552 to +554
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because of the test, right? Why aren't you registering the ThreadAddedHook/ThreadRemovedHook callbacks inside that conditional as well?

Copy link
Contributor Author

@trevnorris trevnorris May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly because I am being lazy. There are two main assumptions:

  1. A StatsDAgent instance will be used for the entire process lifetime
  2. At most 2 StatsDAgent instances will ever be created.

Using these assumptions, I decided it would be easier for each StatsDAgent instance to set up its own hooks to check when a new thread is created/destroyed rather than have a single global hook that would iterate through a list of StatsDAgent instances when a thread is created/destroyed. This simplified the logic a lot and will be easy to fix if we ever add the ability to remove a hook.

Though, now that I type this up, I realize that all the stuff I added for the agent_list_ can be removed, since it was never used. Will make that change.

ASSERT_EQ(0, ThreadAddedHook(env_creation_cb, weak_from_this()));
ASSERT_EQ(0, ThreadRemovedHook(env_deletion_cb, weak_from_this()));
hooks_init_ = true;
Expand All @@ -537,23 +561,6 @@ void StatsDAgent::do_start() {
uv_mutex_unlock(&start_lock_);
}

// Stops the agent. Valid callers are:
// 1) The StatsDAgent thread itself, when the agent is disabled through
//    configuration.
// 2) The main JS thread at exit, because the agent is a static Singleton.
// Always returns 0; failures of the shutdown signal or the join are asserted.
int StatsDAgent::stop() {
  // Nothing was ever configured, so there is nothing to tear down.
  if (status_ == Unconfigured) {
    return 0;
  }

  // Already on the agent's own thread: tear down inline.
  if (utils::are_threads_equal(thread_.base(), uv_thread_self())) {
    do_stop();
    return 0;
  }

  // Called from another thread: signal shutdown, then wait for the agent
  // thread to finish.
  ASSERT_EQ(0, shutdown_.send());
  ASSERT_EQ(0, thread_.join());
  return 0;
}

void StatsDAgent::do_stop() {
{
nsuv::ns_rwlock::scoped_wrlock lock(stop_lock_);
Expand Down Expand Up @@ -677,6 +684,11 @@ void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, WeakStatsDAgent agent_wp) {

ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
if (agent->t_transform_cb_ != nullptr) {
agent->connection_->write(agent->t_transform_cb_(stor));
continue;
}

json body = {
#define V(Type, CName, JSName, MType, Unit) \
{ #JSName, stor.CName },
Expand Down Expand Up @@ -719,7 +731,7 @@ void StatsDAgent::config_msg_cb_(nsuv::ns_async*, WeakStatsDAgent agent_wp) {
json config_msg;

while (agent->config_msg_q_.dequeue(config_msg)) {
r = agent->config(config_msg);
r = agent->config_cb_(config_msg);
if (agent->status_ != Unconfigured) {
ASSERT_EQ(0, agent->update_state_msg_.send());
}
Expand Down Expand Up @@ -765,6 +777,14 @@ int StatsDAgent::send(const std::vector<std::string>& sv, size_t len) {
return send_stats_msg_.send();
}

// Stores the process-metrics transform callback. While it is non-null,
// metrics_timer_cb_ writes the callback's output to the connection and
// returns without building the default JSON body; pass nullptr to go back to
// the default path.
// NOTE(review): the pointer is assigned here with no visible synchronization
// while the metrics timer callback reads it - confirm callers set it before
// metrics start flowing.
void StatsDAgent::set_pmetrics_transform_cb(pmetrics_transform_cb cb) {
p_transform_cb_ = cb;
}

// Stores the thread-metrics transform callback. While it is non-null,
// metrics_msg_cb_ writes the callback's output to the connection for each
// dequeued ThreadMetrics::MetricsStor and moves on to the next entry; pass
// nullptr to go back to the default path.
// NOTE(review): the pointer is assigned here with no visible synchronization
// while metrics_msg_cb_ reads it - confirm callers set it before metrics
// start flowing.
void StatsDAgent::set_tmetrics_transform_cb(tmetrics_transform_cb cb) {
t_transform_cb_ = cb;
}

std::string StatsDAgent::status() const {
switch (status_) {
#define X(type, str) \
Expand Down Expand Up @@ -926,6 +946,12 @@ void StatsDAgent::config_tags() {
}

// Queues a new configuration instead of applying it inline. The JSON is
// pushed onto config_msg_q_ and config_msg_ is signalled; config_msg_cb_ then
// dequeues each entry and applies it through config_cb_().
// Always returns 0; a failure to signal config_msg_ is asserted.
int StatsDAgent::config(const json& config) {
// Enqueue before signalling so the entry is already in the queue when the
// handler drains it.
config_msg_q_.enqueue(config);
ASSERT_EQ(0, config_msg_.send());
return 0;
}

int StatsDAgent::config_cb_(const json& config) {
int ret;

json old_config = config_;
Expand Down Expand Up @@ -1001,6 +1027,12 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, WeakStatsDAgent agent_wp) {
// Get and send proc metrics
ASSERT_EQ(0, agent->proc_metrics_.Update());
proc_stor = agent->proc_metrics_.Get();

if (agent->p_transform_cb_ != nullptr) {
agent->connection_->write(agent->p_transform_cb_(proc_stor));
return;
}

json body = {
#define V(Type, CName, JSName, MType, Unit) \
{ #JSName, proc_stor.CName },
Expand Down
24 changes: 22 additions & 2 deletions agents/statsd/src/statsd_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ using WeakStatsDAgent = std::weak_ptr<StatsDAgent>;

class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {
public:
using pmetrics_transform_cb = std::vector<std::string>(*)(
ProcessMetrics::MetricsStor);
using tmetrics_transform_cb = std::vector<std::string>(*)(
ThreadMetrics::MetricsStor);

enum Status {
#define X(type, str) \
type,
Expand All @@ -204,12 +209,17 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {

static SharedStatsDAgent Inst();

// Need to run config() to start the metrics timer and receive runtime
// metrics. Problem is do_start() needs to run first...
static SharedStatsDAgent Create();

int setup_metrics_timer(uint64_t period);

// TODO(trevnorris): This is only meant to be used by the global instance,
// not by the public. Change the way the global instance works so that this
// isn't necessary.
int start();

int stop();

// Dynamically set the current configuration of the agent.
// The JSON schema of the currently supported configuration with its
// default values:
Expand All @@ -233,6 +243,11 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {

int send(const std::vector<std::string>& sv, size_t len);

// Whatever string is returned by metrics_transform_cb will override the
// statsdBucket and statsdTags config.
void set_pmetrics_transform_cb(pmetrics_transform_cb cb);
void set_tmetrics_transform_cb(tmetrics_transform_cb cb);

std::string status() const;

std::string tags() {
Expand Down Expand Up @@ -283,6 +298,8 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {

std::string calculate_tags(const std::string& tpl) const;

int config_cb_(const nlohmann::json& message);

void config_bucket();

void config_tags();
Expand Down Expand Up @@ -343,6 +360,9 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {
std::string tags_;
nsuv::ns_mutex tags_lock_;

pmetrics_transform_cb p_transform_cb_ = nullptr;
tmetrics_transform_cb t_transform_cb_ = nullptr;

// For status testing
status_cb status_cb_;
};
Expand Down
5 changes: 5 additions & 0 deletions src/nsolid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ SharedEnvInst GetEnvInst(uint64_t thread_id) {
}


// Public entry point: forwards cb to EnvList::getAllEnvInst(), which invokes
// it once per SharedEnvInst stored in the env map while holding the map lock.
void GetAllEnvInst(std::function<void(SharedEnvInst)> cb) {
EnvList::Inst()->getAllEnvInst(cb);
}


// Returns the SharedEnvInst associated with the given v8::Context by
// delegating to EnvInst::GetCurrent().
SharedEnvInst GetLocalEnvInst(v8::Local<v8::Context> context) {
return EnvInst::GetCurrent(context);
}
Expand Down
6 changes: 6 additions & 0 deletions src/nsolid.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ NODE_EXTERN void thread_removed_hook_(void*,
*/
NODE_EXTERN SharedEnvInst GetEnvInst(uint64_t thread_id);

/**
* @brief Call cb synchronously with each existing SharedEnvInst instance
* currently alive. This call is thread-safe.
*/
NODE_EXTERN void GetAllEnvInst(std::function<void(SharedEnvInst)>);

/**
* @brief Retrieve the SharedEnvInst for the current v8::Context. Must be
* run from a valid Isolate. This call is not thread-safe.
Expand Down
8 changes: 8 additions & 0 deletions src/nsolid/nsolid_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,14 @@ void EnvList::UpdateTracingFlags(uint32_t flags) {
}


// Invokes cb once for every SharedEnvInst stored in env_map_. map_lock_ is
// held for the whole traversal, so cb runs with the lock taken.
// NOTE(review): a cb that calls back into anything acquiring map_lock_ would
// presumably deadlock - confirm whether the mutex is recursive.
void EnvList::getAllEnvInst(std::function<void(SharedEnvInst)> cb) {
  ns_mutex::scoped_lock guard(map_lock_);
  for (auto it = env_map_.cbegin(); it != env_map_.cend(); ++it) {
    cb(it->second);
  }
}


void EnvList::datapoint_cb_(std::queue<MetricsStream::Datapoint>&& q) {
// It runs in the nsolid thread
if (q.empty()) {
Expand Down
2 changes: 2 additions & 0 deletions src/nsolid/nsolid_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ class EnvList {

void UpdateTracingFlags(uint32_t flags);

void getAllEnvInst(std::function<void(SharedEnvInst)>);

inline uv_thread_t thread();
inline size_t env_map_size();
inline nsuv::ns_mutex* command_lock();
Expand Down
23 changes: 23 additions & 0 deletions test/addons/nsolid-get-all-envinst/binding.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <node.h>
#include <v8.h>
#include <uv.h>
#include <nsolid.h>

#include <assert.h>
#include <atomic>

using v8::FunctionCallbackInfo;
using v8::Value;

// Test binding: counts how many SharedEnvInst instances GetAllEnvInst()
// reports and returns that count to JS.
void CheckEnvCount(const FunctionCallbackInfo<Value>& args) {
  int env_total = 0;
  // The visitor ignores the instance itself; only the number of calls matters.
  auto tally = [&env_total](node::nsolid::SharedEnvInst) { ++env_total; };
  node::nsolid::GetAllEnvInst(tally);
  args.GetReturnValue().Set(env_total);
}


// Addon entry point: exposes CheckEnvCount to JS as `checkEnvCount`.
NODE_MODULE_INIT(/* exports, module, context */) {
NODE_SET_METHOD(exports, "checkEnvCount", CheckEnvCount);
}
16 changes: 16 additions & 0 deletions test/addons/nsolid-get-all-envinst/binding.gyp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
'targets': [{
'target_name': 'binding',
'sources': [ 'binding.cc' ],
'includes': ['../common.gypi'],
'target_defaults': {
'default_configuration': 'Release',
'configurations': {
'Debug': {
'defines': [ 'DEBUG', '_DEBUG' ],
'cflags': [ '-g', '-O0', '-fstandalone-debug' ],
}
},
},
}],
}
35 changes: 35 additions & 0 deletions test/addons/nsolid-get-all-envinst/nsolid-get-all-envinst.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';

const { buildType, mustCall, skip } = require('../../common');

Check failure on line 3 in test/addons/nsolid-get-all-envinst/nsolid-get-all-envinst.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

'mustCall' is assigned a value but never used
const assert = require('assert');
const bindingPath = require.resolve(`./build/${buildType}/binding`);
const binding = require(bindingPath);
const { Worker, isMainThread, parentPort } = require('worker_threads');
const workerList = [];

// The test is skipped whenever NSOLID_COMMAND is set.
if (process.env.NSOLID_COMMAND)
skip('required to run without the Console');

// Workers are spawned with argv [process.pid] (see spawnWorker), so a worker
// whose argv[2] does not match the pid was not started by this test.
if (!isMainThread && +process.argv[2] !== process.pid)
skip('Test must first run as the main thread');

if (isMainThread) {
// Main thread drives the test by spawning workers one after another.
spawnWorker();
} else {
// Workers just wait for any message from the parent and then exit.
parentPort.on('message', () => process.exit());
}

// Spawns workers one at a time. Each time a worker comes online, verify that
// the addon reports one env instance per spawned worker plus the main thread,
// then spawn the next one. Once five workers exist, tell all of them to exit.
function spawnWorker() {
  if (workerList.length > 4) {
    // Any message makes a worker call process.exit() (see the worker branch).
    for (const w of workerList)
      w.postMessage('bye');
    return;
  }
  const w = new Worker(__filename, { argv: [process.pid] });
  // mustCall makes the test fail if a worker never reaches 'online', instead
  // of silently skipping the assertion below.
  w.on('online', mustCall(() => {
    // Add 1 for the main thread.
    assert.strictEqual(binding.checkEnvCount(), workerList.length + 1);
    spawnWorker();
  }));
  // Pushed synchronously, so the worker is already counted when 'online'
  // fires.
  workerList.push(w);
}
Loading
Loading