Skip to content

Commit

Permalink
added checks for partition EOF handling
Browse files Browse the repository at this point in the history
  • Loading branch information
enzian committed May 3, 2019
1 parent b3e9416 commit 9708c95
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 25 deletions.
7 changes: 1 addition & 6 deletions Microsoft.Extensions.Hosting.Kafka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Extensions.Generic.Kafka.Ho
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Extensions.Generic.Kafka.Hosting.CustomSerialization", "samples\Extensions.Generic.Kafka.Hosting.CustomSerialization\Extensions.Generic.Kafka.Hosting.CustomSerialization.csproj", "{1EABE3AA-F1C7-4A12-AF9C-4BCFE9D842B9}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{9FC09F01-22F2-4DE1-998C-C8384C17239E}"
ProjectSection(SolutionItems) = preProject
version.props = version.props
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions.Generic.Kafka.Hosting.LowLevelKafkaMessageHandling", "samples\Extensions.Generic.Kafka.Hosting.LowLevelKafkaMessageHandling\Extensions.Generic.Kafka.Hosting.LowLevelKafkaMessageHandling.csproj", "{F1933DFA-5CBB-4FC9-9E6E-E7E0B7CB9348}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Extensions.Generic.Kafka.Hosting.LowLevelKafkaMessageHandling", "samples\Extensions.Generic.Kafka.Hosting.LowLevelKafkaMessageHandling\Extensions.Generic.Kafka.Hosting.LowLevelKafkaMessageHandling.csproj", "{F1933DFA-5CBB-4FC9-9E6E-E7E0B7CB9348}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@ services:
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
logging:
driver: "none"

broker:
image: confluentinc/cp-enterprise-kafka:5.0.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
expose:
- "9092"
- "29092"
ports:
- "9092:9092"
- "29092:29092"
logging:
driver: "none"

environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
Expand Down Expand Up @@ -184,5 +192,11 @@ services:
# KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
# KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

# subject:
# image: core_alpine_kafka_test:latest
# depends_on:
# - zookeeper
# - broker

volumes:
mi2: {}
5 changes: 5 additions & 0 deletions samples/Extensions.Generic.Kafka.Hosting.Sample/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM mcr.microsoft.com/dotnet/core/runtime:2.2-alpine
RUN apk update && \
apk add librdkafka librdkafka-dev
COPY bin/release/netcoreapp2.1/publish /app
ENTRYPOINT dotnet /app/Extensions.Generic.Kafka.Hosting.Sample.dll
2 changes: 1 addition & 1 deletion samples/Extensions.Generic.Kafka.Hosting.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static async Task Main(string[] args)
})
.UseKafka(config => // Equivalent to .UseKafka<string, byte[]>()
{
config.BootstrapServers = new[] { "kafka:9092" };
config.BootstrapServers = new[] { "broker:9092" };
})
.ConfigureServices(container =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;

namespace Microsoft.Extensions.Hosting.Kafka
{
Expand Down
43 changes: 26 additions & 17 deletions src/Microsoft.Extensions.Hosting.Kafka/KafkaListenerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,36 @@ await Task.Run(async () => {
var msg = consumer.Consume(stoppingToken);
if (msg != null)
{
logger.LogDebug($"Received message from topic '{msg.Topic}:{msg.Partition}' with offset: '{msg.Offset}[{msg.TopicPartitionOffset}]'");

using (var scope = serviceProvider.CreateScope())
if(msg.Message != null)
{
var handler = scope.ServiceProvider.GetService<IKafkaMessageHandler<TKey, TValue>>();
if (handler == null)
{
logger.LogError("Failed to resolve message handler. Did you add it to your DI setup.");
continue;
}
try
{
// Invoke the handler
await handler.Handle(msg);
}
catch (Exception e)
logger.LogDebug($"Received message from topic '{msg.Topic}:{msg.Partition}' with offset: '{msg.Offset}[{msg.TopicPartitionOffset}]'");

using (var scope = serviceProvider.CreateScope())
{
logger.LogError(e, "Message handler failed", e);
continue;
var handler = scope.ServiceProvider.GetService<IKafkaMessageHandler<TKey, TValue>>();
if (handler == null)
{
logger.LogError("Failed to resolve message handler. Did you add it to your DI setup?");
continue;
}
try
{
// Invoke the handler
await handler.Handle(msg);
}
catch (Exception e)
{
logger.LogError(e, "Message handler failed", e);
continue;
}
}
}

// If needed report the end of the partition
if (msg.IsPartitionEOF)
{
logger.LogInformation($"End of topic {msg.Topic} partition {msg.Partition} reached at offset {msg.Offset}");
}
}
else
{
Expand Down

0 comments on commit 9708c95

Please sign in to comment.