Skip to content

Commit

Permalink
Add compression support for otlp tonic (#1165)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrynCooke authored Jul 29, 2023
1 parent ba6f924 commit 58ad33a
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 35 deletions.
1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added
- Add OTLP HTTP Metrics Exporter [#1020](https://github.com/open-telemetry/opentelemetry-rust/pull/1020).
- Add tonic compression support [#1165](https://github.com/open-telemetry/opentelemetry-rust/pull/1165).

## v0.12.0

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ default = ["grpc-tonic", "trace"]

# grpc using tonic
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"]
gzip-tonic = ["tonic/gzip"]
tls = ["tonic/tls"]
tls-roots = ["tls", "tonic/tls-roots"]

Expand Down
9 changes: 1 addition & 8 deletions opentelemetry-otlp/src/exporter/grpcio.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::exporter::Compression;
use crate::ExportConfig;
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -47,14 +48,6 @@ pub struct Credentials {
pub key: String,
}

/// The compression algorithm to use when sending data.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Copy, Debug)]
pub enum Compression {
/// Compresses data using gzip.
Gzip,
}

impl From<Compression> for grpcio::CompressionAlgorithms {
fn from(compression: Compression) -> Self {
match compression {
Expand Down
49 changes: 47 additions & 2 deletions opentelemetry-otlp/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use crate::exporter::grpcio::GrpcioExporterBuilder;
use crate::exporter::http::HttpExporterBuilder;
#[cfg(feature = "grpc-tonic")]
use crate::exporter::tonic::TonicExporterBuilder;
use crate::Protocol;
use crate::{Error, Protocol};
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::time::Duration;

Expand All @@ -21,6 +24,8 @@ pub const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
pub const OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT: &str = OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT;
/// Protocol the exporter will use. Either `http/protobuf` or `grpc`.
pub const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_COMPRESSION";

#[cfg(feature = "http-proto")]
/// Default protocol, using http-proto.
Expand Down Expand Up @@ -79,6 +84,33 @@ impl Default for ExportConfig {
}
}

/// The compression algorithm to use when sending data.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Compression {
/// Compresses data using gzip.
Gzip,
}

impl Display for Compression {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Compression::Gzip => write!(f, "gzip"),
}
}
}

impl FromStr for Compression {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"gzip" => Ok(Compression::Gzip),
_ => Err(Error::UnsupportedCompressionAlgorithm(s.to_string())),
}
}
}

/// default protocol based on enabled features
fn default_protocol() -> Protocol {
match OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT {
Expand Down Expand Up @@ -217,13 +249,15 @@ impl<B: HasExportConfig> WithExportConfig for B {
mod tests {
// If an env test fails then the mutex will be poisoned and the following error will be displayed.
const LOCK_POISONED_MESSAGE: &str = "one of the other pipeline builder from env tests failed";

use crate::exporter::{
default_endpoint, default_protocol, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
OTEL_EXPORTER_OTLP_PROTOCOL_GRPC, OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF,
OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
};
use crate::{new_exporter, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL};
use crate::{new_exporter, Compression, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL};
use std::str::FromStr;
use std::sync::Mutex;

// Make sure env tests are not running concurrently
Expand Down Expand Up @@ -345,4 +379,15 @@ mod tests {
std::env::remove_var(OTEL_EXPORTER_OTLP_TIMEOUT);
assert!(std::env::var(OTEL_EXPORTER_OTLP_TIMEOUT).is_err());
}

#[test]
fn test_compression_parse() {
assert_eq!(Compression::from_str("gzip").unwrap(), Compression::Gzip);
Compression::from_str("bad_compression").expect_err("bad compression");
}

#[test]
fn test_compression_to_str() {
assert_eq!(Compression::Gzip.to_string(), "gzip");
}
}
56 changes: 55 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::ExportConfig;
use crate::exporter::Compression;
use crate::{ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION};
use std::fmt::{Debug, Formatter};
use tonic::codec::CompressionEncoding;
use tonic::metadata::MetadataMap;
#[cfg(feature = "tls")]
use tonic::transport::ClientTlsConfig;
Expand All @@ -18,6 +20,39 @@ pub struct TonicConfig {
/// TLS settings for the collector endpoint.
#[cfg(feature = "tls")]
pub tls_config: Option<ClientTlsConfig>,

/// The compression algorithm to use when communicating with the collector.
pub compression: Option<Compression>,
}

impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
type Error = crate::Error;

fn try_from(value: Compression) -> Result<Self, Self::Error> {
match value {
#[cfg(feature = "gzip-tonic")]
Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
#[cfg(not(feature = "gzip-tonic"))]
Compression::Gzip => Err(crate::Error::UnsupportedCompressionAlgorithm(
value.to_string(),
)),
}
}
}

pub(crate) fn resolve_compression(
tonic_config: &TonicConfig,
env_override: &'static str,
) -> Result<Option<CompressionEncoding>, crate::Error> {
if let Some(compression) = tonic_config.compression {
Ok(Some(compression.try_into()?))
} else if let Ok(compression) = std::env::var(env_override) {
Ok(Some(compression.parse::<Compression>()?.try_into()?))
} else if let Ok(compression) = std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) {
Ok(Some(compression.parse::<Compression>()?.try_into()?))
} else {
Ok(None)
}
}

/// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol.
Expand Down Expand Up @@ -60,6 +95,7 @@ impl Default for TonicExporterBuilder {
)),
#[cfg(feature = "tls")]
tls_config: None,
compression: None,
};

TonicExporterBuilder {
Expand Down Expand Up @@ -94,6 +130,12 @@ impl TonicExporterBuilder {
self
}

/// Set the compression algorithm to use when communicating with the collector.
pub fn with_compression(mut self, compression: Compression) -> Self {
self.tonic_config.compression = Some(compression);
self
}

/// Use `channel` as tonic's transport channel.
/// this will override tls config and should only be used
/// when working with non-HTTP transports.
Expand All @@ -119,6 +161,8 @@ impl TonicExporterBuilder {

#[cfg(test)]
mod tests {
#[cfg(feature = "gzip-tonic")]
use crate::exporter::Compression;
use crate::TonicExporterBuilder;
use tonic::metadata::{MetadataMap, MetadataValue};

Expand Down Expand Up @@ -151,4 +195,14 @@ mod tests {
.len()
);
}

#[test]
#[cfg(feature = "gzip-tonic")]
fn test_with_compression() {
// metadata should merge with the current one with priority instead of just replacing it
let mut metadata = MetadataMap::new();
metadata.insert("foo", "bar".parse().unwrap());
let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
}
}
20 changes: 14 additions & 6 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,28 @@ mod metric;
mod span;
mod transform;

pub use crate::exporter::Compression;
pub use crate::exporter::ExportConfig;
#[cfg(feature = "trace")]
pub use crate::span::{
OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
};

#[cfg(feature = "metrics")]
pub use crate::metric::{
MetricsExporter, MetricsExporterBuilder, OtlpMetricPipeline,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
};

#[cfg(feature = "logs")]
pub use crate::logs::*;
pub use crate::logs::{
LogExporter, LogExporterBuilder, OtlpLogPipeline, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
};

pub use crate::exporter::{
HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT,
HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT, OTEL_EXPORTER_OTLP_TIMEOUT,
OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
Expand All @@ -217,7 +221,7 @@ use opentelemetry_sdk::export::ExportError;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[cfg(feature = "grpc-sys")]
pub use crate::exporter::grpcio::{Compression, Credentials, GrpcioConfig, GrpcioExporterBuilder};
pub use crate::exporter::grpcio::{Credentials, GrpcioConfig, GrpcioExporterBuilder};
#[cfg(feature = "http-proto")]
pub use crate::exporter::http::HttpExporterBuilder;
#[cfg(feature = "grpc-tonic")]
Expand Down Expand Up @@ -347,6 +351,10 @@ pub enum Error {
/// The pipeline will need a exporter to complete setup. Throw this error if none is provided.
#[error("no exporter builder is provided, please provide one using with_exporter() method")]
NoExporterBuilder,

/// Unsupported compression algorithm.
#[error("unsupported compression algorithm '{0}'")]
UnsupportedCompressionAlgorithm(String),
}

#[cfg(feature = "grpc-tonic")]
Expand Down
13 changes: 11 additions & 2 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#[cfg(feature = "grpc-tonic")]
use {
crate::exporter::tonic::{TonicConfig, TonicExporterBuilder},
crate::exporter::tonic::{resolve_compression, TonicConfig, TonicExporterBuilder},
opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient as TonicLogsServiceClient,
ExportLogsServiceRequest as TonicRequest,
Expand Down Expand Up @@ -60,6 +60,9 @@ use opentelemetry_api::{
};
use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel};

/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION";

impl OtlpPipeline {
/// Create a OTLP logging pipeline.
pub fn logging(self) -> OtlpLogPipeline {
Expand Down Expand Up @@ -235,10 +238,16 @@ impl LogExporter {
tonic_config: TonicConfig,
channel: tonic::transport::Channel,
) -> Result<Self, crate::Error> {
let mut log_exporter = TonicLogsServiceClient::new(channel);
if let Some(compression) =
resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION)?
{
log_exporter = log_exporter.send_compressed(compression);
}
Ok(LogExporter::Tonic {
timeout: config.timeout,
metadata: tonic_config.metadata,
log_exporter: TonicLogsServiceClient::new(channel),
log_exporter,
})
}

Expand Down
Loading

0 comments on commit 58ad33a

Please sign in to comment.