Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move RuntimeChannel type arg T to batch_message_channel and associated types #1314

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions opentelemetry-contrib/src/trace/exporter/jaeger_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use opentelemetry::trace::{SpanId, TraceError};
use opentelemetry_sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
runtime::RuntimeChannel,
trace::{BatchMessage, Tracer, TracerProvider},
trace::{Tracer, TracerProvider},
};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use std::collections::HashMap;
Expand Down Expand Up @@ -213,7 +213,7 @@ fn opentelemetry_value_to_json(value: &opentelemetry::Value) -> (&str, serde_jso
///
/// [`RuntimeChannel`]: opentelemetry_sdk::runtime::RuntimeChannel
#[async_trait]
pub trait JaegerJsonRuntime: RuntimeChannel<BatchMessage> + std::fmt::Debug {
pub trait JaegerJsonRuntime: RuntimeChannel + std::fmt::Debug {
/// Create a new directory if the given path does not exist yet
async fn create_dir(&self, path: &Path) -> ExportResult;
/// Write the provided content to a new file at the given path
Expand Down
7 changes: 2 additions & 5 deletions opentelemetry-datadog/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use opentelemetry_sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
resource::{ResourceDetector, SdkProvidedResourceDetector},
runtime::RuntimeChannel,
trace::{BatchMessage, Config, Tracer, TracerProvider},
trace::{Config, Tracer, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions as semcov;
Expand Down Expand Up @@ -300,10 +300,7 @@ impl DatadogPipelineBuilder {

/// Install the Datadog trace exporter pipeline using a batch span processor with the specified
/// runtime.
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
mut self,
runtime: R,
) -> Result<Tracer, TraceError> {
pub fn install_batch<R: RuntimeChannel>(mut self, runtime: R) -> Result<Tracer, TraceError> {
let (config, service_name) = self.build_config_and_service_name();
let exporter = self.build_exporter_with_service_name(service_name)?;
let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter, runtime);
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-jaeger/src/exporter/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
))]
use crate::exporter::addrs_and_family;
use async_trait::async_trait;
use opentelemetry_sdk::{runtime::RuntimeChannel, trace::BatchMessage};
use opentelemetry_sdk::runtime::RuntimeChannel;
use std::net::ToSocketAddrs;

/// Jaeger Trace Runtime is an extension to [`RuntimeChannel`].
///
/// [`RuntimeChannel`]: opentelemetry_sdk::runtime::RuntimeChannel
#[async_trait]
pub trait JaegerTraceRuntime: RuntimeChannel<BatchMessage> + std::fmt::Debug {
pub trait JaegerTraceRuntime: RuntimeChannel + std::fmt::Debug {
/// A communication socket between Jaeger client and agent.
type Socket: std::fmt::Debug + Send + Sync;

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opentelemetry::{
global,
logs::{LogError, LoggerProvider},
};
use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel};
use opentelemetry_sdk::{self, export::logs::LogData, runtime::RuntimeChannel};

/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION";
Expand Down Expand Up @@ -166,7 +166,7 @@ impl OtlpLogPipeline<LogExporterBuilder> {
/// Returns a [`Logger`] with the name `opentelemetry-otlp` and the current crate version.
///
/// [`Logger`]: opentelemetry_sdk::logs::Logger
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
pub fn install_batch<R: RuntimeChannel>(
self,
runtime: R,
) -> Result<opentelemetry_sdk::logs::Logger, LogError> {
Expand Down Expand Up @@ -198,7 +198,7 @@ fn build_simple_with_exporter(
logger
}

fn build_batch_with_exporter<R: RuntimeChannel<BatchMessage>>(
fn build_batch_with_exporter<R: RuntimeChannel>(
exporter: LogExporter,
log_config: Option<opentelemetry_sdk::logs::Config>,
runtime: R,
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use opentelemetry::{
use opentelemetry_sdk::{
self as sdk,
export::trace::{ExportResult, SpanData},
trace::BatchMessage,
};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use sdk::runtime::RuntimeChannel;
Expand Down Expand Up @@ -122,7 +121,7 @@ impl OtlpTracePipeline<SpanExporterBuilder> {
/// `install_batch` will panic if not called within a tokio runtime
///
/// [`Tracer`]: opentelemetry::trace::Tracer
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
pub fn install_batch<R: RuntimeChannel>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
Expand Down Expand Up @@ -154,7 +153,7 @@ fn build_simple_with_exporter(
tracer
}

fn build_batch_with_exporter<R: RuntimeChannel<BatchMessage>>(
fn build_batch_with_exporter<R: RuntimeChannel>(
exporter: SpanExporter,
trace_config: Option<sdk::trace::Config>,
runtime: R,
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@

`should_sample` changes `attributes` from `OrderMap<Key, Value>` to
`Vec<KeyValue>`.
- **Breaking** Move type argument from `RuntimeChannel<T>` to associated types [#1314](https://github.com/open-telemetry/opentelemetry-rust/pull/1314)

### Removed

- Remove context from Metric force_flush [#1245](https://github.com/open-telemetry/opentelemetry-rust/pull/1245)
- Remove `logs::BatchMessage` and `trace::BatchMessage` types [#1314](https://github.com/open-telemetry/opentelemetry-rust/pull/1314)

### Fixed

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{BatchLogProcessor, BatchMessage, Config, LogProcessor, SimpleLogProcessor};
use super::{BatchLogProcessor, Config, LogProcessor, SimpleLogProcessor};
use crate::{
export::logs::{LogData, LogExporter},
runtime::RuntimeChannel,
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Builder {
}

/// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel<BatchMessage>>(
pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,19 @@ impl LogProcessor for SimpleLogProcessor {

/// A [`LogProcessor`] that asynchronously buffers log records and reports
/// them at a preconfigured interval.
pub struct BatchLogProcessor<R: RuntimeChannel<BatchMessage>> {
message_sender: R::Sender,
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,
}

impl<R: RuntimeChannel<BatchMessage>> Debug for BatchLogProcessor<R> {
impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("BatchLogProcessor")
.field("message_sender", &self.message_sender)
.finish()
}
}

impl<R: RuntimeChannel<BatchMessage>> LogProcessor for BatchLogProcessor<R> {
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, data: LogData) {
let result = self.message_sender.try_send(BatchMessage::ExportLog(data));

Expand Down Expand Up @@ -158,7 +158,7 @@ impl<R: RuntimeChannel<BatchMessage>> LogProcessor for BatchLogProcessor<R> {
}
}

impl<R: RuntimeChannel<BatchMessage>> BatchLogProcessor<R> {
impl<R: RuntimeChannel> BatchLogProcessor<R> {
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
Expand Down Expand Up @@ -262,7 +262,7 @@ async fn export_with_timeout<R, E>(
batch: Vec<LogData>,
) -> ExportResult
where
R: RuntimeChannel<BatchMessage>,
R: RuntimeChannel,
E: LogExporter + ?Sized,
{
if batch.is_empty() {
Expand Down Expand Up @@ -323,7 +323,7 @@ pub struct BatchLogProcessorBuilder<E, R> {
impl<E, R> BatchLogProcessorBuilder<E, R>
where
E: LogExporter + 'static,
R: RuntimeChannel<BatchMessage>,
R: RuntimeChannel,
{
/// Set max queue size for batches
pub fn with_max_queue_size(self, size: usize) -> Self {
Expand Down Expand Up @@ -372,7 +372,7 @@ where
/// Messages sent between application thread and batch log processor's work thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum BatchMessage {
enum BatchMessage {
/// Export logs, usually called when the log is emitted.
ExportLog(LogData),
/// Flush the current buffer to the backend, it can be triggered by
Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ mod log_processor;
pub use config::{config, Config};
pub use log_emitter::{Builder, Logger, LoggerProvider};
pub use log_processor::{
BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, BatchMessage, LogProcessor,
SimpleLogProcessor,
BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor,
};
52 changes: 32 additions & 20 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,22 @@ impl Runtime for AsyncStd {
}
}

/// `MessageRuntime` is an extension to [`Runtime`]. Currently, it provides a
/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a
/// channel that is used by the [log] and [span] batch processors.
///
/// [log]: crate::logs::BatchLogProcessor
/// [span]: crate::trace::BatchSpanProcessor
pub trait RuntimeChannel<T: Debug + Send>: Runtime {
pub trait RuntimeChannel: Runtime {
/// A future stream to receive batch messages from channels.
type Receiver: Stream<Item = T> + Send;
type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
/// A batch messages sender that can be sent across threads safely.
type Sender: TrySend<Message = T> + Debug;
type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;

/// Return the sender and receiver used to send batch messages.
fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver);
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>);
}

/// Error returned by a [`TrySend`] implementation.
Expand Down Expand Up @@ -187,11 +190,14 @@ impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {

#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl<T: Debug + Send> RuntimeChannel<T> for Tokio {
type Receiver = tokio_stream::wrappers::ReceiverStream<T>;
type Sender = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
impl RuntimeChannel for Tokio {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
Expand All @@ -202,11 +208,14 @@ impl<T: Debug + Send> RuntimeChannel<T> for Tokio {

#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl<T: Debug + Send> RuntimeChannel<T> for TokioCurrentThread {
type Receiver = tokio_stream::wrappers::ReceiverStream<T>;
type Sender = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
impl RuntimeChannel for TokioCurrentThread {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
Expand All @@ -229,11 +238,14 @@ impl<T: Send> TrySend for async_std::channel::Sender<T> {

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl<T: Debug + Send> RuntimeChannel<T> for AsyncStd {
type Receiver = async_std::channel::Receiver<T>;
type Sender = async_std::channel::Sender<T>;

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
impl RuntimeChannel for AsyncStd {
type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
type Sender<T: Debug + Send> = async_std::channel::Sender<T>;

fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
async_std::channel::bounded(capacity)
}
}
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ pub use sampler::{Sampler, ShouldSample};
pub use span::Span;
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchMessage, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor,
SpanProcessor,
BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor,
};
pub use tracer::Tracer;

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! not duplicate this data to avoid that different [`Tracer`] instances
//! of the [`TracerProvider`] have different versions of these data.
use crate::runtime::RuntimeChannel;
use crate::trace::{BatchMessage, BatchSpanProcessor, SimpleSpanProcessor, Tracer};
use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer};
use crate::{export::trace::SpanExporter, trace::SpanProcessor};
use crate::{InstrumentationLibrary, Resource};
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl Builder {
}

/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel<BatchMessage>>(
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
Expand Down
8 changes: 2 additions & 6 deletions opentelemetry-sdk/src/trace/runtime_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use crate::export::trace::{ExportResult, SpanExporter};
use crate::runtime;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::runtime::RuntimeChannel;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::trace::BatchMessage;
use futures_util::future::BoxFuture;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use opentelemetry::global::*;
Expand Down Expand Up @@ -42,7 +40,7 @@ impl SpanCountExporter {
}

#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
fn build_batch_tracer_provider<R: RuntimeChannel<BatchMessage>>(
fn build_batch_tracer_provider<R: RuntimeChannel>(
exporter: SpanCountExporter,
runtime: R,
) -> crate::trace::TracerProvider {
Expand All @@ -61,9 +59,7 @@ fn build_simple_tracer_provider(exporter: SpanCountExporter) -> crate::trace::Tr
}

#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
async fn test_set_provider_in_tokio<R: RuntimeChannel<BatchMessage>>(
runtime: R,
) -> Arc<AtomicUsize> {
async fn test_set_provider_in_tokio<R: RuntimeChannel>(runtime: R) -> Arc<AtomicUsize> {
let exporter = SpanCountExporter::new();
let span_count = exporter.span_count.clone();
let _ = set_tracer_provider(build_batch_tracer_provider(exporter, runtime));
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/trace/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Sampler {
where
C: HttpClient + 'static,
Sampler: ShouldSample,
R: crate::runtime::RuntimeChannel<crate::trace::BatchMessage>,
R: crate::runtime::RuntimeChannel,
Svc: Into<String>,
{
JaegerRemoteSamplerBuilder::new(runtime, http_client, default_sampler, service_name)
Expand Down
Loading