Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronized calls to Exporter::Export & Shutdown #1164

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifdef ENABLE_LOGS_PREVIEW

# include "nlohmann/json.hpp"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/ext/http/client/curl/http_client_curl.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
Expand Down Expand Up @@ -104,6 +105,8 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor

// Object that stores the HTTP sessions that have been created
std::unique_ptr<ext::http::client::HttpClient> http_client_;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept;
};
} // namespace logs
} // namespace exporter
Expand Down
10 changes: 9 additions & 1 deletion exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

# include <sstream> // std::stringstream

# include <mutex>
# include "opentelemetry/exporters/elasticsearch/es_log_exporter.h"
# include "opentelemetry/exporters/elasticsearch/es_log_recordable.h"
# include "opentelemetry/sdk_config.h"
Expand Down Expand Up @@ -127,7 +128,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
const nostd::span<std::unique_ptr<sdklogs::Recordable>> &records) noexcept
{
// Return failure if this exporter has been shutdown
if (is_shutdown_)
if (isShutdown())
{

OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Export failed, exporter is shutdown");
Expand Down Expand Up @@ -199,6 +200,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;

// Shutdown the session manager
Expand All @@ -207,6 +209,12 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc

return true;
}

const bool ElasticsearchLogExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <opentelemetry/ext/http/client/http_client.h>
#include <opentelemetry/sdk/trace/exporter.h>
#include "opentelemetry/common/spin_lock_mutex.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use #include <> for consistency?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
Expand Down Expand Up @@ -75,6 +76,8 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
bool is_shutdown_ = false;
JaegerExporterOptions options_;
std::unique_ptr<ThriftSender> sender_;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept;
// For testing
friend class JaegerExporterTestPeer;
/**
Expand Down
10 changes: 9 additions & 1 deletion exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "thrift_sender.h"
#include "udp_transport.h"

#include <mutex>
#include <vector>

namespace sdk_common = opentelemetry::sdk::common;
Expand Down Expand Up @@ -39,7 +40,7 @@ std::unique_ptr<trace_sdk::Recordable> JaegerExporter::MakeRecordable() noexcept
sdk_common::ExportResult JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept
{
if (is_shutdown_)
if (isShutdown())
Copy link
Member

@lalitb lalitb Jan 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - not directly related to this PR, but while you are changing this piece of code, could you also add error logging wherever missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
return sdk_common::ExportResult::kFailure;
}
Expand Down Expand Up @@ -91,10 +92,17 @@ void JaegerExporter::InitializeEndpoint()

bool JaegerExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

const bool JaegerExporter::isShutdown() const noexcept
ThomsonTan marked this conversation as resolved.
Show resolved Hide resolved
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace jaeger
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

#pragma once
#include <mutex>
#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/exporters/memory/in_memory_span_data.h"
#include "opentelemetry/sdk/trace/exporter.h"

Expand Down Expand Up @@ -42,7 +44,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
{
if (is_shutdown_)
if (isShutdown())
{
return sdk::common::ExportResult::kFailure;
}
Expand All @@ -67,6 +69,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
};
Expand All @@ -82,6 +85,12 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
private:
std::shared_ptr<opentelemetry::exporter::memory::InMemorySpanData> data_;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
};
} // namespace memory
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once
#ifdef ENABLE_LOGS_PREVIEW

# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/log_record.h"
Expand Down Expand Up @@ -49,6 +50,8 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
std::ostream &sout_;
// Whether this exporter has been shut down
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept;
};
} // namespace logs
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/nostd/type_traits.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/span_data.h"
Expand Down Expand Up @@ -42,7 +43,9 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter

private:
std::ostream &sout_;
bool isShutdown_ = false;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept;

// Mapping status number to the string from api/include/opentelemetry/trace/canonical_code.h
std::map<int, std::string> statusMap{{0, "Unset"}, {1, "Ok"}, {2, "Error"}};
Expand Down
10 changes: 9 additions & 1 deletion exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#ifdef ENABLE_LOGS_PREVIEW
# include "opentelemetry/exporters/ostream/log_exporter.h"
# include <mutex>

# include <iostream>

Expand Down Expand Up @@ -107,7 +108,7 @@ std::unique_ptr<sdklogs::Recordable> OStreamLogExporter::MakeRecordable() noexce
sdk::common::ExportResult OStreamLogExporter::Export(
const nostd::span<std::unique_ptr<sdklogs::Recordable>> &records) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
return sdk::common::ExportResult::kFailure;
}
Expand Down Expand Up @@ -168,10 +169,17 @@ sdk::common::ExportResult OStreamLogExporter::Export(

bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

const bool OStreamLogExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
12 changes: 9 additions & 3 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/exporters/ostream/span_exporter.h"

#include <iostream>
#include <mutex>

namespace nostd = opentelemetry::nostd;
namespace trace_sdk = opentelemetry::sdk::trace;
Expand Down Expand Up @@ -44,7 +44,7 @@ std::unique_ptr<trace_sdk::Recordable> OStreamSpanExporter::MakeRecordable() noe
sdk::common::ExportResult OStreamSpanExporter::Export(
const nostd::span<std::unique_ptr<trace_sdk::Recordable>> &spans) noexcept
{
if (isShutdown_)
if (isShutdown())
{
return sdk::common::ExportResult::kFailure;
}
Expand Down Expand Up @@ -95,10 +95,16 @@ sdk::common::ExportResult OStreamSpanExporter::Export(

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
isShutdown_ = true;
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

const bool OStreamSpanExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
void OStreamSpanExporter::printAttributes(
const std::unordered_map<std::string, sdkcommon::OwnedAttributeValue> &map,
const std::string prefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"

#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"
Expand Down Expand Up @@ -77,6 +78,8 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
*/
OtlpGrpcExporter(std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub);
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept;
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

# include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"
# include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"

// clang-format on
Expand Down Expand Up @@ -78,6 +79,8 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
*/
OtlpGrpcLogExporter(std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface> stub);
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept;
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/sdk/common/exporter_utils.h"

Expand Down Expand Up @@ -124,6 +125,8 @@ class OtlpHttpClient
std::shared_ptr<ext::http::client::HttpClient> http_client_;
// Cached parsed URI
std::string http_uri_;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept;
};
} // namespace otlp
} // namespace exporter
Expand Down
10 changes: 9 additions & 1 deletion exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h"
#include <mutex>
#include "opentelemetry/exporters/otlp/otlp_recordable.h"
#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h"
#include "opentelemetry/ext/http/common/url_parser.h"
Expand Down Expand Up @@ -103,7 +104,7 @@ std::unique_ptr<sdk::trace::Recordable> OtlpGrpcExporter::MakeRecordable() noexc
sdk::common::ExportResult OtlpGrpcExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[OTLP gRPC] Export failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
Expand Down Expand Up @@ -138,10 +139,17 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(

bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

const bool OtlpGrpcExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
9 changes: 8 additions & 1 deletion exporters/otlp/src/otlp_grpc_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ std::unique_ptr<opentelemetry::sdk::logs::Recordable> OtlpGrpcLogExporter::MakeR
opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[OTLP gRPC log] Export failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
Expand Down Expand Up @@ -157,10 +157,17 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(

bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

const bool OtlpGrpcLogExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
Loading