This extension library enables building Kafka consumer applications on top of the .NET Generic Host,
which integrates seamlessly with other hosting extensions such as logging and dependency injection.
It uses the canonical C# driver implementation from Confluent Inc.
This minimal sample consumes messages from one or more topics. It assumes the default case, in which the key is serialized as string and the value as byte[]:
public static async Task Main(string[] args)
{
var host = new HostBuilder()
.UseConsoleLifetime()
.UseKafka() // Equivalent to .UseKafka<string, byte[]>(), includes registration of key and value serializers
.ConfigureServices(container =>
{
// Register the message handler that matches the key and value types
container.Add(new ServiceDescriptor(typeof(IMessageHandler<string, byte[]>), typeof(JobMessageHandler), ServiceLifetime.Singleton));
container.Configure<KafkaListenerSettings>(config =>
{
config.BootstrapServers = new[] { "kafka:9092" };
config.Topics = new[] { "topic1" };
config.ConsumerGroup = "group1";
});
})
.Build();
await host.RunAsync();
}
Should it be necessary to consume messages with different key/value types these can be provided as follows:
public static async Task Main(string[] args)
{
var host = new HostBuilder()
.UseKafka<DateTimeOffset, string>() // Key is a DateTimeOffset, value is a string
.ConfigureServices(container =>
{
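// Register a handler that matches the key and value types; JobMessageHandler is
// assumed here to implement IMessageHandler<DateTimeOffset, string>
container.Add(new ServiceDescriptor(typeof(IMessageHandler<DateTimeOffset, string>), typeof(JobMessageHandler), ServiceLifetime.Singleton));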
// Register the required deserializers with DI
container.Add(new ServiceDescriptor(typeof(IDeserializer<string>), new StringDeserializer(Encoding.UTF8)));
container.Add(new ServiceDescriptor(typeof(IDeserializer<DateTimeOffset>), typeof(DatetimeDeserializer), ServiceLifetime.Singleton));
// Configure KafkaListenerSettings as in the minimal sample
})
.Build();
await host.RunAsync();
}
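The sample registers a DatetimeDeserializer without showing it. The exact shape of IDeserializer<T> depends on the installed Confluent.Kafka version, so the following is only a sketch of the decoding step such a deserializer might delegate to. It assumes the key is sent as an ISO 8601 timestamp encoded in UTF-8, which is not stated in this document, and DateTimeOffsetPayload is a hypothetical helper name.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helper: decodes a UTF-8 encoded ISO 8601 timestamp payload.
// The wire format is an assumption made for this sketch.
public static class DateTimeOffsetPayload
{
    public static DateTimeOffset Parse(byte[] data)
    {
        if (data == null) throw new ArgumentNullException(nameof(data));

        // Decode the raw bytes to text, then parse culture-independently.
        var text = Encoding.UTF8.GetString(data);
        return DateTimeOffset.Parse(text, CultureInfo.InvariantCulture, DateTimeStyles.None);
    }
}
```

A DatetimeDeserializer could call DateTimeOffsetPayload.Parse from whichever Deserialize method the installed driver version requires.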
Regardless of whether .UseKafka() or .UseKafka<TKey, TValue>() is used, the Kafka consumer requires a registered handler that implements IMessageHandler<TKey, TValue>:
class JobMessageHandler : IMessageHandler<string, byte[]>
{
readonly ILogger logger;
public JobMessageHandler(ILogger<JobMessageHandler> logger)
{
this.logger = logger;
}
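public Task Handle(string key, byte[] value)
{
logger.LogInformation("Received message from Kafka");
// No asynchronous work is done here, so return a completed task instead of marking the method async
return Task.CompletedTask;
}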
}
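For the typed variant, the handler has to match the key and value types passed to .UseKafka<TKey, TValue>(). The following is a minimal sketch of a handler for DateTimeOffset keys and string values. TimestampedMessageHandler and its HandledCount property are hypothetical names, and the interface declaration is only a stand-in mirroring the Handle signature shown in the handler above so that the sketch compiles on its own; in a real project, drop it and use the library's interface.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the library's interface, mirroring the Handle signature used above.
public interface IMessageHandler<TKey, TValue>
{
    Task Handle(TKey key, TValue value);
}

// Hypothetical handler for messages keyed by a timestamp.
public class TimestampedMessageHandler : IMessageHandler<DateTimeOffset, string>
{
    private int handledCount;

    // Number of messages handled so far.
    public int HandledCount => handledCount;

    public Task Handle(DateTimeOffset key, string value)
    {
        // The handler may be registered as a singleton, so count in a thread-safe way.
        Interlocked.Increment(ref handledCount);
        Console.WriteLine($"Received '{value}' with key {key:O}");
        return Task.CompletedTask;
    }
}
```

Such a handler would be registered under typeof(IMessageHandler<DateTimeOffset, string>) in the same way as in the samples above.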
Start a standalone installation of Kafka:
docker run -d -e ADVERTISED_HOSTNAME=$(hostname -f) -p 9092:9092 -p 2181:2181 jrottenberg/kafka-standalone
Exec into the container and start a shell:
$ docker exec -it <containerid> bash
/kafka# cd /usr/bin
/usr/bin# ./kafka-console-producer --broker-list broker:9092 --topic topic1
[enter multiple messages]
Run one of the samples and check that it receives the messages.