diff --git a/.gitignore b/.gitignore index dfcfd56..6f262e3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ ## ## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore +.vscode/ + # User-specific files *.rsuser *.suo diff --git a/src/KafkaConsumer.Tests/DataGenerator.cs b/src/KafkaConsumer.Tests/DataGenerator.cs new file mode 100644 index 0000000..6b83d5c --- /dev/null +++ b/src/KafkaConsumer.Tests/DataGenerator.cs @@ -0,0 +1,52 @@ +using System.Collections.Generic; +using System.Linq; +using Confluent.Kafka; + +namespace KafkaConsumer.Tests +{ + public static class DataGenerator + { + public static IEnumerable GenerateTopicPartitions(int count) => + Enumerable + .Range(0, count) + .Select(i => new TopicPartition("test-topic", i)); + + public static TopicPartition TopicPartition => + new TopicPartition("test-topic", 1); + + public static TopicPartitionOffset TopicPartitionOffset => + new TopicPartitionOffset(TopicPartition, 1); + + public static MessageHandler.Message GenerateMessage( + IConsumer consumer) => + new MessageHandler.Message(consumer, ConsumeResult); + + public static IEnumerable> GenerateMessages( + IConsumer consumer, + int count) => + GenerateConsumeResults(count) + .Select(cr => new MessageHandler.Message(consumer, cr)); + + public static ConsumeResult ConsumeResult => + new ConsumeResult + { + Message = new Message + { + Key = "key", + Value = "value" + }, + TopicPartitionOffset = TopicPartitionOffset + }; + + public static IEnumerable> GenerateConsumeResults(int count) => + Enumerable.Range(0, count).Select(i => new ConsumeResult + { + Message = new Message + { + Key = $"key_{i}", + Value = $"value_{i}" + }, + TopicPartitionOffset = new TopicPartitionOffset("test-topic", 0, i) + }); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs new file mode 100644 index 0000000..07527b3 --- /dev/null +++ b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs @@ -0,0 +1,20 @@ +using FluentAssertions; +using Moq; + +namespace KafkaConsumer.Tests.Extensions +{ + public static class ObjectExtensions + { + public static bool IsEquivalentTo(this T actual, T expected) + { + actual.Should().BeEquivalentTo(expected); + + return true; + } + + public static T IsActual(this T expected) + { + return It.Is(actual => actual.IsEquivalentTo(expected)); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj new file mode 100644 index 0000000..b8f5f80 --- /dev/null +++ b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj @@ -0,0 +1,22 @@ + + + + netcoreapp3.1 + + false + + + + + + + + + + + + + + + + diff --git a/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs b/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs new file mode 100644 index 0000000..38e9913 --- /dev/null +++ b/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs @@ -0,0 +1,54 @@ +using System.Collections.Generic; +using System.Linq; +using FluentAssertions; +using KafkaConsumer.MessageHandler; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests +{ + public class MessageHandlerVerifier + { + private readonly List> _messages; + + private int _index; + + public MessageHandlerVerifier(List> messages) + { + _messages = messages; + } + + public static void Verify( + Mock> messageHandler, + List> messages) + { + if(!messages.Any()) + { + messageHandler.Verify( + mh => mh.HandleAsync(It.IsAny>()), + Times.Never()); + + return; + } + + var collectionVerifier = new MessageHandlerVerifier(messages); + + messageHandler.Verify(mh => mh.HandleAsync( + It.Is>(msg => collectionVerifier.Validate(msg)))); + + messageHandler.VerifyNoOtherCalls(); + + Assert.Equal(messages.Count, collectionVerifier._index); + } + + + private bool Validate(Message message) + { + _messages[_index].Should().BeEquivalentTo(message); + + _index++; + + return true; + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs new file mode 100644 index 0000000..0a65e5b --- /dev/null +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs @@ -0,0 +1,158 @@ +using Confluent.Kafka; +using KafkaConsumer.MessageHandler; +using KafkaConsumer.Processor; +using KafkaConsumer.TopicPartitionQueue; +using Moq; +using System; +using Xunit; + +namespace KafkaConsumer.Tests.Processor +{ + public class KafkaProcessorBuilderShould + { + private readonly Mock> _topicPartitionQueueSelector; + private readonly Mock> _messageHandler; + private readonly KafkaProcessorBuilder _kafkaProcessorBuilder; + private readonly Mock> _keyDeserializer; + private readonly Mock> _valueDeserializer; + + public KafkaProcessorBuilderShould() + { + _keyDeserializer = new Mock>(); + _valueDeserializer = new Mock>(); + _messageHandler = new Mock>(); + _topicPartitionQueueSelector = new Mock>(); + + _kafkaProcessorBuilder = new KafkaProcessorBuilder(_topicPartitionQueueSelector.Object); + } + + [Fact] + public void ThrowIfConsumerConfigNotSet() + { + // arrange + var topic = "topic"; + + _kafkaProcessorBuilder + .FromTopic(topic) + .WithHandlerFactory(_ => _messageHandler.Object); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.Build()); + + // assert + Assert.Equal("'consumerConfig' must be set!", exception.Message); + } + + [Fact] + public void ThrowIfTopicNotSet() + { + // arrange + var consumerConfig = new ConsumerConfig(); + + _kafkaProcessorBuilder + .WithConfig(consumerConfig) + .WithHandlerFactory(_ => _messageHandler.Object); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.Build()); + + // assert + Assert.Equal("'topic' must be set!", exception.Message); + } + + [Fact] + public void ThrowIfHandlerFactoryNotSet() + { + // arrange + var consumerConfig = new ConsumerConfig(); + var topic = "topic"; + + _kafkaProcessorBuilder + .WithConfig(consumerConfig) + .FromTopic(topic); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.Build()); + + // assert + Assert.Equal("'handlerFactory' must be set!", exception.Message); + } + + [Fact] + public void ThrowIfConsumerConfigAlreadySet() + { + // arrange + var consumerConfig = new ConsumerConfig(); + + _kafkaProcessorBuilder.WithConfig(consumerConfig); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithConfig(consumerConfig)); + + // assert + Assert.Equal("'consumerConfig' was already set!", exception.Message); + } + + [Fact] + public void ThrowIfTopicAlreadySet() + { + // arrange + var topic = "topic"; + + _kafkaProcessorBuilder.FromTopic(topic); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.FromTopic(topic)); + + // assert + Assert.Equal("'topic' was already set!", exception.Message); + } + + [Fact] + public void ThrowIfHandlerFactoryAlreadySet() + { + // arrange + _kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object)); + + // assert + Assert.Equal("'handlerFactory' was already set!", exception.Message); + } + + [Fact] + public void ThrowIfKeyDeserializerAlreadySet() + { + // arrange + _kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object)); + + // assert + Assert.Equal("'keyDeserializer' was already set!", exception.Message); + } + + [Fact] + public void ThrowIfValueDeserializerAlreadySet() + { + // arrange + _kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object)); + + // assert + Assert.Equal("'valueDeserializer' was already set!", exception.Message); + } + } +} diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs new file mode 100644 index 0000000..657ed9a --- /dev/null +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -0,0 +1,135 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using KafkaConsumer.Processor; +using KafkaConsumer.Processor.Config; +using KafkaConsumer.Tests.Extensions; +using KafkaConsumer.TopicPartitionQueue; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests.Processor +{ + public class KafkaProcessorShould + { + private readonly Mock> _consumer; + private readonly Mock> _topicPartitionQueue; + private readonly Mock> _topicPartitionQueueSelector; + private readonly KafkaProcessor _kafkaProcessor; + private readonly ProcessorConfig _config; + + public KafkaProcessorShould() + { + _topicPartitionQueue = new Mock>(); + _consumer = new Mock>(); + _topicPartitionQueueSelector = new Mock>(); + _config = new ProcessorConfig + { + Topic = "test-topic" + }; + + _kafkaProcessor = new KafkaProcessor( + _consumer.Object, + _topicPartitionQueueSelector.Object, + Options.Create(_config)); + } + + [Fact] + public async Task EnqueueConsumedResultToTopicPartitionQueue() + { + // arrange + var cr = DataGenerator.ConsumeResult; + var message = DataGenerator.GenerateMessage(_consumer.Object); + var cts = new CancellationTokenSource(); + + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Callback(_ => cts.Cancel()) + .Returns(cr); + + _topicPartitionQueueSelector + .Setup(t => t.Select(cr.TopicPartition.IsActual())) + .Returns(_topicPartitionQueue.Object); + + // act + await _kafkaProcessor.ProcessMessagesAsync(cts.Token); + + // assert + _topicPartitionQueue.Verify( + tpq => tpq.EnqueueAsync(message.IsActual()), + Times.Once()); + } + + [Fact] + public async Task CloseConsumerAfterStoppingToProcessMessages() + { + // arrange + var cr = DataGenerator.ConsumeResult; + var message = DataGenerator.GenerateMessage(_consumer.Object); + var cts = new CancellationTokenSource(); + + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Callback(_ => cts.Cancel()) + .Returns(cr); + + _topicPartitionQueueSelector + .Setup(t => t.Select(cr.TopicPartition.IsActual())) + .Returns(_topicPartitionQueue.Object); + + // act + await _kafkaProcessor.ProcessMessagesAsync(cts.Token); + + // assert + _consumer.Verify( + c => c.Close(), + Times.Once()); + } + + [Fact] + public async Task CloseConsumerIfExceptionWasThrown() + { + // arrange + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Throws(); + + // act + await Assert.ThrowsAsync( + () => _kafkaProcessor.ProcessMessagesAsync(CancellationToken.None)); + + // assert + _consumer.Verify( + c => c.Close(), + Times.Once()); + } + + [Fact] + public async Task SubscribeToTopic() + { + // arrange + var cr = DataGenerator.ConsumeResult; + var message = DataGenerator.GenerateMessage(_consumer.Object); + var cts = new CancellationTokenSource(); + + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Callback(_ => cts.Cancel()) + .Returns(cr); + + _topicPartitionQueueSelector + .Setup(t => t.Select(cr.TopicPartition.IsActual())) + .Returns(_topicPartitionQueue.Object); + + // act + await _kafkaProcessor.ProcessMessagesAsync(cts.Token); + + // assert + _consumer.Verify( + c => c.Subscribe(_config.Topic), + Times.Once()); + } + } +} diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs new file mode 100644 index 0000000..8f97f5a --- /dev/null +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs @@ -0,0 +1,139 @@ +using KafkaConsumer.Exceptions; +using KafkaConsumer.MessageHandler; +using KafkaConsumer.Tests.Extensions; +using KafkaConsumer.TopicPartitionQueue; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests.TopicPartitionQueue +{ + public class TopicPartitionQueueSelectorShould + { + private readonly Mock> _topicPartitionQueueFactory; + private readonly Mock> _topicPartitionQueue; + private readonly Mock> _messageHandler; + private readonly TopicPartitionQueueSelector _topicPartitionQueueSelector; + + public TopicPartitionQueueSelectorShould() + { + _messageHandler = new Mock>(); + _topicPartitionQueue = new Mock>(); + _topicPartitionQueueFactory = new Mock>(); + + _topicPartitionQueueSelector = new TopicPartitionQueueSelector( + _topicPartitionQueueFactory.Object, + 1000); + } + + [Fact] + public void SelectTopicPartitionQueue() + { + // arrange + var tp = DataGenerator.TopicPartition; + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) + .Returns(_topicPartitionQueue.Object); + + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); + + // act + var topicParitionQueue = _topicPartitionQueueSelector.Select(tp); + + // assert + topicParitionQueue.IsEquivalentTo(_topicPartitionQueue.Object); + } + + [Fact] + public void AddQueues() + { + // arrange + var tps = DataGenerator.GenerateTopicPartitions(10); + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) + .Returns(_topicPartitionQueue.Object); + + // act + foreach (var tp in tps) + { + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); + } + + // assert + foreach (var tp in tps) + { + var tpq = _topicPartitionQueueSelector.Select(tp); + + tpq.IsEquivalentTo(_topicPartitionQueue.Object); + } + } + + [Fact] + public void RemoveQueues() + { + // arrange + var tps = DataGenerator.GenerateTopicPartitions(10); + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) + .Returns(_topicPartitionQueue.Object); + + foreach (var tp in tps) + { + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); + } + + // act + _topicPartitionQueueSelector.Remove(tps); + + // assert + foreach (var tp in tps) + { + Assert.Throws( + () => _topicPartitionQueueSelector.Select(tp)); + } + } + + [Fact] + public void AbortQueuesOnRemove() + { + // arrange + var tp = DataGenerator.TopicPartition; + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) + .Returns(_topicPartitionQueue.Object); + + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); + + // act + _topicPartitionQueueSelector.Remove(new []{ tp }); + + // assert + _topicPartitionQueue.Verify(tpq => tpq.AbortAsync(), Times.Once()); + } + + [Fact] + public void ThrowIfTopicPartitionNotFound() + { + // arrange + var tp = DataGenerator.TopicPartition; + + // act + var exception = Assert.Throws( + () => _topicPartitionQueueSelector.Select(tp)); + + // assert + exception.TopicPartition.IsEquivalentTo(tp); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs new file mode 100644 index 0000000..a3bf724 --- /dev/null +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs @@ -0,0 +1,104 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using KafkaConsumer.MessageHandler; +using KafkaConsumer.Tests.Extensions; +using KafkaConsumer.TopicPartitionQueue; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests.TopicPartitionQueue +{ + public class TopicPartitionQueueShould + { + private readonly Mock> _messageHandler; + private readonly TopicPartitionQueue _topicPartitionQueue; + private readonly Mock> _consumer; + + public TopicPartitionQueueShould() + { + _consumer = new Mock>(); + _messageHandler = new Mock>(); + + _topicPartitionQueue = new TopicPartitionQueue(_messageHandler.Object, 1000); + } + + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(500)] + [InlineData(1001)] + public async Task EnqueueAndInvokeMessageHandler(int count) + { + // arrange + var messages = DataGenerator.GenerateMessages(_consumer.Object, count).ToList(); + + _messageHandler + .Setup(m => m.HandleAsync(It.IsAny>())) + .Returns(Task.Delay(10)); + + // act + foreach (var message in messages) + { + var isEnqueued = await _topicPartitionQueue.TryEnqueueAsync(message); + + Assert.True(isEnqueued); + } + + await _topicPartitionQueue.CompleteAsync(); + + // assert + MessageHandlerVerifier.Verify(_messageHandler, messages); + } + + [Fact] + public async Task PropagateErrorsFromMessageHandlerOnComplete() + { + // arrange + var message = DataGenerator.GenerateMessage(_consumer.Object); + + _messageHandler + .Setup(m => m.HandleAsync(message.IsActual())) + .Throws(); + + // act + await EnqueueWhileSuccessful(message); + + // assert + await Assert.ThrowsAsync( + () => _topicPartitionQueue.CompleteAsync()); + + _messageHandler.Verify( + m => m.HandleAsync(message.IsActual()), + Times.Once()); + } + + [Fact] + public async Task PropagateErrorsFromMessageHandlerOnAbort() + { + // arrange + var message = DataGenerator.GenerateMessage(_consumer.Object); + + _messageHandler + .Setup(m => m.HandleAsync(message.IsActual())) + .Throws(); + + // act + await EnqueueWhileSuccessful(message); + + // assert + await Assert.ThrowsAsync( + () => _topicPartitionQueue.AbortAsync()); + + _messageHandler.Verify( + m => m.HandleAsync(message.IsActual()), + Times.Once()); + } + + private async Task EnqueueWhileSuccessful(MessageHandler.Message message) + { + while (await _topicPartitionQueue.TryEnqueueAsync(message)) { } + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.sln b/src/KafkaConsumer.sln new file mode 100644 index 0000000..7dd618e --- /dev/null +++ b/src/KafkaConsumer.sln @@ -0,0 +1,48 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaConsumer", "KafkaConsumer\KafkaConsumer.csproj", "{D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaConsumer.Tests", "KafkaConsumer.Tests\KafkaConsumer.Tests.csproj", "{72258648-1CD4-4708-9A4A-BBB73FF49AB9}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x64.ActiveCfg = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x64.Build.0 = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x86.ActiveCfg = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x86.Build.0 = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|Any CPU.Build.0 = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x64.ActiveCfg = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x64.Build.0 = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x86.ActiveCfg = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x86.Build.0 = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x64.ActiveCfg = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x64.Build.0 = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x86.ActiveCfg = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x86.Build.0 = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|Any CPU.Build.0 = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x64.ActiveCfg = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x64.Build.0 = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x86.ActiveCfg = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/src/KafkaConsumer/Exceptions/TopicPartitionQueueException.cs b/src/KafkaConsumer/Exceptions/TopicPartitionQueueException.cs new file mode 100644 index 0000000..c96360b --- /dev/null +++ b/src/KafkaConsumer/Exceptions/TopicPartitionQueueException.cs @@ -0,0 +1,31 @@ +using Confluent.Kafka; + +namespace KafkaConsumer.Exceptions +{ + public class TopicPartitionQueueException : System.Exception + { + public TopicPartition TopicPartition { get; set; } + + public TopicPartitionQueueException( + TopicPartition topicPartition) + { + TopicPartition = topicPartition; + } + + public TopicPartitionQueueException( + TopicPartition topicPartition, + string message) + : base(message) + { + TopicPartition = topicPartition; + } + public TopicPartitionQueueException( + TopicPartition topicPartition, + string message, + System.Exception inner) + : base(message, inner) + { + TopicPartition = topicPartition; + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Extensions/DataflowExtensions.cs b/src/KafkaConsumer/Extensions/DataflowExtensions.cs new file mode 100644 index 0000000..b814730 --- /dev/null +++ b/src/KafkaConsumer/Extensions/DataflowExtensions.cs @@ -0,0 +1,18 @@ +using System.Threading.Tasks.Dataflow; + +namespace KafkaConsumer.Extensions +{ + public static class DataflowExtensions + { + public static void PropagateErrorsTo(this IDataflowBlock from, IDataflowBlock to) + { + from.Completion.ContinueWith(task => + { + if (task.IsFaulted) + { + to.Fault(task.Exception.InnerException); + } + }); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/KafkaConsumer.csproj b/src/KafkaConsumer/KafkaConsumer.csproj new file mode 100644 index 0000000..3f8142a --- /dev/null +++ b/src/KafkaConsumer/KafkaConsumer.csproj @@ -0,0 +1,13 @@ + + + + netstandard2.1 + + + + + + + + + diff --git a/src/KafkaConsumer/MessageHandler/IMessageHandler.cs b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs new file mode 100644 index 0000000..c74360a --- /dev/null +++ b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace KafkaConsumer.MessageHandler +{ + public interface IMessageHandler + { + Task HandleAsync(Message message); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/MessageHandler/Message.cs b/src/KafkaConsumer/MessageHandler/Message.cs new file mode 100644 index 0000000..d63d6bb --- /dev/null +++ b/src/KafkaConsumer/MessageHandler/Message.cs @@ -0,0 +1,28 @@ +using Confluent.Kafka; + +namespace KafkaConsumer.MessageHandler +{ + public class Message + { + private readonly IConsumer _consumer; + + public ConsumeResult ConsumeResult { get; set; } + + public Message(IConsumer consumer, ConsumeResult consumeResult) + { + _consumer = consumer; + + ConsumeResult = consumeResult; + } + + public void StoreOffset() + { + _consumer.StoreOffset(ConsumeResult); + } + + public void Commit() + { + _consumer.Commit(ConsumeResult); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs b/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs new file mode 100644 index 0000000..22695ab --- /dev/null +++ b/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs @@ -0,0 +1,7 @@ +namespace KafkaConsumer.Processor.Config +{ + public class ProcessorConfig + { + public string Topic { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/IKafkaProcessor.cs b/src/KafkaConsumer/Processor/IKafkaProcessor.cs new file mode 100644 index 0000000..cb0e4ac --- /dev/null +++ b/src/KafkaConsumer/Processor/IKafkaProcessor.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace KafkaConsumer.Processor +{ + public interface IKafkaProcessor : IDisposable + { + Task ProcessMessagesAsync(CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs new file mode 100644 index 0000000..66050f4 --- /dev/null +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -0,0 +1,78 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using KafkaConsumer.Processor.Config; +using KafkaConsumer.TopicPartitionQueue; +using Microsoft.Extensions.Options; + +namespace KafkaConsumer.Processor +{ + public class KafkaProcessor : IKafkaProcessor + { + private readonly IConsumer _consumer; + private readonly ITopicPartitionQueueSelector _topicPartitionQueueSelector; + private readonly ProcessorConfig _config; + + public KafkaProcessor( + IConsumer consumer, + ITopicPartitionQueueSelector topicPartitionQueueSelector, + IOptions config) + { + _consumer = consumer; + _topicPartitionQueueSelector = topicPartitionQueueSelector; + _config = config.Value; + } + + public async Task ProcessMessagesAsync(CancellationToken ct) + { + _consumer.Subscribe(_config.Topic); + + try + { + while (!ct.IsCancellationRequested) + { + var consumeResult = Consume(ct); + + if (consumeResult != null) + { + var queue = _topicPartitionQueueSelector.Select(consumeResult.TopicPartition); + + var message = new MessageHandler.Message(_consumer, consumeResult); + + await queue.EnqueueAsync(message); + } + } + } + catch (OperationCanceledException) { } + finally + { + //TODO: Close() blocks indefinitely if an exception is thrown in PartitionsRevoked/Assigned handlers. + // https://github.com/confluentinc/confluent-kafka-dotnet/issues/1280 + + _consumer.Close(); + } + } + + private ConsumeResult Consume(CancellationToken ct) + { + ConsumeResult cr = null; + + try + { + cr = _consumer.Consume(ct); + } + catch (ConsumeException) + { + //TODO: log and continue - ConsumeExceptions are not fatal + } + + return cr; + } + + public void Dispose() + { + _consumer.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs new file mode 100644 index 0000000..3ef30de --- /dev/null +++ b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs @@ -0,0 +1,184 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Confluent.Kafka; +using KafkaConsumer.MessageHandler; +using KafkaConsumer.Processor.Config; +using KafkaConsumer.TopicPartitionQueue; +using Microsoft.Extensions.Options; + +namespace KafkaConsumer.Processor +{ + public class KafkaProcessorBuilder + { + private readonly ITopicPartitionQueueSelector _topicPartitionQueueSelector; + + private Func> _handlerFactory; + private ConsumerConfig _consumerConfig; + private string _topic; + private IDeserializer _keyDeserializer; + private IDeserializer _valueDeserializer; + + public KafkaProcessorBuilder(ITopicPartitionQueueSelector topicPartitionQueueSelector) + { + _topicPartitionQueueSelector = topicPartitionQueueSelector; + } + + public static KafkaProcessorBuilder CreateDefault() + { + var queueFactory = new TopicPartitionQueueFactory(); + + var topicPartitionQueueSelector = new TopicPartitionQueueSelector( + queueFactory, + 1000); + + return new KafkaProcessorBuilder(topicPartitionQueueSelector); + } + + public KafkaProcessorBuilder WithConfig(ConsumerConfig consumerConfig) + { + if (_consumerConfig != null) + throw new InvalidOperationException("'consumerConfig' was already set!"); + + _consumerConfig = consumerConfig; + + return this; + } + + public KafkaProcessorBuilder FromTopic(string topic) + { + if (_topic != null) + throw new InvalidOperationException("'topic' was already set!"); + + _topic = topic; + + return this; + } + + public KafkaProcessorBuilder WithHandlerFactory( + Func> handlerFactory) + { + if (_handlerFactory != null) + throw new InvalidOperationException("'handlerFactory' was already set!"); + + _handlerFactory = handlerFactory; + + return this; + } + + public KafkaProcessorBuilder WithKeyDeserializer(IDeserializer keyDeserializer) + { + if (_keyDeserializer != null) + throw new InvalidOperationException("'keyDeserializer' was already set!"); + + _keyDeserializer = keyDeserializer; + + return this; + } + + public KafkaProcessorBuilder WithValueDeserializer(IDeserializer valueDeserializer) + { + if (_valueDeserializer != null) + throw new InvalidOperationException("'valueDeserializer' was already set!"); + + _valueDeserializer = valueDeserializer; + + return this; + } + + public IKafkaProcessor Build() + { + CheckIfConfigured(); + + var consumer = BuildConsumer(); + + //TODO: expose WithProcessorConfig() method + var processorConfig = Options.Create(new ProcessorConfig + { + Topic = _topic + }); + + return new KafkaProcessor( + consumer, + _topicPartitionQueueSelector, + processorConfig); + } + + private IConsumer BuildConsumer() + { + var builder = new ConsumerBuilder(_consumerConfig) + .SetPartitionsAssignedHandler(OnPartitionsAssigned) + .SetPartitionsRevokedHandler(OnPartitionsRevoked); + + if (_keyDeserializer != null) + { + builder.SetKeyDeserializer(_keyDeserializer); + } + + if (_valueDeserializer != null) + { + builder.SetValueDeserializer(_valueDeserializer); + } + + return builder.Build(); + } + + private IEnumerable OnPartitionsRevoked( + IConsumer consumer, + List partitions) + { + _topicPartitionQueueSelector.Remove(partitions.Select(p => p.TopicPartition)); + + try + { + consumer.Commit(); + } + catch (KafkaException ex) + when (ex.Error.Code == ErrorCode.Local_NoOffset) + { + // ignore + } + + return partitions; + } + + private IEnumerable OnPartitionsAssigned( + IConsumer consumer, + List partitions) + { + foreach (var partition in partitions) + { + var messageHandler = _handlerFactory.Invoke(partition); + + _topicPartitionQueueSelector.AddQueue(partition, messageHandler); + } + + return partitions.Select(p => new TopicPartitionOffset(p, Offset.Stored)); + } + + private void CheckIfConfigured() + { + CheckIfConsumerConfigSet(); + CheckIfHandlerFactorySet(); + CheckIfTopicSet(); + } + + private void CheckIfConsumerConfigSet() + { + if (_consumerConfig == null) + throw new InvalidOperationException("'consumerConfig' must be set!"); + } + + private void CheckIfHandlerFactorySet() + { + if (_handlerFactory == null) + throw new InvalidOperationException("'handlerFactory' must be set!"); + } + + private void CheckIfTopicSet() + { + if (string.IsNullOrEmpty(_topic)) + throw new InvalidOperationException("'topic' must be set!"); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs new file mode 100644 index 0000000..d435354 --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs @@ -0,0 +1,13 @@ +using KafkaConsumer.MessageHandler; +using System.Threading.Tasks; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public interface ITopicPartitionQueue + { + Task EnqueueAsync(Message message); + Task TryEnqueueAsync(Message message); + Task CompleteAsync(); + Task AbortAsync(); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs new file mode 100644 index 0000000..b7144b1 --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs @@ -0,0 +1,11 @@ +using KafkaConsumer.MessageHandler; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public interface ITopicPartitionQueueFactory + { + ITopicPartitionQueue Create( + IMessageHandler messageHandler, + int queueCapacity); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs new file mode 100644 index 0000000..98e9aa4 --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs @@ -0,0 +1,13 @@ +using System.Collections.Generic; +using Confluent.Kafka; +using KafkaConsumer.MessageHandler; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public interface ITopicPartitionQueueSelector + { + ITopicPartitionQueue Select(TopicPartition topicPartition); + void AddQueue(TopicPartition topicPartition, IMessageHandler messageHandler); + void Remove(IEnumerable topicPartitions); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs new file mode 100644 index 0000000..8a61461 --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -0,0 +1,67 @@ +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using KafkaConsumer.Extensions; +using KafkaConsumer.MessageHandler; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public class TopicPartitionQueue : ITopicPartitionQueue + { + private readonly IMessageHandler _messageHandler; + + private readonly BufferBlock> _bufferBlock; + private readonly ActionBlock> _actionBlock; + + public TopicPartitionQueue(IMessageHandler messageHandler, int queueCapacity) + { + _messageHandler = messageHandler; + + _bufferBlock = new BufferBlock>( + new DataflowBlockOptions + { + BoundedCapacity = queueCapacity + }); + + _actionBlock = new ActionBlock>( + _messageHandler.HandleAsync, + new ExecutionDataflowBlockOptions + { + BoundedCapacity = 1 + }); + + _bufferBlock.LinkTo(_actionBlock, new DataflowLinkOptions + { + PropagateCompletion = true + }); + + _actionBlock.PropagateErrorsTo(_bufferBlock); + } + + public async Task CompleteAsync() + { + _bufferBlock.Complete(); + + await _actionBlock.Completion; + } + + public async Task AbortAsync() + { + _actionBlock.Complete(); + + await _actionBlock.Completion; + } + + public async Task TryEnqueueAsync(Message message) => + await _bufferBlock.SendAsync(message); + + public async Task EnqueueAsync(Message message) + { + var isEnqueued = await _bufferBlock.SendAsync(message); + + if (!isEnqueued) + { + await AbortAsync(); + } + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs new file mode 100644 index 0000000..60f1c2d --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs @@ -0,0 +1,14 @@ +using KafkaConsumer.MessageHandler; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public class TopicPartitionQueueFactory : ITopicPartitionQueueFactory + { + public ITopicPartitionQueue Create( + IMessageHandler messageHandler, + int queueCapacity) + { + return new TopicPartitionQueue(messageHandler, queueCapacity); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs new file mode 100644 index 0000000..13b2cf9 --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -0,0 +1,50 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using KafkaConsumer.Exceptions; +using KafkaConsumer.MessageHandler; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public class TopicPartitionQueueSelector : ITopicPartitionQueueSelector + { + private readonly Dictionary> _queues; + private readonly ITopicPartitionQueueFactory _topicPartitionQueueFactory; + private readonly int _queueCapacity; + + public TopicPartitionQueueSelector( + ITopicPartitionQueueFactory topicPartitionQueueFactory, + int queueCapacity) + { + _queues = new Dictionary>(); + + _queueCapacity = queueCapacity; + _topicPartitionQueueFactory = topicPartitionQueueFactory; + } + + public ITopicPartitionQueue Select(TopicPartition topicPartition) + { + return _queues.TryGetValue(topicPartition, out var topicPartitionQueue) + ? topicPartitionQueue + : throw new TopicPartitionQueueException( + topicPartition, + $"TopicPartitionQueue not found for {topicPartition}"); + } + + public void AddQueue(TopicPartition topicPartition, IMessageHandler messageHandler) + { + _queues.Add(topicPartition, _topicPartitionQueueFactory.Create(messageHandler, _queueCapacity)); + } + + public void Remove(IEnumerable topicPartitions) + { + Task.WaitAll(_queues.Select(q => q.Value.AbortAsync()).ToArray()); + + foreach (var tp in topicPartitions) + { + _queues.Remove(tp); + } + } + } +} \ No newline at end of file