Skip to content

Commit

Permalink
refactor: replace otlp file exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
erichulburd committed Oct 4, 2024
1 parent cc021df commit 026cbd3
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 115 deletions.
51 changes: 4 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ serde = { version = "1.0.145", features = ["derive"] }
# When updating the OTEL version, be sure to update these optional dependencies in `tracing-subscriber` as well:
# * opentelemetry-otlp
# * opentelemetry-proto
# * opentelemetry-stdout
opentelemetry = "0.25.0"
opentelemetry_sdk = "0.25.0"
pyo3 = { version = "0.20.0", features = ["macros"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
//!
//! * `pyo3-opentelemetry-macros` - a crate defining the `pypropagate` macro.
//! * `pyo3-tracing-subscriber` - a crate supporting configuration and initialization of Rust
//! tracing subscribers from Python.
//! tracing subscribers from Python.
//!
//! # Examples
//!
Expand Down
7 changes: 4 additions & 3 deletions crates/tracing-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ opentelemetry = { workspace = true }
# since they are optional, they cannot be workspace dependencies.
opentelemetry-otlp = { version = "0.25.0", features = ["grpc-tonic", "trace", "tls-roots"], optional = true }
opentelemetry-proto = { version = "0.25.0", optional = true, features = ["tonic"] }
opentelemetry-stdout = { version = "0.25.0", optional = true, features = ["trace"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio", "rt-tokio-current-thread"] }
pyo3-asyncio = { workspace = true, features = ["tokio-runtime"], optional = true }
serde = { workspace = true }
Expand All @@ -28,10 +27,12 @@ tonic = { version = "0.12.3", features = ["tls", "tls-roots"], optional = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json"] }
futures-core = "0.3.30"
serde_json = "1.0.128"

[features]
layer-otel-otlp = ["dep:opentelemetry-otlp", "dep:opentelemetry-proto", "dep:tonic"]
layer-otel-otlp-file = ["dep:opentelemetry-stdout"]
layer-otel-otlp = ["layer-otel-otlp-file", "dep:tonic"]
layer-otel-otlp-file = ["dep:opentelemetry-otlp", "dep:opentelemetry-proto"]
stubs = ["dep:handlebars"]
pyo3 = ["dep:pyo3", "dep:pyo3-asyncio"]
extension-module = ["pyo3", "pyo3/extension-module"]
Expand Down
2 changes: 2 additions & 0 deletions crates/tracing-subscriber/src/contextmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ mod test {
let layer_config = Box::new(crate::layers::otel_otlp_file::Config {
file_path: Some(temporary_file_path.as_os_str().to_str().unwrap().to_owned()),
filter: Some("error,pyo3_tracing_subscriber=info".to_string()),
instrumentation_library: None,
});
let subscriber = Box::new(TracingSubscriberRegistryConfig { layer_config });
let config = TracingConfig::Global(GlobalTracingConfig {
Expand Down Expand Up @@ -319,6 +320,7 @@ mod test {
let layer_config = Box::new(crate::layers::otel_otlp_file::Config {
file_path: Some(temporary_file_path.as_os_str().to_str().unwrap().to_owned()),
filter: Some("error,pyo3_tracing_subscriber=info".to_string()),
instrumentation_library: None,
});
let subscriber = Box::new(TracingSubscriberRegistryConfig { layer_config });
let config = TracingConfig::CurrentThread(CurrentThreadTracingConfig {
Expand Down
2 changes: 1 addition & 1 deletion crates/tracing-subscriber/src/export_process/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) struct ExportProcess {
}

impl ExportProcess {
fn new(guard: SubscriberManagerGuard, runtime: Runtime) -> Self {
const fn new(guard: SubscriberManagerGuard, runtime: Runtime) -> Self {
Self { runtime, guard }
}

Expand Down
57 changes: 55 additions & 2 deletions crates/tracing-subscriber/src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
//!
//! * [`crate::layers::fmt_file::Config`] - a layer which writes spans to a file (or stdout) in
//! * [`crate::layers::otel_otlp_file::Config`] - a layer which writes spans to a file (or stdout) in
//! the `OpenTelemetry` OTLP JSON-serialized format.
//! the `OpenTelemetry` OTLP JSON-serialized format.
//! * [`crate::layers::otel_otlp::Config`] - a layer which exports spans to an `OpenTelemetry` collector.
pub(crate) mod fmt_file;
#[cfg(feature = "layer-otel-otlp")]
pub(crate) mod otel_otlp;
#[cfg(feature = "layer-otel-otlp-file")]
pub(crate) mod otel_otlp_file;

use std::fmt::Debug;
use std::{borrow::Cow, collections::HashMap, fmt::Debug};

use opentelemetry::InstrumentationLibrary;
use pyo3::prelude::*;
use tracing_subscriber::{
filter::{FromEnvError, ParseError},
Expand Down Expand Up @@ -210,3 +211,55 @@ pub(crate) fn init_submodule(name: &str, py: Python, m: &PyModule) -> PyResult<(

Ok(())
}

#[pyclass(name = "InstrumentationLibrary")]
#[derive(Debug, Clone)]
pub(crate) struct PyInstrumentationLibrary {
name: String,
version: Option<String>,
schema_url: Option<String>,
attributes: HashMap<String, String>,
}

#[pymethods]
impl PyInstrumentationLibrary {
#[new]
#[pyo3(signature = (name, /, version=None, schema_url=None, attributes=None))]
fn new(
name: String,
version: Option<String>,
schema_url: Option<String>,
attributes: Option<HashMap<String, String>>,
) -> Self {
let attributes = attributes.unwrap_or_default();
Self {
name,
version,
schema_url,
attributes,
}
}
}

impl From<PyInstrumentationLibrary> for InstrumentationLibrary {
fn from(py_instrumentation_library: PyInstrumentationLibrary) -> Self {
let mut builder = Self::builder(Cow::from(py_instrumentation_library.name));
if let Some(version) = py_instrumentation_library.version {
builder = builder.with_version(Cow::from(version));
}
if let Some(schema_url) = py_instrumentation_library.schema_url {
builder = builder.with_schema_url(Cow::from(schema_url));
}
let mut attributes = Vec::new();
for (key, value) in py_instrumentation_library.attributes {
let kv = opentelemetry::KeyValue::new(
opentelemetry::Key::new(key),
opentelemetry::Value::from(value),
);
attributes.push(kv);
}
builder = builder.with_attributes(attributes);

builder.build()
}
}
40 changes: 26 additions & 14 deletions crates/tracing-subscriber/src/layers/otel_otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use opentelemetry::{trace::TracerProvider, InstrumentationLibrary};
use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig};
use opentelemetry_sdk::{
trace::{Sampler, SpanLimits},
Resource,
};
use pyo3::prelude::*;
use tracing_subscriber::Layer;

use crate::create_init_submodule;
use opentelemetry_sdk::trace;
use tonic::metadata::{
errors::{InvalidMetadataKey, InvalidMetadataValue},
MetadataKey,
};
use tracing_subscriber::{
filter::{FromEnvError, ParseError},
Layer,
};
use tracing_subscriber::filter::{FromEnvError, ParseError};

use super::{build_env_filter, force_flush_provider_as_shutdown, LayerBuildResult, WithShutdown};
use super::{
build_env_filter, force_flush_provider_as_shutdown, LayerBuildResult, PyInstrumentationLibrary,
WithShutdown,
};

/// Configures the [`opentelemetry-otlp`] crate layer.
#[derive(Clone, Debug)]
Expand All @@ -56,6 +58,8 @@ pub(crate) struct Config {
pre_shutdown_timeout: Duration,
/// The filter to use for the [`tracing_subscriber::filter::EnvFilter`] layer.
filter: Option<String>,
/// The instrumentation library to use for the [`opentelemetry::sdk::trace::TracerProvider`].
instrumentation_library: Option<InstrumentationLibrary>,
}

impl Config {
Expand Down Expand Up @@ -95,22 +99,27 @@ impl Config {
.tracing()
.with_exporter(self.initialize_otlp_exporter())
.with_trace_config(
trace::config()
trace::Config::default()
.with_sampler(self.sampler.clone())
.with_span_limits(self.span_limits)
.with_resource(self.resource.clone()),
);

let tracer = if batch {
let provider = if batch {
pipeline.install_batch(opentelemetry_sdk::runtime::Tokio {})
} else {
pipeline.install_simple()
}
.map_err(BuildError::from)?;
let provider = tracer
.provider()
.ok_or(BuildError::ProviderNotSetOnTracer)?;
let env_filter = build_env_filter(self.filter.clone())?;

let tracer = self.instrumentation_library.as_ref().map_or_else(
|| provider.tracer("pyo3_tracing_subscriber"),
|instrumentation_library| {
provider.library_tracer(Arc::new(instrumentation_library.clone()))
},
);

let layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(env_filter);
Expand All @@ -131,8 +140,6 @@ pub(super) enum Error {
pub(crate) enum BuildError {
#[error("failed to build opentelemetry-otlp pipeline: {0}")]
TraceInstall(#[from] opentelemetry::trace::TraceError),
#[error("provider not set on returned opentelemetry-otlp tracer")]
ProviderNotSetOnTracer,
#[error("error in the configuration: {0}")]
Config(#[from] ConfigError),
#[error("failed to parse specified trace filter: {0}")]
Expand Down Expand Up @@ -239,6 +246,7 @@ pub(crate) struct PyConfig {
timeout_millis: Option<u64>,
pre_shutdown_timeout_millis: u64,
filter: Option<String>,
instrumentation_library: Option<PyInstrumentationLibrary>,
}

#[pymethods]
Expand All @@ -253,7 +261,8 @@ impl PyConfig {
endpoint = None,
timeout_millis = None,
pre_shutdown_timeout_millis = 2000,
filter = None
filter = None,
instrumentation_library = None
))]
#[allow(clippy::too_many_arguments)]
fn new(
Expand All @@ -265,6 +274,7 @@ impl PyConfig {
timeout_millis: Option<u64>,
pre_shutdown_timeout_millis: u64,
filter: Option<&str>,
instrumentation_library: Option<PyInstrumentationLibrary>,
) -> PyResult<Self> {
Ok(Self {
span_limits: span_limits.unwrap_or_default(),
Expand All @@ -275,6 +285,7 @@ impl PyConfig {
timeout_millis,
pre_shutdown_timeout_millis,
filter: filter.map(String::from),
instrumentation_library,
})
}
}
Expand Down Expand Up @@ -447,6 +458,7 @@ impl TryFrom<PyConfig> for Config {
timeout: config.timeout_millis.map(Duration::from_millis),
pre_shutdown_timeout: Duration::from_millis(config.pre_shutdown_timeout_millis),
filter: config.filter,
instrumentation_library: config.instrumentation_library.map(Into::into),
})
}
}
Expand Down
Loading

0 comments on commit 026cbd3

Please sign in to comment.