From bd71fbef5890d25f503d0554df0646ea9cf8e791 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20=C5=81ach?= Date: Wed, 5 Jun 2024 18:50:35 +0200 Subject: [PATCH] [kafka] do not create spans for PartitionEOF events (#3445) --- CHANGELOG.md | 2 + .../Kafka/DuckTypes/IConsumeResult.cs | 3 ++ .../ConsumerConsumeSyncIntegration.cs | 2 +- test/IntegrationTests/KafkaTests.cs | 26 +++++++++++- .../TestApplication.Kafka/Program.cs | 42 ++++++++++++++++++- .../Properties/launchSettings.json | 2 +- 6 files changed, 72 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88ccd7a0f3..191d5031e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ This component adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.h - Do not use message creation context as a parent for consumer spans for `Confluent.Kafka` client instrumentation. See the [issue](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation/issues/3434) for details. +- Do not create consumer spans related to `PartitionEOF` events + for `Confluent.Kafka` client instrumentation. #### Dependency updates diff --git a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/DuckTypes/IConsumeResult.cs b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/DuckTypes/IConsumeResult.cs index ce6071567f..c090f4697c 100644 --- a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/DuckTypes/IConsumeResult.cs +++ b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/DuckTypes/IConsumeResult.cs @@ -13,4 +13,7 @@ internal interface IConsumeResult public Offset Offset { get; set; } public Partition Partition { get; set; } + + // ReSharper disable once InconsistentNaming + public bool IsPartitionEOF { get; set; } } diff --git a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs index a20b39dace..55c47b2fb7 100644 --- a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs +++ b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs @@ -46,7 +46,7 @@ internal static CallTargetReturn OnMethodEnd(TTar consumeResult = response == null ? null : response.DuckAs(); } - if (consumeResult is not null) + if (consumeResult is not null && !consumeResult.IsPartitionEOF) { var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!); if (exception is not null) diff --git a/test/IntegrationTests/KafkaTests.cs b/test/IntegrationTests/KafkaTests.cs index 1c9be25966..897d53f240 100644 --- a/test/IntegrationTests/KafkaTests.cs +++ b/test/IntegrationTests/KafkaTests.cs @@ -76,18 +76,40 @@ public void SubmitsTraces(string packageVersion) collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 2), "Third successful Consume attempt."); collector.ExpectCollected(collection => ValidatePropagation(collection, topicName)); - EnableBytecodeInstrumentation(); RunTestApplication(new TestSettings { PackageVersion = packageVersion, - Arguments = topicName + Arguments = $"--topic-name {topicName}" }); collector.AssertExpectations(); } + [Theory] + [Trait("Category", "EndToEnd")] + [Trait("Containers", "Linux")] + [MemberData(nameof(LibraryVersion.GetPlatformVersions), nameof(LibraryVersion.Kafka), MemberType = typeof(LibraryVersion))] + // ReSharper disable once InconsistentNaming + public void NoSpansForPartitionEOF(string packageVersion) + { + var topicName = $"test-topic2-{packageVersion}"; + + using var collector = new MockSpansCollector(Output); + SetExporter(collector); + + EnableBytecodeInstrumentation(); + + RunTestApplication(new TestSettings + { + PackageVersion = packageVersion, + Arguments = $"--topic-name {topicName} --consume-only" + }); + + collector.AssertEmpty(TimeSpan.FromSeconds(10)); + } + private static string GetConsumerGroupIdAttributeValue(string topicName) { return $"test-consumer-group-{topicName}"; diff --git a/test/test-applications/integrations/TestApplication.Kafka/Program.cs b/test/test-applications/integrations/TestApplication.Kafka/Program.cs index 7666a00aae..60ed8422db 100644 --- a/test/test-applications/integrations/TestApplication.Kafka/Program.cs +++ b/test/test-applications/integrations/TestApplication.Kafka/Program.cs @@ -13,8 +13,47 @@ internal static class Program public static async Task Main(string[] args) { - var topicName = args[0]; + if (args.Length < 2) + { + throw new ArgumentException("Required parameters not provided."); + } + + var topicName = args[1]; + + if (args.Length == 3 && args[2] == "--consume-only") + { + return await ConsumeOnly(topicName); + } + + if (args.Length == 2) + { + return await ProduceAndConsume(topicName); + } + + throw new ArgumentException("Invalid parameters."); + } + + private static async Task ConsumeOnly(string topicName) + { + await CreateTopic(BootstrapServers, topicName); + + using var consumer = BuildConsumer(topicName, BootstrapServers); + consumer.Subscribe(topicName); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var consumeResult = consumer.Consume(cts.Token); + if (consumeResult.IsPartitionEOF) + { + return 0; + } + + Console.WriteLine("Unexpected Consume result."); + return 1; + } + + private static async Task ProduceAndConsume(string topicName) + { using var waitEvent = new ManualResetEventSlim(); using var producer = BuildProducer(BootstrapServers); @@ -142,6 +181,7 @@ private static IConsumer BuildConsumer(string topicName, string // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example AutoOffsetReset = AutoOffsetReset.Earliest, CancellationDelayMaxMs = 5000, + EnablePartitionEof = true, EnableAutoCommit = true }; diff --git a/test/test-applications/integrations/TestApplication.Kafka/Properties/launchSettings.json b/test/test-applications/integrations/TestApplication.Kafka/Properties/launchSettings.json index 5545a6d26e..8f8ca0b9f9 100644 --- a/test/test-applications/integrations/TestApplication.Kafka/Properties/launchSettings.json +++ b/test/test-applications/integrations/TestApplication.Kafka/Properties/launchSettings.json @@ -3,7 +3,7 @@ "profiles": { "instrumented": { "commandName": "Project", - "commandLineArgs": "test-topic", + "commandLineArgs": "--topic-name test-topic", "environmentVariables": { "COR_ENABLE_PROFILING": "1", "COR_PROFILER": "{918728DD-259F-4A6A-AC2B-B85E1B658318}",