Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronized calls to Exporter::Export & Shutdown #1164

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifdef ENABLE_LOGS_PREVIEW

# include "nlohmann/json.hpp"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/ext/http/client/curl/http_client_curl.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
Expand Down Expand Up @@ -104,6 +105,8 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor

// Object that stores the HTTP sessions that have been created
std::unique_ptr<ext::http::client::HttpClient> http_client_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace logs
} // namespace exporter
Expand Down
14 changes: 11 additions & 3 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

# include <sstream> // std::stringstream

# include <mutex>
# include "opentelemetry/exporters/elasticsearch/es_log_exporter.h"
# include "opentelemetry/exporters/elasticsearch/es_log_recordable.h"
# include "opentelemetry/sdk_config.h"
Expand Down Expand Up @@ -127,10 +128,10 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
const nostd::span<std::unique_ptr<sdklogs::Recordable>> &records) noexcept
{
// Return failure if this exporter has been shutdown
if (is_shutdown_)
if (isShutdown())
{

OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Export failed, exporter is shutdown");
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting "
<< records.size() << " log(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -199,6 +200,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;

// Shutdown the session manager
Expand All @@ -207,6 +209,12 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc

return true;
}

bool ElasticsearchLogExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <opentelemetry/common/spin_lock_mutex.h>
#include <opentelemetry/ext/http/client/http_client.h>
#include <opentelemetry/sdk/trace/exporter.h>

Expand Down Expand Up @@ -75,6 +76,8 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
bool is_shutdown_ = false;
JaegerExporterOptions options_;
std::unique_ptr<ThriftSender> sender_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
// For testing
friend class JaegerExporterTestPeer;
/**
Expand Down
13 changes: 12 additions & 1 deletion exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
#include <agent_types.h>
#include <opentelemetry/exporters/jaeger/jaeger_exporter.h>
#include <opentelemetry/exporters/jaeger/recordable.h>
#include "opentelemetry/sdk_config.h"

#include "http_transport.h"
#include "thrift_sender.h"
#include "udp_transport.h"

#include <mutex>
#include <vector>

namespace sdk_common = opentelemetry::sdk::common;
Expand Down Expand Up @@ -39,8 +41,10 @@ std::unique_ptr<trace_sdk::Recordable> JaegerExporter::MakeRecordable() noexcept
sdk_common::ExportResult JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept
{
if (is_shutdown_)
if (isShutdown())
Copy link
Member

@lalitb lalitb Jan 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - not directly related to this PR, but while you are changing this piece of code, could you also add error logging wherever missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
OTEL_INTERNAL_LOG_ERROR("[Jaeger Trace Exporter] Exporting "
<< spans.size() << " span(s) failed, exporter is shutdown");
return sdk_common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -91,10 +95,17 @@ void JaegerExporter::InitializeEndpoint()

bool JaegerExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool JaegerExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace jaeger
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

#pragma once
#include <mutex>
#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/exporters/memory/in_memory_span_data.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk_config.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
Expand Down Expand Up @@ -42,8 +45,10 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[In Memory Span Exporter] Exporting "
<< recordables.size() << " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
for (auto &recordable : recordables)
Expand All @@ -67,6 +72,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
};
Expand All @@ -82,6 +88,12 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
private:
std::shared_ptr<opentelemetry::exporter::memory::InMemorySpanData> data_;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
};
} // namespace memory
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once
#ifdef ENABLE_LOGS_PREVIEW

# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/log_record.h"
Expand Down Expand Up @@ -49,6 +50,8 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
std::ostream &sout_;
// Whether this exporter has been shut down
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace logs
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/nostd/type_traits.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/span_data.h"
Expand Down Expand Up @@ -42,7 +43,9 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter

private:
std::ostream &sout_;
bool isShutdown_ = false;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;

// Mapping status number to the string from api/include/opentelemetry/trace/canonical_code.h
std::map<int, std::string> statusMap{{0, "Unset"}, {1, "Ok"}, {2, "Error"}};
Expand Down
13 changes: 12 additions & 1 deletion exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#ifdef ENABLE_LOGS_PREVIEW
# include "opentelemetry/exporters/ostream/log_exporter.h"
# include <mutex>
# include "opentelemetry/sdk_config.h"

# include <iostream>

Expand Down Expand Up @@ -107,8 +109,10 @@ std::unique_ptr<sdklogs::Recordable> OStreamLogExporter::MakeRecordable() noexce
sdk::common::ExportResult OStreamLogExporter::Export(
const nostd::span<std::unique_ptr<sdklogs::Recordable>> &records) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[Ostream Log Exporter] Exporting "
<< records.size() << " log(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -168,10 +172,17 @@ sdk::common::ExportResult OStreamLogExporter::Export(

bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool OStreamLogExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
15 changes: 12 additions & 3 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/exporters/ostream/span_exporter.h"

#include <iostream>
#include <mutex>
#include "opentelemetry/sdk_config.h"

namespace nostd = opentelemetry::nostd;
namespace trace_sdk = opentelemetry::sdk::trace;
Expand Down Expand Up @@ -44,8 +45,10 @@ std::unique_ptr<trace_sdk::Recordable> OStreamSpanExporter::MakeRecordable() noe
sdk::common::ExportResult OStreamSpanExporter::Export(
const nostd::span<std::unique_ptr<trace_sdk::Recordable>> &spans) noexcept
{
if (isShutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[Ostream Trace Exporter] Exporting "
<< spans.size() << " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -95,10 +98,16 @@ sdk::common::ExportResult OStreamSpanExporter::Export(

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
isShutdown_ = true;
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool OStreamSpanExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
void OStreamSpanExporter::printAttributes(
const std::unordered_map<std::string, sdkcommon::OwnedAttributeValue> &map,
const std::string prefix)
Expand Down
5 changes: 3 additions & 2 deletions exporters/ostream/test/ostream_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ TEST(OStreamLogExporter, Shutdown)

// Restore original stringstream buffer
std::cout.rdbuf(original);

ASSERT_EQ(output.str(), "");
std::string err_message =
"[Ostream Log Exporter] Exporting 1 log(s) failed, exporter is shutdown";
EXPECT_TRUE(output.str().find(err_message) != std::string::npos);
}

// ---------------------------------- Print to cout -------------------------
Expand Down
5 changes: 3 additions & 2 deletions exporters/ostream/test/ostream_span_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ TEST(OStreamSpanExporter, Shutdown)
EXPECT_TRUE(processor->Shutdown());
processor->OnEnd(std::move(recordable));
});

EXPECT_EQ(captured, "");
std::string err_message =
"[Ostream Trace Exporter] Exporting 1 span(s) failed, exporter is shutdown";
EXPECT_TRUE(captured.find(err_message) != std::string::npos);
}

constexpr const char *kDefaultSpanPrinted =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"

#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"
Expand Down Expand Up @@ -77,6 +78,8 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
*/
OtlpGrpcExporter(std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub);
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

# include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"
# include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"

// clang-format on
Expand Down Expand Up @@ -78,6 +79,8 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
*/
OtlpGrpcLogExporter(std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface> stub);
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/sdk/common/exporter_utils.h"

Expand Down Expand Up @@ -124,6 +125,8 @@ class OtlpHttpClient
std::shared_ptr<ext::http::client::HttpClient> http_client_;
// Cached parsed URI
std::string http_uri_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace otlp
} // namespace exporter
Expand Down
13 changes: 11 additions & 2 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h"
#include <mutex>
#include "opentelemetry/exporters/otlp/otlp_recordable.h"
#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h"
#include "opentelemetry/ext/http/common/url_parser.h"
Expand Down Expand Up @@ -103,9 +104,10 @@ std::unique_ptr<sdk::trace::Recordable> OtlpGrpcExporter::MakeRecordable() noexc
sdk::common::ExportResult OtlpGrpcExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[OTLP gRPC] Export failed, exporter is shutdown");
OTEL_INTERNAL_LOG_ERROR("[OTLP gRPC] Exporting " << spans.size()
<< " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
proto::collector::trace::v1::ExportTraceServiceRequest request;
Expand Down Expand Up @@ -138,10 +140,17 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(

bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool OtlpGrpcExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Loading