Skip to content

Commit

Permalink
[kafka] do not create spans for PartitionEOF events (#3445)
Browse files Browse the repository at this point in the history
  • Loading branch information
lachmatt authored Jun 5, 2024
1 parent f9ae4c9 commit bd71fbe
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ This component adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.h
- Do not use message creation context as a parent for consumer spans for `Confluent.Kafka`
client instrumentation. See the [issue](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation/issues/3434)
for details.
- Do not create consumer spans related to `PartitionEOF` events
for `Confluent.Kafka` client instrumentation.

#### Dependency updates

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ internal interface IConsumeResult
public Offset Offset { get; set; }

public Partition Partition { get; set; }

// ReSharper disable once InconsistentNaming
public bool IsPartitionEOF { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTar
consumeResult = response == null ? null : response.DuckAs<IConsumeResult>();
}

if (consumeResult is not null)
if (consumeResult is not null && !consumeResult.IsPartitionEOF)
{
var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!);
if (exception is not null)
Expand Down
26 changes: 24 additions & 2 deletions test/IntegrationTests/KafkaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,40 @@ public void SubmitsTraces(string packageVersion)
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 2), "Third successful Consume attempt.");

collector.ExpectCollected(collection => ValidatePropagation(collection, topicName));

EnableBytecodeInstrumentation();

RunTestApplication(new TestSettings
{
PackageVersion = packageVersion,
Arguments = topicName
Arguments = $"--topic-name {topicName}"
});

collector.AssertExpectations();
}

[Theory]
[Trait("Category", "EndToEnd")]
[Trait("Containers", "Linux")]
[MemberData(nameof(LibraryVersion.GetPlatformVersions), nameof(LibraryVersion.Kafka), MemberType = typeof(LibraryVersion))]
// ReSharper disable once InconsistentNaming
public void NoSpansForPartitionEOF(string packageVersion)
{
var topicName = $"test-topic2-{packageVersion}";

using var collector = new MockSpansCollector(Output);
SetExporter(collector);

EnableBytecodeInstrumentation();

RunTestApplication(new TestSettings
{
PackageVersion = packageVersion,
Arguments = $"--topic-name {topicName} --consume-only"
});

collector.AssertEmpty(TimeSpan.FromSeconds(10));
}

private static string GetConsumerGroupIdAttributeValue(string topicName)
{
return $"test-consumer-group-{topicName}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,47 @@ internal static class Program

public static async Task<int> Main(string[] args)
{
var topicName = args[0];
if (args.Length < 2)
{
throw new ArgumentException("Required parameters not provided.");
}

var topicName = args[1];

if (args.Length == 3 && args[2] == "--consume-only")
{
return await ConsumeOnly(topicName);
}

if (args.Length == 2)
{
return await ProduceAndConsume(topicName);
}

throw new ArgumentException("Invalid parameters.");
}

private static async Task<int> ConsumeOnly(string topicName)
{
await CreateTopic(BootstrapServers, topicName);

using var consumer = BuildConsumer(topicName, BootstrapServers);
consumer.Subscribe(topicName);

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var consumeResult = consumer.Consume(cts.Token);
if (consumeResult.IsPartitionEOF)
{
return 0;
}

Console.WriteLine("Unexpected Consume result.");
return 1;
}

private static async Task<int> ProduceAndConsume(string topicName)
{
using var waitEvent = new ManualResetEventSlim();

using var producer = BuildProducer(BootstrapServers);
Expand Down Expand Up @@ -142,6 +181,7 @@ private static IConsumer<string, string> BuildConsumer(string topicName, string
// https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example
AutoOffsetReset = AutoOffsetReset.Earliest,
CancellationDelayMaxMs = 5000,
EnablePartitionEof = true,
EnableAutoCommit = true
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"profiles": {
"instrumented": {
"commandName": "Project",
"commandLineArgs": "test-topic",
"commandLineArgs": "--topic-name test-topic",
"environmentVariables": {
"COR_ENABLE_PROFILING": "1",
"COR_PROFILER": "{918728DD-259F-4A6A-AC2B-B85E1B658318}",
Expand Down

0 comments on commit bd71fbe

Please sign in to comment.