Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changing the type of callback function in Export function to std::function #1278

Merged
merged 19 commits into from
Mar 22, 2022
Merged
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[WIP] Added changes for async callback mechanism from Processor to Ex…
…porters
DebajitDas committed Mar 9, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 29b3db774e3efacb89e98638f6e6bfe9e2bfed26
Original file line number Diff line number Diff line change
@@ -89,6 +89,14 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

/**
*
*
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback) noexcept override;

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
124 changes: 122 additions & 2 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
@@ -110,6 +110,82 @@ class ResponseHandler : public http_client::EventHandler
bool console_debug_ = false;
};


/**
* This class handles the async response message from the Elasticsearch request
*/
class AsyncResponseHandler : public http_client::EventHandler
{
public:
/**
* Creates a response handler, that by default doesn't display to console
*/
AsyncResponseHandler(
std::shared_ptr<ext::http::client::Session> session,
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback,
bool console_debug = false)
: console_debug_{console_debug}
, session_{std::move(session)}
, result_callback_{result_callback} {}

/**
* Automatically called when the response is received
*/
void OnResponse(http_client::Response &response) noexcept override
{

// Store the body of the request
body_ = std::string(response.GetBody().begin(), response.GetBody().end());
session_->FinishSession();
if (body_.find("\"failed\" : 0") == std::string::npos)
{
OTEL_INTERNAL_LOG_ERROR(
"[ES Trace Exporter] Logs were not written to Elasticsearch correctly, response body: "
<< body_);
result_callback_(sdk::common::ExportResult::kFailure);
} else {
result_callback_(sdk::common::ExportResult::kSuccess);
}
}

// Callback method when an http event occurs
void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override
{
// If any failure event occurs, release the condition variable to unblock main thread
switch (state)
{
case http_client::SessionState::ConnectFailed:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Connection to elasticsearch failed");
break;
case http_client::SessionState::SendFailed:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request failed to be sent to elasticsearch");

break;
case http_client::SessionState::TimedOut:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request to elasticsearch timed out");

break;
case http_client::SessionState::NetworkError:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Network error to elasticsearch");
break;
}
result_callback_(sdk::common::ExportResult::kFailure);
}

private:
// Stores the session object for the request
std::shared_ptr<ext::http::client::Session> session_;
// Callback to call to on receiving events
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback_;

// A string to store the response body
std::string body_ = "";

// Whether to print the results from the callback
bool console_debug_ = false;
};


ElasticsearchLogExporter::ElasticsearchLogExporter()
: options_{ElasticsearchExporterOptions()},
http_client_{new ext::http::client::curl::HttpClient()}
@@ -162,8 +238,8 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
request->SetBody(body_vec);

// Send the request
std::unique_ptr<ResponseHandler> handler(new ResponseHandler(options_.console_debug_));
session->SendRequest(*handler);
auto handler = std::make_shared<ResponseHandler>(options_.console_debug_);
session->SendRequest(handler);

// Wait for the response to be received
if (options_.console_debug_)
@@ -198,6 +274,50 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

void ElasticsearchLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback) noexcept
{
// Return failure if this exporter has been shutdown
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting "
<< records.size() << " log(s) failed, exporter is shutdown");
return;
}

// Create a connection to the ElasticSearch instance
auto session = http_client_->CreateSession(options_.host_ + std::to_string(options_.port_));
auto request = session->CreateRequest();

// Populate the request with headers and methods
request->SetUri(options_.index_ + "/_bulk?pretty");
request->SetMethod(http_client::Method::Post);
request->AddHeader("Content-Type", "application/json");
request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_));

// Create the request body
std::string body = "";
for (auto &record : records)
{
// Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified
// in URI
body += "{\"index\" : {}}\n";

// Add the context of the Recordable
auto json_record = std::unique_ptr<ElasticSearchRecordable>(
static_cast<ElasticSearchRecordable *>(record.release()));
body += json_record->GetJSON().dump() + "\n";
}
std::vector<uint8_t> body_vec(body.begin(), body.end());
request->SetBody(body_vec);

// Send the request
auto handler = std::make_shared<AsyncResponseHandler>(
session, result_callback, options_.console_debug_);
session->SendRequest(handler);
}

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Original file line number Diff line number Diff line change
@@ -64,6 +64,20 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
return sdk::common::ExportResult::kSuccess;
}

/**
*
*
*/
void Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback)
noexcept override
{
auto result = Export(spans);
result_callback(result);

}

/**
* @param timeout an optional value containing the timeout of the exporter
* note: passing custom timeout values is not currently supported for this exporter
Original file line number Diff line number Diff line change
@@ -39,6 +39,14 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept
override;

/**
* Exports a span of logs sent from the processor asynchronously.
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback)
noexcept;

/**
* Marks the OStream Log Exporter as shut down.
*/
Original file line number Diff line number Diff line change
@@ -38,6 +38,11 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans) noexcept override;

void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback)
noexcept override;

bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

10 changes: 10 additions & 0 deletions exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
@@ -180,6 +180,16 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

void OStreamLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback)
noexcept
{
// Do not have async support
auto result = Export(records);
result_callback(result);
}

bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
8 changes: 8 additions & 0 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
@@ -96,6 +96,14 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

void OStreamSpanExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback) noexcept
{
auto result = Export(spans);
result_callback(result);
}

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Original file line number Diff line number Diff line change
@@ -78,6 +78,15 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

/**
*
*
*/
void Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback)
noexcept override;

/**
* Shut down the exporter.
* @param timeout an optional timeout, default to max.
8 changes: 8 additions & 0 deletions exporters/zipkin/src/zipkin_exporter.cc
Original file line number Diff line number Diff line change
@@ -93,6 +93,14 @@ sdk::common::ExportResult ZipkinExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

void ZipkinExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback)
noexcept
{

}

void ZipkinExporter::InitializeLocalEndpoint()
{
if (options_.service_name.length())
Original file line number Diff line number Diff line change
@@ -143,19 +143,19 @@ class Session : public opentelemetry::ext::http::client::Session
}

virtual void SendRequest(
opentelemetry::ext::http::client::EventHandler &callback) noexcept override
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) noexcept override
{
is_session_active_ = true;
std::string url = host_ + std::string(http_request_->uri_);
auto callback_ptr = &callback;
auto callback_ptr = callback.get();
curl_operation_.reset(new HttpOperation(
http_request_->method_, url, callback_ptr, RequestMode::Async, http_request_->headers_,
http_request_->body_, false, http_request_->timeout_ms_));
curl_operation_->SendAsync([this, callback_ptr](HttpOperation &operation) {
curl_operation_->SendAsync([this, callback](HttpOperation &operation) {
if (operation.WasAborted())
{
// Manually cancelled
callback_ptr->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, "");
callback->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, "");
}

if (operation.GetResponseCode() >= CURL_LAST)
@@ -165,7 +165,7 @@ class Session : public opentelemetry::ext::http::client::Session
response->headers_ = operation.GetResponseHeaders();
response->body_ = operation.GetResponseBody();
response->status_code_ = operation.GetResponseCode();
callback_ptr->OnResponse(*response);
callback->OnResponse(*response);
}
is_session_active_ = false;
});
2 changes: 1 addition & 1 deletion ext/include/opentelemetry/ext/http/client/http_client.h
Original file line number Diff line number Diff line change
@@ -212,7 +212,7 @@ class Session
public:
virtual std::shared_ptr<Request> CreateRequest() noexcept = 0;

virtual void SendRequest(EventHandler &) noexcept = 0;
virtual void SendRequest(std::shared_ptr<EventHandler>) noexcept = 0;

virtual bool IsSessionActive() noexcept = 0;

Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ class Session : public opentelemetry::ext::http::client::Session

MOCK_METHOD(void,
SendRequest,
(opentelemetry::ext::http::client::EventHandler &),
(std::shared_ptr<opentelemetry::ext::http::client::EventHandler>),
(noexcept, override));

virtual bool CancelSession() noexcept override;
15 changes: 6 additions & 9 deletions ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
@@ -196,12 +196,11 @@ TEST_F(BasicCurlHttpTests, SendGetRequest)
auto session = session_manager->CreateSession("http://127.0.0.1:19000");
auto request = session->CreateRequest();
request->SetUri("get/");
GetEventHandler *handler = new GetEventHandler();
session->SendRequest(*handler);
auto handler = std::make_shared<GetEventHandler>();
session->SendRequest(handler);
ASSERT_TRUE(waitForRequests(30, 1));
session->FinishSession();
ASSERT_TRUE(handler->is_called_);
delete handler;
}

TEST_F(BasicCurlHttpTests, SendPostRequest)
@@ -219,16 +218,15 @@ TEST_F(BasicCurlHttpTests, SendPostRequest)
http_client::Body body = {b, b + strlen(b)};
request->SetBody(body);
request->AddHeader("Content-Type", "text/plain");
PostEventHandler *handler = new PostEventHandler();
session->SendRequest(*handler);
auto handler = std::make_shared<PostEventHandler>();
session->SendRequest(handler);
ASSERT_TRUE(waitForRequests(30, 1));
session->FinishSession();
ASSERT_TRUE(handler->is_called_);

session_manager->CancelAllSessions();
session_manager->FinishAllSessions();

delete handler;
}

TEST_F(BasicCurlHttpTests, RequestTimeout)
@@ -240,11 +238,10 @@ TEST_F(BasicCurlHttpTests, RequestTimeout)
auto session = session_manager->CreateSession("222.222.222.200:19000"); // Non Existing address
auto request = session->CreateRequest();
request->SetUri("get/");
GetEventHandler *handler = new GetEventHandler();
session->SendRequest(*handler);
auto handler = std::make_shared<GetEventHandler>();
session->SendRequest(handler);
session->FinishSession();
ASSERT_FALSE(handler->is_called_);
delete handler;
}

TEST_F(BasicCurlHttpTests, CurlHttpOperations)
11 changes: 11 additions & 0 deletions sdk/include/opentelemetry/sdk/logs/exporter.h
Original file line number Diff line number Diff line change
@@ -46,6 +46,17 @@ class LogExporter
virtual sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<Recordable>> &records) noexcept = 0;


/**
* Exports the batch of log records to their export destination
*
*
*
*/
virtual void Export(
const nostd::span<std::unique_ptr<Recordable>> &records,
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback) noexcept = 0;

/**
* Marks the exporter as ShutDown and cleans up any resources as required.
* Shutdown should be called only once for each Exporter instance.
Loading