Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve InfluxdbWriter performance #5764

Merged
merged 1 commit into from
Dec 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix performance issues for InfluxdbWriter
  • Loading branch information
gunnarbeutner committed Dec 12, 2017
commit 40f9431413ef3165f1eccd52c1fdace78ddf8899
184 changes: 87 additions & 97 deletions lib/perfdata/influxdbwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@

using namespace icinga;

class InfluxdbInteger : public Object
{
public:
DECLARE_PTR_TYPEDEFS(InfluxdbInteger);

InfluxdbInteger(int value)
: m_Value(value)
{ }

int GetValue(void) const
{
return m_Value;
}

private:
int m_Value;
};

REGISTER_TYPE(InfluxdbWriter);

REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
Expand Down Expand Up @@ -106,7 +124,7 @@ void InfluxdbWriter::Start(bool runtimeCreated)
m_FlushTimer->Reschedule(0);

/* Register for new metrics. */
Service::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
}

void InfluxdbWriter::Stop(bool runtimeRemoved)
Expand Down Expand Up @@ -134,7 +152,7 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
//TODO: Close the connection, if we keep it open.
}

Stream::Ptr InfluxdbWriter::Connect()
Stream::Ptr InfluxdbWriter::Connect(void)
{
TcpSocket::Ptr socket = new TcpSocket();

Expand Down Expand Up @@ -176,10 +194,10 @@ Stream::Ptr InfluxdbWriter::Connect()

void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
m_WorkQueue.Enqueue(std::bind(&InfluxdbWriter::InternalCheckResultHandler, this, checkable, cr));
m_WorkQueue.Enqueue(std::bind(&InfluxdbWriter::CheckResultHandlerWQ, this, checkable, cr), PriorityLow);
}

void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();

Expand Down Expand Up @@ -211,28 +229,16 @@ void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable,
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
// Prevent missing macros from warning; will return an empty value
// which will be filtered out in SendMetric()
String missing_macro;
tags->Set(pair.first, MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro));
}
}
Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);

SendPerfdata(tmpl, checkable, cr, ts);
}

String InfluxdbWriter::FormatInteger(int val)
{
return Convert::ToString(val) + "i";
}
if (!missing_macro.IsEmpty())
continue;

String InfluxdbWriter::FormatBoolean(bool val)
{
return val ? "true" : "false";
}
tags->Set(pair.first, value);
}
}

void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts)
{
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
Expand Down Expand Up @@ -280,97 +286,68 @@ void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::
Dictionary::Ptr fields = new Dictionary();

if (service)
fields->Set("state", FormatInteger(service->GetState()));
fields->Set("state", new InfluxdbInteger(service->GetState()));
else
fields->Set("state", FormatInteger(host->GetState()));

fields->Set("current_attempt", FormatInteger(checkable->GetCheckAttempt()));
fields->Set("max_check_attempts", FormatInteger(checkable->GetMaxCheckAttempts()));
fields->Set("state_type", FormatInteger(checkable->GetStateType()));
fields->Set("reachable", FormatBoolean(checkable->IsReachable()));
fields->Set("downtime_depth", FormatInteger(checkable->GetDowntimeDepth()));
fields->Set("acknowledgement", FormatInteger(checkable->GetAcknowledgement()));
fields->Set("state", new InfluxdbInteger(host->GetState()));

fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt()));
fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts()));
fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType()));
fields->Set("reachable", checkable->IsReachable());
fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth()));
fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
fields->Set("latency", cr->CalculateLatency());
fields->Set("execution_time", cr->CalculateExecutionTime());

SendMetric(tmpl, Empty, fields, ts);
}
}

String InfluxdbWriter::EscapeKey(const String& str)
String InfluxdbWriter::EscapeKeyOrTagValue(const String& str)
{
// Iterate over the key name and escape commas and spaces with a backslash
String result = str;
boost::algorithm::replace_all(result, "\"", "\\\"");
boost::algorithm::replace_all(result, "=", "\\=");
boost::algorithm::replace_all(result, ",", "\\,");
boost::algorithm::replace_all(result, " ", "\\ ");

// InfluxDB 'feature': although backslashes are allowed in keys they also act
// as escape sequences when followed by ',' or ' '. When your tag is like
// 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
// and through experimentation they also escape '='. To be safe we replace
// trailing backslashes with and underscore.
size_t length = result.GetLength();
if (result[length - 1] == '\\')
result[length - 1] = '_';

return result;
}

String InfluxdbWriter::EscapeField(const String& str)
String InfluxdbWriter::EscapeValue(const Value& value)
{
//TODO: Evaluate whether boost::regex is really needed here.

// Handle integers
boost::regex integer("-?\\d+i");
if (boost::regex_match(str.GetData(), integer)) {
return str;
if (value.IsObjectType<InfluxdbInteger>()) {
std::ostringstream os;
os << static_cast<InfluxdbInteger::Ptr>(value)->GetValue()
<< "i";
return os.str();
}

// Handle numerics
boost::regex numeric("-?\\d+(\\.\\d+)?((e|E)[+-]?\\d+)?");
if (boost::regex_match(str.GetData(), numeric)) {
return str;
}

// Handle booleans
boost::regex boolean_true("t|true", boost::regex::icase);
if (boost::regex_match(str.GetData(), boolean_true))
return "true";
boost::regex boolean_false("f|false", boost::regex::icase);
if (boost::regex_match(str.GetData(), boolean_false))
return "false";

// Handle NaNs
if (boost::math::isnan(str))
return 0;
if (value.IsBoolean())
return value ? "true" : "false";

// Otherwise it's a string and needs escaping and quoting
String result = str;
boost::algorithm::replace_all(result, "\"", "\\\"");
return "\"" + result + "\"";
return value;
}

void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts)
{
std::ostringstream msgbuf;
msgbuf << EscapeKey(tmpl->Get("measurement"));
msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));

Dictionary::Ptr tags = tmpl->Get("tags");
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
// Empty macro expansion, no tag
if (!pair.second.IsEmpty()) {
msgbuf << "," << EscapeKey(pair.first) << "=" << EscapeKey(pair.second);
msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second);
}
}
}

// Label is may be empty in the case of metadata
// Label may be empty in the case of metadata
if (!label.IsEmpty())
msgbuf << ",metric=" << EscapeKey(label);
msgbuf << ",metric=" << EscapeKeyOrTagValue(label);

msgbuf << " ";

Expand All @@ -384,45 +361,54 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label
else
msgbuf << ",";

msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second);
}
}

msgbuf << " " << static_cast<unsigned long>(ts);

#ifdef I2_DEBUG
Log(LogDebug, "InfluxdbWriter")
<< "Add to metric list: '" << msgbuf.str() << "'.";
#endif /* I2_DEBUG */

// Atomically buffer the data point
boost::mutex::scoped_lock lock(m_DataBufferMutex);
m_DataBuffer.push_back(String(msgbuf.str()));
// Buffer the data point
m_DataBuffer.push_back(msgbuf.str());

// Flush if we've buffered too much to prevent excessive memory use
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
Log(LogDebug, "InfluxdbWriter")
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
Flush();

try {
Flush();
} catch (...) {
/* Do nothing. */
}
}
}

void InfluxdbWriter::FlushTimeout(void)
{
// Prevent new data points from being added to the array, there is a
// race condition where they could disappear
boost::mutex::scoped_lock lock(m_DataBufferMutex);
m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushTimeoutWQ, this), PriorityHigh);
}

void InfluxdbWriter::FlushTimeoutWQ(void)
{
AssertOnWorkQueue();

// Flush if there are any data available
if (m_DataBuffer.size() > 0) {
Log(LogDebug, "InfluxdbWriter")
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
if (m_DataBuffer.empty())
return;

Log(LogDebug, "InfluxdbWriter")
<< "Timer expired writing " << m_DataBuffer.size() << " data points";

Flush();
}

void InfluxdbWriter::Flush(void)
{
// Ensure you hold a lock against m_DataBuffer so that things
// don't go missing after creating the body and clearing the buffer
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();

Expand Down Expand Up @@ -460,25 +446,27 @@ void InfluxdbWriter::Flush(void)
throw ex;
}

//TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
HttpResponse resp(stream, req);
StreamReadContext context;

try {
resp.Parse(context, true);
while (resp.Parse(context, true) && !resp.Complete)
; /* Do nothing */
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
throw ex;
}

if (resp.StatusCode != 204) {
if (!resp.Complete) {
Log(LogWarning, "InfluxdbWriter")
<< "Unexpected response code " << resp.StatusCode;
<< "Failed to read a complete HTTP response from the InfluxDB server.";
return;
}

// Finish parsing the headers and body
while (!resp.Complete)
resp.Parse(context, true);
if (resp.StatusCode != 204) {
Log(LogWarning, "InfluxdbWriter")
<< "Unexpected response code: " << resp.StatusCode;

String contentType = resp.Headers->Get("content-type");
if (contentType != "application/json") {
Expand All @@ -505,6 +493,8 @@ void InfluxdbWriter::Flush(void)

Log(LogCritical, "InfluxdbWriter")
<< "InfluxDB error message:\n" << error;

return;
}
}

Expand Down
13 changes: 4 additions & 9 deletions lib/perfdata/influxdbwriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <boost/thread/mutex.hpp>
#include <fstream>

namespace icinga
Expand Down Expand Up @@ -59,20 +58,16 @@ class InfluxdbWriter : public ObjectImpl<InfluxdbWriter>
WorkQueue m_WorkQueue;
Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer;
boost::mutex m_DataBufferMutex;

void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout(void);
void FlushTimeoutWQ(void);
void Flush(void);

static String FormatInteger(int val);
static String FormatBoolean(bool val);

static String EscapeKey(const String& str);
static String EscapeField(const String& str);
static String EscapeKeyOrTagValue(const String& str);
static String EscapeValue(const Value& value);

Stream::Ptr Connect();

Expand Down