Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: make nsolid::ThreadMetrics safer #37

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
for (auto& item : agent->env_metrics_map_) {
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
item.second.metrics_.Update(thr_metrics_cb_, agent);
item.second.metrics_->Update(thr_metrics_cb_, agent);
}
}

Expand All @@ -435,12 +435,11 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {

std::vector<std::pair<ThreadMetricsStor,
ThreadMetricsStor>> thr_metrics_vector;
ThreadMetrics* m;
while (agent->thr_metrics_msg_q_.dequeue(&m)) {
auto it = agent->env_metrics_map_.find(m->thread_id());
ThreadMetricsStor stor;
while (agent->thr_metrics_msg_q_.dequeue(stor)) {
auto it = agent->env_metrics_map_.find(stor.thread_id);
if (it != agent->env_metrics_map_.end()) {
auto& metrics = it->second;
ThreadMetricsStor stor = m->Get();
thr_metrics_vector.emplace_back(stor, metrics.prev_);
metrics.prev_ = stor;
}
Expand All @@ -452,14 +451,14 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
}


/*static*/void OTLPAgent::thr_metrics_cb_(ThreadMetrics* metrics,
/*static*/void OTLPAgent::thr_metrics_cb_(SharedThreadMetrics metrics,
OTLPAgent* agent) {
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_) {
return;
}

agent->thr_metrics_msg_q_.enqueue(metrics);
agent->thr_metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_));
}

Expand Down
11 changes: 6 additions & 5 deletions agents/otlp/src/otlp_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ namespace nsolid {
namespace otlp {

struct JSThreadMetrics {
ThreadMetrics metrics_;
SharedThreadMetrics metrics_;
ThreadMetrics::MetricsStor prev_;
explicit JSThreadMetrics(SharedEnvInst envinst): metrics_(envinst),
prev_() {}
explicit JSThreadMetrics(SharedEnvInst envinst)
: metrics_(ThreadMetrics::Create(envinst)),
prev_() {}
};

class OTLPAgent {
Expand Down Expand Up @@ -73,7 +74,7 @@ class OTLPAgent {

static void metrics_msg_cb_(nsuv::ns_async*, OTLPAgent* agent);

static void thr_metrics_cb_(ThreadMetrics*, OTLPAgent*);
static void thr_metrics_cb_(SharedThreadMetrics, OTLPAgent*);

void do_start();

Expand Down Expand Up @@ -132,7 +133,7 @@ class OTLPAgent {
ProcessMetrics::MetricsStor proc_prev_stor_;
std::map<uint64_t, JSThreadMetrics> env_metrics_map_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> thr_metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> thr_metrics_msg_q_;
nsuv::ns_timer metrics_timer_;
std::unique_ptr<MetricsExporter> metrics_exporter_;

Expand Down
21 changes: 9 additions & 12 deletions agents/statsd/src/statsd_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,7 @@ void StatsDAgent::env_msg_cb(nsuv::ns_async*, StatsDAgent* agent) {
bool creation = std::get<1>(tup);
if (creation) {
auto pair = agent->env_metrics_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(GetThreadId(envinst)),
std::forward_as_tuple(envinst));
GetThreadId(envinst), ThreadMetrics::Create(envinst));
ASSERT(pair.second);
} else {
ASSERT_EQ(1, agent->env_metrics_map_.erase(GetThreadId(envinst)));
Expand All @@ -480,18 +478,16 @@ void StatsDAgent::shutdown_cb_(nsuv::ns_async*, StatsDAgent* agent) {
}

void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, StatsDAgent* agent) {
ThreadMetrics* m;
while (agent->metrics_msg_q_.dequeue(&m)) {
uint64_t thread_id = m->thread_id();
ThreadMetrics::MetricsStor stor = m->Get();
ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
json body = {
#define V(Type, CName, JSName, MType) \
{ #JSName, stor.CName },
NSOLID_ENV_METRICS_NUMBERS(V)
#undef V
};

agent->send_metrics(body, thread_id, stor.thread_name.c_str());
agent->send_metrics(body, stor.thread_id, stor.thread_name.c_str());
}
}

Expand Down Expand Up @@ -734,10 +730,10 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) {
for (auto it = agent->env_metrics_map_.begin();
it != agent->env_metrics_map_.end();
++it) {
ThreadMetrics& e_metrics = std::get<1>(*it);
SharedThreadMetrics& e_metrics = std::get<1>(*it);
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
e_metrics.Update(env_metrics_cb_, agent);
e_metrics->Update(env_metrics_cb_, agent);
}

// Get and send proc metrics
Expand All @@ -753,13 +749,14 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) {
agent->send_metrics(body);
}

void StatsDAgent::env_metrics_cb_(ThreadMetrics* metrics, StatsDAgent* agent) {
void StatsDAgent::env_metrics_cb_(SharedThreadMetrics metrics,
StatsDAgent* agent) {
// Check if the agent is already closing
if (agent->metrics_msg_.is_closing()) {
return;
}

agent->metrics_msg_q_.enqueue(metrics);
agent->metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, agent->metrics_msg_.send());
}

Expand Down
6 changes: 3 additions & 3 deletions agents/statsd/src/statsd_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class StatsDAgent {

static void send_stats_msg_cb_(nsuv::ns_async*, StatsDAgent*);

static void env_metrics_cb_(ThreadMetrics*, StatsDAgent*);
static void env_metrics_cb_(SharedThreadMetrics, StatsDAgent*);

static void status_command_cb_(SharedEnvInst, StatsDAgent*);

Expand Down Expand Up @@ -258,11 +258,11 @@ class StatsDAgent {
nsuv::ns_timer retry_timer_;

// For the Metrics API
std::map<uint64_t, ThreadMetrics> env_metrics_map_;
std::map<uint64_t, SharedThreadMetrics> env_metrics_map_;
ProcessMetrics proc_metrics_;
uint64_t metrics_period_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> metrics_msg_q_;
nsuv::ns_timer metrics_timer_;

// For the Configuration API
Expand Down
19 changes: 9 additions & 10 deletions agents/zmq/src/zmq_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -881,18 +881,17 @@ void ZmqAgent::got_trace(Tracer* tracer,
}


void ZmqAgent::got_env_metrics(ThreadMetrics* t_metrics) {
void ZmqAgent::got_env_metrics(const ThreadMetrics::MetricsStor& stor) {
ProcessMetrics::MetricsStor proc_stor;

auto iter = env_metrics_map_.find(t_metrics->thread_id());
auto iter = env_metrics_map_.find(stor.thread_id);
if (iter != env_metrics_map_.end()) {
ASSERT_PTR_EQ(t_metrics, &iter->second.t_metrics);
iter->second.fetching = false;
// Store into the completed_env_metrics_ vector so we have easy access once
// the metrics from all the threads are retrieved. Make a copy of
// ThreadMetrics to make sure the metrics are still valid even if the
// thread is gone.
completed_env_metrics_.push_back(t_metrics->Get());
completed_env_metrics_.push_back(stor);
}

bool done = true;
Expand Down Expand Up @@ -951,9 +950,9 @@ NSOLID_ENV_METRICS_STRINGS(V)


void ZmqAgent::metrics_msg_cb(nsuv::ns_async*, ZmqAgent* agent) {
ThreadMetrics* metrics;
while (agent->metrics_msg_q_.dequeue(&metrics)) {
agent->got_env_metrics(metrics);
ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
agent->got_env_metrics(stor);
}
}

Expand Down Expand Up @@ -1641,19 +1640,19 @@ void ZmqAgent::metrics_timer_cb(nsuv::ns_timer*, ZmqAgent* agent) {
stor.fetching = false;
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
int r = stor.t_metrics.Update(env_metrics_cb, agent);
int r = stor.t_metrics->Update(env_metrics_cb, agent);
if (r == 0)
stor.fetching = true;
}
}

void ZmqAgent::env_metrics_cb(ThreadMetrics* metrics, ZmqAgent* agent) {
void ZmqAgent::env_metrics_cb(SharedThreadMetrics metrics, ZmqAgent* agent) {
// Check if the agent is already delete or it's closing
if (!is_running || agent->metrics_msg_.is_closing()) {
return;
}

agent->metrics_msg_q_.enqueue(metrics);
agent->metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, agent->metrics_msg_.send());
}

Expand Down
14 changes: 8 additions & 6 deletions agents/zmq/src/zmq_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ class ZmqAgent {
};

struct EnvMetricsStor {
ThreadMetrics t_metrics;
SharedThreadMetrics t_metrics;
bool fetching;
explicit EnvMetricsStor(SharedEnvInst envinst, bool f): t_metrics(envinst),
fetching(f) {}
NSOLID_DELETE_DEFAULT_CONSTRUCTORS(EnvMetricsStor)
explicit EnvMetricsStor(SharedEnvInst envinst, bool f)
: t_metrics(ThreadMetrics::Create(envinst)),
fetching(f) {}
};

struct ZmqCommandError {
Expand Down Expand Up @@ -478,7 +480,7 @@ class ZmqAgent {

static void update_state_msg_cb(nsuv::ns_async*, ZmqAgent*);

static void env_metrics_cb(ThreadMetrics*, ZmqAgent*);
static void env_metrics_cb(SharedThreadMetrics, ZmqAgent*);

static void status_command_cb(SharedEnvInst, ZmqAgent*);

Expand Down Expand Up @@ -555,7 +557,7 @@ class ZmqAgent {
const std::pair<bool, std::string>&,
const std::pair<bool, std::string>&);

void got_env_metrics(ThreadMetrics* t_metrics);
void got_env_metrics(const ThreadMetrics::MetricsStor& stor);

void got_heap_snapshot(int status,
const std::string& snaphost,
Expand Down Expand Up @@ -657,7 +659,7 @@ class ZmqAgent {
ProcessMetrics proc_metrics_;
uint64_t metrics_period_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> metrics_msg_q_;
nsuv::ns_timer metrics_timer_;
std::string cached_metrics_;
std::set<std::string> pending_metrics_reqs_;
Expand Down
55 changes: 40 additions & 15 deletions src/nsolid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ int ProcessMetrics::Update() {


ThreadMetrics::ThreadMetrics(SharedEnvInst envinst)
: thread_id_(envinst->thread_id()) {
: thread_id_(envinst->thread_id()),
user_data_(nullptr, nullptr) {
CHECK_NOT_NULL(envinst.get());
CHECK_EQ(uv_mutex_init(&stor_lock_), 0);
stor_.thread_id = thread_id_;
Expand All @@ -275,14 +276,22 @@ ThreadMetrics::~ThreadMetrics() {


ThreadMetrics::ThreadMetrics(uint64_t thread_id)
: thread_id_(thread_id) {
: thread_id_(thread_id),
user_data_(nullptr, nullptr) {
CHECK_EQ(uv_mutex_init(&stor_lock_), 0);
stor_.thread_id = thread_id_;
stor_.prev_call_time_ = uv_hrtime();
stor_.current_hrtime_ = stor_.prev_call_time_;
}


SharedThreadMetrics ThreadMetrics::Create(SharedEnvInst envinst) {
return SharedThreadMetrics(new ThreadMetrics(envinst), [](ThreadMetrics* tm) {
delete tm;
});
}


std::string ThreadMetrics::toJSON() {
MetricsStor dup;
std::string metrics_string;
Expand Down Expand Up @@ -342,23 +351,39 @@ int ThreadMetrics::Update(v8::Isolate* isolate) {

int ThreadMetrics::get_thread_metrics_() {
// Might need to fire myself for using nested lambdas.
void (*cb)(SharedEnvInst, ThreadMetrics*) =
auto cb = [](SharedEnvInst ei, std::weak_ptr<ThreadMetrics> wp) {
// This runs from the worker thread.
[](SharedEnvInst ei, ThreadMetrics* tm) {
void (*ret_proxy)(ThreadMetrics*) =
[](ThreadMetrics* tm) {
tm->proxy_(tm);
};

uv_mutex_lock(&tm->stor_lock_);
ei->GetThreadMetrics(&tm->stor_);
uv_mutex_unlock(&tm->stor_lock_);

auto ret_proxy = [](std::weak_ptr<ThreadMetrics> wp) {
// This runs from the NSolid thread.
QueueCallback(ret_proxy, tm);
SharedThreadMetrics tm_sp = wp.lock();
if (tm_sp == nullptr) {
return;
}

tm_sp->proxy_(tm_sp);
};

return RunCommand(GetEnvInst(thread_id_), CommandType::Interrupt, cb, this);
SharedThreadMetrics tm_sp = wp.lock();
if (tm_sp == nullptr) {
return;
}

uv_mutex_lock(&tm_sp->stor_lock_);
ei->GetThreadMetrics(&tm_sp->stor_);
uv_mutex_unlock(&tm_sp->stor_lock_);

QueueCallback(ret_proxy, tm_sp);
};

std::weak_ptr<ThreadMetrics> wp = weak_from_this();
DCHECK(!wp.expired());
return RunCommand(GetEnvInst(thread_id_), CommandType::Interrupt, cb, wp);
}


void ThreadMetrics::reset() {
proxy_ = nullptr;
update_running_ = false;
}


Expand Down
Loading