Microsoft.Extensions.Hosting.Kafka

This extension library lets you build Kafka consumer applications on top of the .NET Generic Host, which integrates seamlessly with other hosting extensions such as logging and dependency injection.

This extension uses the canonical C# driver implementation from Confluent Inc.

Usage

This minimal sample consumes messages from one or more topics. It uses the default types: the key is deserialized as string and the value as byte[]:

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .UseConsoleLifetime()
        .UseKafka() // Equivalent to .UseKafka<string, byte[]>(), includes registration of key and value serializers
        .ConfigureServices(container =>
        {
            // The message handler whose type arguments match those passed to UseKafka
            container.Add(new ServiceDescriptor(typeof(IMessageHandler<string, byte[]>), typeof(JobMessageHandler), ServiceLifetime.Singleton));

            container.Configure<KafkaListenerSettings>(config =>
            {

                config.BootstrapServers = new[] { "kafka:9092" };
                config.Topics = new[] { "topic1" };
                config.ConsumerGroup = "group1";
            });
        })
        .Build();

    await host.RunAsync();
}

To consume messages with different key/value types, pass them as type arguments and register matching deserializers:

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .UseKafka<DateTimeOffset, string>() // Keys are DateTimeOffset, values are string
        .ConfigureServices(container =>
        {
            // The handler must implement IMessageHandler<DateTimeOffset, string> to match the types
            // passed to UseKafka. TimestampedMessageHandler is a placeholder name for such a handler.
            container.Add(new ServiceDescriptor(typeof(IMessageHandler<DateTimeOffset, string>), typeof(TimestampedMessageHandler), ServiceLifetime.Singleton));

            // Add the necessary deserializers into DI!
            container.Add(new ServiceDescriptor(typeof(IDeserializer<string>), new StringDeserializer(Encoding.UTF8)));
            container.Add(new ServiceDescriptor(typeof(IDeserializer<DateTimeOffset>), typeof(DatetimeDeserializer), ServiceLifetime.Singleton));

            // Configure KafkaListenerSettings as in the first sample.
        })
        .Build();

    await host.RunAsync();
}
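The DatetimeDeserializer registered in the sample is not shown in this README, and the exact IDeserializer<T> method signature depends on the Confluent client version in use. The following is only a sketch of the decoding logic such a deserializer might wrap, assuming the producer writes keys as ISO 8601 round-trip strings encoded in UTF-8. The class name DateTimeOffsetKeyCodec and both of its methods are hypothetical and not part of this library.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helper, not part of this library or of the Confluent client.
// Assumption: keys are ISO 8601 round-trip ("O") strings encoded as UTF-8.
public static class DateTimeOffsetKeyCodec
{
    // Turns the raw key bytes into a DateTimeOffset.
    public static DateTimeOffset Decode(byte[] data)
    {
        if (data == null) throw new ArgumentNullException(nameof(data));

        string text = Encoding.UTF8.GetString(data);
        return DateTimeOffset.ParseExact(text, "O", CultureInfo.InvariantCulture);
    }

    // Inverse of Decode, useful for producing test keys.
    public static byte[] Encode(DateTimeOffset value)
    {
        string text = value.ToString("O", CultureInfo.InvariantCulture);
        return Encoding.UTF8.GetBytes(text);
    }
}
```

A deserializer class could delegate to Decode from whichever Deserialize method the installed client version requires.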

Whether .UseKafka() or .UseKafka<TKey, TValue>() is used, the Kafka consumer requires a registered handler that implements IMessageHandler<TKey, TValue>:

class JobMessageHandler : IMessageHandler<string, byte[]>
{
    readonly ILogger logger;

    public JobMessageHandler(ILogger<JobMessageHandler> logger)
    {
        this.logger = logger;
    }

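    public Task Handle(string key, byte[] value)
    {
        logger.LogInformation("Received message from Kafka");

        // No asynchronous work happens here, so return a completed task instead of marking the method async.
        return Task.CompletedTask;
    }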
}
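With the default types the value arrives as raw byte[], so a handler usually has to decode it before doing anything useful. The sketch below assumes the payload is UTF-8 text. TextMessageHandler and its Received property are hypothetical names, and the IMessageHandler interface is redeclared here only so the snippet compiles on its own; its shape is copied from the handler shown above and should be dropped when referencing the library.

```csharp
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;

// Stand-in mirroring the interface shape used in this README.
// Remove this declaration when the real library is referenced.
public interface IMessageHandler<TKey, TValue>
{
    Task Handle(TKey key, TValue value);
}

// Hypothetical handler: decodes each payload as UTF-8 and keeps it in memory.
public class TextMessageHandler : IMessageHandler<string, byte[]>
{
    // A concurrent collection, because the handler is registered as a singleton
    // and this sketch does not assume calls to Handle are serialized.
    public ConcurrentQueue<string> Received { get; } = new ConcurrentQueue<string>();

    public Task Handle(string key, byte[] value)
    {
        // Treat a missing payload as empty text instead of throwing.
        string text = value == null ? string.Empty : Encoding.UTF8.GetString(value);
        Received.Enqueue($"{key}: {text}");
        return Task.CompletedTask;
    }
}
```

It would be registered the same way as JobMessageHandler, using typeof(TextMessageHandler) in the ServiceDescriptor.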

Development

Development Kafka Instance

Start a standalone installation of Kafka:

docker run -d -e ADVERTISED_HOSTNAME=$(hostname -f) -p 9092:9092 -p 2181:2181 jrottenberg/kafka-standalone

Exec into the container, start a shell, and run the console producer:

$ docker exec -it <containerid> bash
/kafka# cd /usr/bin
/usr/bin# ./kafka-console-producer --broker-list broker:9092 --topic topic1

[enter multiple messages]

Run one of the samples and check that it receives the messages.