From e2c8db0834b8409cc881d2cb6f126df2fa11ffb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sun, 6 Sep 2020 18:59:30 +0300 Subject: [PATCH 01/22] Created the solution --- src/KafkaConsumer.sln | 34 ++++++++++++++++++++++++++ src/KafkaConsumer/Class1.cs | 8 ++++++ src/KafkaConsumer/KafkaConsumer.csproj | 7 ++++++ 3 files changed, 49 insertions(+) create mode 100644 src/KafkaConsumer.sln create mode 100644 src/KafkaConsumer/Class1.cs create mode 100644 src/KafkaConsumer/KafkaConsumer.csproj diff --git a/src/KafkaConsumer.sln b/src/KafkaConsumer.sln new file mode 100644 index 0000000..038f49a --- /dev/null +++ b/src/KafkaConsumer.sln @@ -0,0 +1,34 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaConsumer", "KafkaConsumer\KafkaConsumer.csproj", "{D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x64.ActiveCfg = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x64.Build.0 = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x86.ActiveCfg = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Debug|x86.Build.0 = Debug|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|Any CPU.Build.0 = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x64.ActiveCfg = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x64.Build.0 = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x86.ActiveCfg = Release|Any CPU + {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/src/KafkaConsumer/Class1.cs b/src/KafkaConsumer/Class1.cs new file mode 100644 index 0000000..6c3a3af --- /dev/null +++ b/src/KafkaConsumer/Class1.cs @@ -0,0 +1,8 @@ +using System; + +namespace KafkaConsumer +{ + public class Class1 + { + } +} diff --git a/src/KafkaConsumer/KafkaConsumer.csproj b/src/KafkaConsumer/KafkaConsumer.csproj new file mode 100644 index 0000000..9f5c4f4 --- /dev/null +++ b/src/KafkaConsumer/KafkaConsumer.csproj @@ -0,0 +1,7 @@ + + + + netstandard2.0 + + + From 592e3bacc9392a0bd94256958efa874106aa101e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sun, 6 Sep 2020 20:22:32 +0300 Subject: [PATCH 02/22] KafkaConsumer backbone --- .gitignore | 2 + .../Extensions/ObjectExtensions.cs | 20 +++++ .../KafkaConsumer.Tests.csproj | 22 ++++++ .../Processor/KafkaProcessorShould.cs | 74 +++++++++++++++++++ src/KafkaConsumer.sln | 14 ++++ src/KafkaConsumer/Class1.cs | 8 -- src/KafkaConsumer/KafkaConsumer.csproj | 4 + .../MessageHandler/IMessageHandler.cs | 10 +++ .../Processor/IKafkaProcessor.cs | 10 +++ src/KafkaConsumer/Processor/KafkaProcessor.cs | 39 ++++++++++ .../ITopicPartitionQueue.cs | 10 +++ .../ITopicPartitionQueueSelector.cs | 9 +++ .../TopicPartitionQueue.cs | 24 ++++++ .../TopicPartitionQueueSelector.cs | 12 +++ 14 files changed, 250 insertions(+), 8 deletions(-) create mode 100644 src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs create mode 100644 src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj create mode 100644 src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs delete mode 100644 src/KafkaConsumer/Class1.cs create mode 100644 src/KafkaConsumer/MessageHandler/IMessageHandler.cs create mode 100644 src/KafkaConsumer/Processor/IKafkaProcessor.cs create mode 100644 src/KafkaConsumer/Processor/KafkaProcessor.cs create mode 100644 src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs create mode 100644 src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs create mode 100644 src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs create mode 100644 src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs diff --git a/.gitignore b/.gitignore index dfcfd56..6f262e3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ ## ## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore +.vscode/ + # User-specific files *.rsuser *.suo diff --git a/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs new file mode 100644 index 0000000..500b5cd --- /dev/null +++ b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs @@ -0,0 +1,20 @@ +using FluentAssertions; +using Moq; + +namespace KafkaConsumer.Tests.Extensions +{ + public static class ObjectExtensions + { + public static bool IsEquivalentTo(this object obj, object other) + { + obj.Should().BeEquivalentTo(other); + + return true; + } + + public static T IsExpected(this T obj) + { + return It.Is(other => obj.IsEquivalentTo(other)); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj new file mode 100644 index 0000000..b7f73c1 --- /dev/null +++ b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj @@ -0,0 +1,22 @@ + + + + netcoreapp3.1 + + false + + + + + + + + + + + + + + + + diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs new file mode 100644 index 0000000..7bd5057 --- /dev/null +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -0,0 +1,74 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using FluentAssertions; +using KafkaConsumer.Processor; +using KafkaConsumer.Tests.Extensions; +using KafkaConsumer.TopicPartitionQueue; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests.Processor +{ + public class KafkaProcessorShould + { + private readonly Mock> _consumer; + private readonly Mock> _topicPartitionQueue; + private readonly Mock> _topicPartitionQueueSelector; + private readonly KafkaProcessor _kafkaProcessor; + + public KafkaProcessorShould() + { + _topicPartitionQueue = new Mock>(); + _consumer = new Mock>(); + _topicPartitionQueueSelector = new Mock>(); + + _kafkaProcessor = new KafkaProcessor( + _consumer.Object, + _topicPartitionQueueSelector.Object); + } + + [Fact] + public async Task EnqueueConsumedResultToTopicPartitionQueue() + { + // arrange + var cr = ConsumeResult; + var cts = new CancellationTokenSource(); + + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Callback(_ => cts.Cancel()) + .Returns(cr); + + _topicPartitionQueueSelector + .Setup(t => t.Select(cr.TopicPartition.IsExpected())) + .Returns(_topicPartitionQueue.Object); + + // act + await _kafkaProcessor.ProcessMessagesAsync(cts.Token); + + // assert + _topicPartitionQueue.Verify( + tpq => tpq.EnqueueAsync(cr.IsExpected()), + Times.Once()); + } + + private static TopicPartition TopicPartition => + new TopicPartition("test-topic", 1); + + private static TopicPartitionOffset TopicPartitionOffset => + new TopicPartitionOffset(TopicPartition, 1); + + private static ConsumeResult ConsumeResult => + new ConsumeResult + { + Message = new Message + { + Key = "key", + Value = "value" + }, + TopicPartitionOffset = TopicPartitionOffset + }; + } +} diff --git a/src/KafkaConsumer.sln b/src/KafkaConsumer.sln index 038f49a..7dd618e 100644 --- a/src/KafkaConsumer.sln +++ b/src/KafkaConsumer.sln @@ -5,6 +5,8 @@ VisualStudioVersion = 15.0.26124.0 MinimumVisualStudioVersion = 15.0.26124.0 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaConsumer", "KafkaConsumer\KafkaConsumer.csproj", "{D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaConsumer.Tests", "KafkaConsumer.Tests\KafkaConsumer.Tests.csproj", "{72258648-1CD4-4708-9A4A-BBB73FF49AB9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -30,5 +32,17 @@ Global {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x64.Build.0 = Release|Any CPU {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x86.ActiveCfg = Release|Any CPU {D90F7F7D-ADF4-4BCD-8CA7-69BB6DFB050C}.Release|x86.Build.0 = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x64.ActiveCfg = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x64.Build.0 = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x86.ActiveCfg = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Debug|x86.Build.0 = Debug|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|Any CPU.Build.0 = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x64.ActiveCfg = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x64.Build.0 = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x86.ActiveCfg = Release|Any CPU + {72258648-1CD4-4708-9A4A-BBB73FF49AB9}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/src/KafkaConsumer/Class1.cs b/src/KafkaConsumer/Class1.cs deleted file mode 100644 index 6c3a3af..0000000 --- a/src/KafkaConsumer/Class1.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace KafkaConsumer -{ - public class Class1 - { - } -} diff --git a/src/KafkaConsumer/KafkaConsumer.csproj b/src/KafkaConsumer/KafkaConsumer.csproj index 9f5c4f4..751340c 100644 --- a/src/KafkaConsumer/KafkaConsumer.csproj +++ b/src/KafkaConsumer/KafkaConsumer.csproj @@ -4,4 +4,8 @@ netstandard2.0 + + + + diff --git a/src/KafkaConsumer/MessageHandler/IMessageHandler.cs b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs new file mode 100644 index 0000000..6848308 --- /dev/null +++ b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; +using Confluent.Kafka; + +namespace KafkaConsumer.MessageHandler +{ + public interface IMessageHandler + { + Task HandleAsync(ConsumeResult consumeResult); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/IKafkaProcessor.cs b/src/KafkaConsumer/Processor/IKafkaProcessor.cs new file mode 100644 index 0000000..903074e --- /dev/null +++ b/src/KafkaConsumer/Processor/IKafkaProcessor.cs @@ -0,0 +1,10 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace KafkaConsumer.Processor +{ + public interface IKafkaProcessor + { + Task ProcessMessagesAsync(CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs new file mode 100644 index 0000000..6731b41 --- /dev/null +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using KafkaConsumer.TopicPartitionQueue; + +namespace KafkaConsumer.Processor +{ + public class KafkaProcessor : IKafkaProcessor + { + private readonly IConsumer _consumer; + private readonly ITopicPartitionQueueSelector _topicPartitionQueueSelector; + + public KafkaProcessor( + IConsumer consumer, + ITopicPartitionQueueSelector topicPartitionQueueSelector) + { + _consumer = consumer; + _topicPartitionQueueSelector = topicPartitionQueueSelector; + } + + public async Task ProcessMessagesAsync(CancellationToken ct) + { + try + { + while (!ct.IsCancellationRequested) + { + var consumeResult = _consumer.Consume(ct); + + var queue = _topicPartitionQueueSelector.Select(consumeResult.TopicPartition); + + await queue.EnqueueAsync(consumeResult); + } + } + catch (OperationCanceledException) { } + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs new file mode 100644 index 0000000..56aa80e --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; +using Confluent.Kafka; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public interface ITopicPartitionQueue + { + Task EnqueueAsync(ConsumeResult consumeResult); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs new file mode 100644 index 0000000..8e3baa4 --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs @@ -0,0 +1,9 @@ +using Confluent.Kafka; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public interface ITopicPartitionQueueSelector + { + ITopicPartitionQueue Select(TopicPartition topicPartition); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs new file mode 100644 index 0000000..d50990e --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading.Tasks; +using Confluent.Kafka; +using KafkaConsumer.MessageHandler; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public class TopicPartitionQueue : ITopicPartitionQueue + { + private readonly IMessageHandler _messageHandler; + + public TopicPartitionQueue(IMessageHandler messageHandler) + { + _messageHandler = messageHandler; + } + + public Task EnqueueAsync(ConsumeResult consumeResult) + { + //TODO: use Dataflow blocks + + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs new file mode 100644 index 0000000..825c2fd --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -0,0 +1,12 @@ +using Confluent.Kafka; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public class TopicPartitionQueueSelector : ITopicPartitionQueueSelector + { + public ITopicPartitionQueue Select(TopicPartition topicPartition) + { + throw new System.NotImplementedException(); + } + } +} \ No newline at end of file From fa5fa13cb10d05d65ec28da73040c9beaa3f4276 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sun, 6 Sep 2020 20:24:47 +0300 Subject: [PATCH 03/22] Removed unused package --- src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj | 1 - 1 file changed, 1 deletion(-) diff --git a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj index b7f73c1..e60d38c 100644 --- a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj +++ b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj @@ -12,7 +12,6 @@ - From fa23bc659d2356be45a4e56e364ffb5dbd8a98f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sun, 6 Sep 2020 21:15:50 +0300 Subject: [PATCH 04/22] TopicPartition queue selection --- src/KafkaConsumer.Tests/DataGenerator.cs | 32 ++++++ .../Processor/KafkaProcessorShould.cs | 19 +--- .../TopicPartitionQueueSelectorShould.cs | 102 ++++++++++++++++++ .../TopicPartitionQueueException.cs | 31 ++++++ .../ITopicPartitionQueueFactory.cs | 7 ++ .../TopicPartitionQueueFactory.cs | 19 ++++ .../TopicPartitionQueueSelector.cs | 36 ++++++- 7 files changed, 227 insertions(+), 19 deletions(-) create mode 100644 src/KafkaConsumer.Tests/DataGenerator.cs create mode 100644 src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs create mode 100644 src/KafkaConsumer/Exceptions/TopicPartitionQueueException.cs create mode 100644 src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs create mode 100644 src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs diff --git a/src/KafkaConsumer.Tests/DataGenerator.cs b/src/KafkaConsumer.Tests/DataGenerator.cs new file mode 100644 index 0000000..509d339 --- /dev/null +++ b/src/KafkaConsumer.Tests/DataGenerator.cs @@ -0,0 +1,32 @@ +using System.Collections.Generic; +using System.Linq; +using Confluent.Kafka; + +namespace KafkaConsumer.Tests +{ + public static class DataGenerator + { + + public static IEnumerable GenerateTopicPartitions(int count) => + Enumerable + .Range(0, count) + .Select(i => new TopicPartition("test-topic", i)); + + public static TopicPartition TopicPartition => + new TopicPartition("test-topic", 1); + + public static TopicPartitionOffset TopicPartitionOffset => + new TopicPartitionOffset(TopicPartition, 1); + + public static ConsumeResult ConsumeResult => + new ConsumeResult + { + Message = new Message + { + Key = "key", + Value = "value" + }, + TopicPartitionOffset = TopicPartitionOffset + }; + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index 7bd5057..36c5a8a 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -33,7 +33,7 @@ public KafkaProcessorShould() public async Task EnqueueConsumedResultToTopicPartitionQueue() { // arrange - var cr = ConsumeResult; + var cr = DataGenerator.ConsumeResult; var cts = new CancellationTokenSource(); _consumer @@ -53,22 +53,5 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() tpq => tpq.EnqueueAsync(cr.IsExpected()), Times.Once()); } - - private static TopicPartition TopicPartition => - new TopicPartition("test-topic", 1); - - private static TopicPartitionOffset TopicPartitionOffset => - new TopicPartitionOffset(TopicPartition, 1); - - private static ConsumeResult ConsumeResult => - new ConsumeResult - { - Message = new Message - { - Key = "key", - Value = "value" - }, - TopicPartitionOffset = TopicPartitionOffset - }; } } diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs new file mode 100644 index 0000000..d902229 --- /dev/null +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs @@ -0,0 +1,102 @@ +using KafkaConsumer.Exceptions; +using KafkaConsumer.Tests.Extensions; +using KafkaConsumer.TopicPartitionQueue; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests.TopicPartitionQueue +{ + public class TopicPartitionQueueSelectorShould + { + private readonly Mock> _topicPartitionQueueFactory; + private readonly Mock> _topicPartitionQueue; + private readonly TopicPartitionQueueSelector _topicPartitionQueueSelector; + + public TopicPartitionQueueSelectorShould() + { + _topicPartitionQueue = new Mock>(); + _topicPartitionQueueFactory = new Mock>(); + + _topicPartitionQueueSelector = new TopicPartitionQueueSelector( + _topicPartitionQueueFactory.Object); + } + + [Fact] + public void SelectTopicPartitionQueue() + { + // arrange + var tp = DataGenerator.TopicPartition; + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create()) + .Returns(_topicPartitionQueue.Object); + + _topicPartitionQueueSelector.Fill(new[] { tp }); + + // act + var topicParitionQueue = _topicPartitionQueueSelector.Select(tp); + + // assert + topicParitionQueue.IsEquivalentTo(_topicPartitionQueue.Object); + } + + [Fact] + public void FillQueues() + { + // arrange + var tps = DataGenerator.GenerateTopicPartitions(10); + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create()) + .Returns(_topicPartitionQueue.Object); + + // act + _topicPartitionQueueSelector.Fill(tps); + + // assert + foreach (var tp in tps) + { + var tpq = _topicPartitionQueueSelector.Select(tp); + + tpq.IsEquivalentTo(_topicPartitionQueue.Object); + } + } + + [Fact] + public void RemoveQueues() + { + // arrange + var tps = DataGenerator.GenerateTopicPartitions(10); + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create()) + .Returns(_topicPartitionQueue.Object); + + _topicPartitionQueueSelector.Fill(tps); + + // act + _topicPartitionQueueSelector.Remove(tps); + + // assert + foreach (var tp in tps) + { + Assert.Throws( + () => _topicPartitionQueueSelector.Select(tp)); + } + } + + [Fact] + public void ThrowIfTopicPartitionNotFound() + { + // arrange + var tp = DataGenerator.TopicPartition; + + // act + var exception = Assert.Throws( + () => _topicPartitionQueueSelector.Select(tp)); + + // assert + exception.TopicPartition.IsEquivalentTo(tp); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Exceptions/TopicPartitionQueueException.cs b/src/KafkaConsumer/Exceptions/TopicPartitionQueueException.cs new file mode 100644 index 0000000..c96360b --- /dev/null +++ b/src/KafkaConsumer/Exceptions/TopicPartitionQueueException.cs @@ -0,0 +1,31 @@ +using Confluent.Kafka; + +namespace KafkaConsumer.Exceptions +{ + public class TopicPartitionQueueException : System.Exception + { + public TopicPartition TopicPartition { get; set; } + + public TopicPartitionQueueException( + TopicPartition topicPartition) + { + TopicPartition = topicPartition; + } + + public TopicPartitionQueueException( + TopicPartition topicPartition, + string message) + : base(message) + { + TopicPartition = topicPartition; + } + public TopicPartitionQueueException( + TopicPartition topicPartition, + string message, + System.Exception inner) + : base(message, inner) + { + TopicPartition = topicPartition; + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs new file mode 100644 index 0000000..98f193b --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs @@ -0,0 +1,7 @@ +namespace KafkaConsumer.TopicPartitionQueue +{ + public interface ITopicPartitionQueueFactory + { + ITopicPartitionQueue Create(); + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs new file mode 100644 index 0000000..a799dfc --- /dev/null +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs @@ -0,0 +1,19 @@ +using KafkaConsumer.MessageHandler; + +namespace KafkaConsumer.TopicPartitionQueue +{ + public class TopicPartitionQueueFactory : ITopicPartitionQueueFactory + { + private readonly IMessageHandler _messageHandler; + + public TopicPartitionQueueFactory(IMessageHandler messageHandler) + { + _messageHandler = messageHandler; + } + + public ITopicPartitionQueue Create() + { + return new TopicPartitionQueue(_messageHandler); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs index 825c2fd..be7be1a 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -1,12 +1,46 @@ +using System.Collections.Generic; using Confluent.Kafka; +using KafkaConsumer.Exceptions; namespace KafkaConsumer.TopicPartitionQueue { public class TopicPartitionQueueSelector : ITopicPartitionQueueSelector { + private readonly Dictionary> _queues; + private readonly ITopicPartitionQueueFactory _topicPartitionQueueFactory; + + public TopicPartitionQueueSelector( + ITopicPartitionQueueFactory topicPartitionQueueFactory) + { + _queues = new Dictionary>(); + _topicPartitionQueueFactory = topicPartitionQueueFactory; + } + public ITopicPartitionQueue Select(TopicPartition topicPartition) { - throw new System.NotImplementedException(); + return _queues.TryGetValue(topicPartition, out var topicPartitionQueue) + ? topicPartitionQueue + : throw new TopicPartitionQueueException( + topicPartition, + $"TopicPartitionQueue not found for {topicPartition}"); + } + + public void Fill(IEnumerable topicPartitions) + { + foreach (var tp in topicPartitions) + { + _queues.Add(tp, _topicPartitionQueueFactory.Create()); + } + } + + public void Remove(IEnumerable topicPartitions) + { + foreach (var tp in topicPartitions) + { + //TODO: finalize each TopicPartitionQueue? + + _queues.Remove(tp); + } } } } \ No newline at end of file From 92d2ac4d23ee0d7f79a7f215522f6cff1ddcf910 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sun, 6 Sep 2020 21:58:14 +0300 Subject: [PATCH 05/22] Builder for kafka processor --- .../KafkaConsumer.Tests.csproj | 1 + .../Processor/KafkaProcessorShould.cs | 10 ++- src/KafkaConsumer/KafkaConsumer.csproj | 1 + .../MessageHandler/IMessageHandler.cs | 2 +- src/KafkaConsumer/MessageHandler/Message.cs | 19 +++++ .../Processor/Config/ProcessorConfig.cs | 8 ++ .../Processor/IKafkaProcessor.cs | 3 +- src/KafkaConsumer/Processor/KafkaProcessor.cs | 44 ++++++++-- .../Processor/KafkaProcessorBuilder.cs | 83 +++++++++++++++++++ .../ITopicPartitionQueueSelector.cs | 3 + 10 files changed, 166 insertions(+), 8 deletions(-) create mode 100644 src/KafkaConsumer/MessageHandler/Message.cs create mode 100644 src/KafkaConsumer/Processor/Config/ProcessorConfig.cs create mode 100644 src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs diff --git a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj index e60d38c..470ded8 100644 --- a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj +++ b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index 36c5a8a..4b639ab 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -4,8 +4,10 @@ using Confluent.Kafka; using FluentAssertions; using KafkaConsumer.Processor; +using KafkaConsumer.Processor.Config; using KafkaConsumer.Tests.Extensions; using KafkaConsumer.TopicPartitionQueue; +using Microsoft.Extensions.Options; using Moq; using Xunit; @@ -17,16 +19,22 @@ public class KafkaProcessorShould private readonly Mock> _topicPartitionQueue; private readonly Mock> _topicPartitionQueueSelector; private readonly KafkaProcessor _kafkaProcessor; + private readonly ProcessorConfig _config; public KafkaProcessorShould() { _topicPartitionQueue = new Mock>(); _consumer = new Mock>(); _topicPartitionQueueSelector = new Mock>(); + _config = new ProcessorConfig + { + Topic = "test-topic" + }; _kafkaProcessor = new KafkaProcessor( _consumer.Object, - _topicPartitionQueueSelector.Object); + _topicPartitionQueueSelector.Object, + Options.Create(_config)); } [Fact] diff --git a/src/KafkaConsumer/KafkaConsumer.csproj b/src/KafkaConsumer/KafkaConsumer.csproj index 751340c..a33c1d7 100644 --- a/src/KafkaConsumer/KafkaConsumer.csproj +++ b/src/KafkaConsumer/KafkaConsumer.csproj @@ -6,6 +6,7 @@ + diff --git a/src/KafkaConsumer/MessageHandler/IMessageHandler.cs b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs index 6848308..0fb022e 100644 --- a/src/KafkaConsumer/MessageHandler/IMessageHandler.cs +++ b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs @@ -5,6 +5,6 @@ namespace KafkaConsumer.MessageHandler { public interface IMessageHandler { - Task HandleAsync(ConsumeResult consumeResult); + Task HandleAsync(Message message); } } \ No newline at end of file diff --git a/src/KafkaConsumer/MessageHandler/Message.cs b/src/KafkaConsumer/MessageHandler/Message.cs new file mode 100644 index 0000000..82e162b --- /dev/null +++ b/src/KafkaConsumer/MessageHandler/Message.cs @@ -0,0 +1,19 @@ +using Confluent.Kafka; + +namespace KafkaConsumer.MessageHandler +{ + public class Message + { + public ConsumeResult ConsumeResult { get; set; } + + public void StoreOffset() + { + //TODO: call on IConsumer + } + + public void Commit() + { + //TODO: call on IConsumer + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs b/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs new file mode 100644 index 0000000..00633ac --- /dev/null +++ b/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs @@ -0,0 +1,8 @@ +namespace KafkaConsumer.Processor.Config +{ + public class ProcessorConfig + { + public string Topic { get; set; } + + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/IKafkaProcessor.cs b/src/KafkaConsumer/Processor/IKafkaProcessor.cs index 903074e..cb0e4ac 100644 --- a/src/KafkaConsumer/Processor/IKafkaProcessor.cs +++ b/src/KafkaConsumer/Processor/IKafkaProcessor.cs @@ -1,9 +1,10 @@ +using System; using System.Threading; using System.Threading.Tasks; namespace KafkaConsumer.Processor { - public interface IKafkaProcessor + public interface IKafkaProcessor : IDisposable { Task ProcessMessagesAsync(CancellationToken cancellationToken); } diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs index 6731b41..2876281 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessor.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -1,9 +1,10 @@ using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; +using KafkaConsumer.Processor.Config; using KafkaConsumer.TopicPartitionQueue; +using Microsoft.Extensions.Options; namespace KafkaConsumer.Processor { @@ -11,29 +12,62 @@ public class KafkaProcessor : IKafkaProcessor { private readonly IConsumer _consumer; private readonly ITopicPartitionQueueSelector _topicPartitionQueueSelector; + private readonly ProcessorConfig _config; public KafkaProcessor( IConsumer consumer, - ITopicPartitionQueueSelector topicPartitionQueueSelector) + ITopicPartitionQueueSelector topicPartitionQueueSelector, + IOptions config) { _consumer = consumer; _topicPartitionQueueSelector = topicPartitionQueueSelector; + _config = config.Value; } public async Task ProcessMessagesAsync(CancellationToken ct) { + _consumer.Subscribe(_config.Topic); + try { while (!ct.IsCancellationRequested) { - var consumeResult = _consumer.Consume(ct); + var consumeResult = Consume(ct); - var queue = _topicPartitionQueueSelector.Select(consumeResult.TopicPartition); + if (consumeResult != null) + { + var queue = _topicPartitionQueueSelector.Select(consumeResult.TopicPartition); - await queue.EnqueueAsync(consumeResult); + await queue.EnqueueAsync(consumeResult); + } } } catch (OperationCanceledException) { } + finally + { + _consumer.Close(); + } + } + + private ConsumeResult Consume(CancellationToken ct) + { + ConsumeResult cr = null; + + try + { + cr = _consumer.Consume(ct); + } + catch (ConsumeException) + { + //TODO: log and continue - ConsumeExceptions are not fatal + } + + return cr; + } + + public void Dispose() + { + _consumer.Dispose(); } } } \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs new file mode 100644 index 0000000..69c60a6 --- /dev/null +++ b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Confluent.Kafka; +using KafkaConsumer.Processor.Config; +using KafkaConsumer.TopicPartitionQueue; +using Microsoft.Extensions.Options; + +namespace KafkaConsumer.Processor +{ + public class KafkaProcessorBuilder + { + private ITopicPartitionQueueSelector _topicPartitionQueueSelector; + private ConsumerConfig _config; + private string _topic; + + public KafkaProcessorBuilder(TopicPartitionQueueSelector topicPartitionQueueSelector) + { + _topicPartitionQueueSelector = topicPartitionQueueSelector; + } + + public KafkaProcessorBuilder WithConfig(ConsumerConfig config) + { + _config = config; + + return this; + } + + public KafkaProcessorBuilder FromTopic(string topic) + { + _topic = topic; + + return this; + } + public IKafkaProcessor Build() + { + var consumer = new ConsumerBuilder(_config) + .SetPartitionsAssignedHandler(OnPartitionsAssigned) + .SetPartitionsRevokedHandler(OnPartitionsRevoked) + .Build(); + + //TODO: Value/Key deserializers for consumer + + var config = Options.Create(new ProcessorConfig + { + Topic = _topic + }); + + return new KafkaProcessor( + consumer, + _topicPartitionQueueSelector, + config); + } + + private IEnumerable OnPartitionsRevoked( + IConsumer consumer, + List partitions) + { + _topicPartitionQueueSelector.Remove(partitions.Select(p => p.TopicPartition)); + + try + { + consumer.Commit(); + } + catch (KafkaException ex) + when (ex.Error.Code == ErrorCode.Local_NoOffset) + { + // ignore + } + + return partitions; + } + + private IEnumerable OnPartitionsAssigned( + IConsumer consumer, + List partitions) + { + _topicPartitionQueueSelector.Fill(partitions); + + return partitions.Select(p => new TopicPartitionOffset(p, Offset.Stored)); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs index 8e3baa4..f5feb4c 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs @@ -1,3 +1,4 @@ +using System.Collections.Generic; using Confluent.Kafka; namespace KafkaConsumer.TopicPartitionQueue @@ -5,5 +6,7 @@ namespace KafkaConsumer.TopicPartitionQueue public interface ITopicPartitionQueueSelector { ITopicPartitionQueue Select(TopicPartition topicPartition); + void Fill(IEnumerable topicPartitions); + void Remove(IEnumerable topicPartitions); } } \ No newline at end of file From 925879c0270799c51c772b014e11114e97cbdeff Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 7 Sep 2020 22:32:45 +0300 Subject: [PATCH 06/22] Fixed naming --- src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs | 8 ++++---- src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs index 500b5cd..15a65a1 100644 --- a/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs +++ b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs @@ -5,16 +5,16 @@ namespace KafkaConsumer.Tests.Extensions { public static class ObjectExtensions { - public static bool IsEquivalentTo(this object obj, object other) + public static bool IsEquivalentTo(this object actual, object expected) { - obj.Should().BeEquivalentTo(other); + actual.Should().BeEquivalentTo(expected); return true; } - public static T IsExpected(this T obj) + public static T IsActual(this T expected) { - return It.Is(other => obj.IsEquivalentTo(other)); + return It.Is(actual => actual.IsEquivalentTo(expected)); } } } \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index 4b639ab..9fc6bc2 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -50,7 +50,7 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() .Returns(cr); _topicPartitionQueueSelector - .Setup(t => t.Select(cr.TopicPartition.IsExpected())) + .Setup(t => t.Select(cr.TopicPartition.IsActual())) .Returns(_topicPartitionQueue.Object); // act @@ -58,7 +58,7 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() // assert _topicPartitionQueue.Verify( - tpq => tpq.EnqueueAsync(cr.IsExpected()), + tpq => tpq.EnqueueAsync(cr.IsActual()), Times.Once()); } } From 1481b813e670ba9b12f9b06acf5dfaa9de6f771f Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 7 Sep 2020 22:52:48 +0300 Subject: [PATCH 07/22] Use Message instead of ConsumeResult --- src/KafkaConsumer.Tests/DataGenerator.cs | 3 +++ .../Processor/KafkaProcessorShould.cs | 3 ++- src/KafkaConsumer/MessageHandler/IMessageHandler.cs | 1 - src/KafkaConsumer/MessageHandler/Message.cs | 13 +++++++++++-- src/KafkaConsumer/Processor/KafkaProcessor.cs | 4 +++- .../TopicPartitionQueue/ITopicPartitionQueue.cs | 4 ++-- .../TopicPartitionQueue/TopicPartitionQueue.cs | 3 +-- 7 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/KafkaConsumer.Tests/DataGenerator.cs b/src/KafkaConsumer.Tests/DataGenerator.cs index 509d339..2ddcfae 100644 --- a/src/KafkaConsumer.Tests/DataGenerator.cs +++ b/src/KafkaConsumer.Tests/DataGenerator.cs @@ -18,6 +18,9 @@ public static IEnumerable GenerateTopicPartitions(int count) => public static TopicPartitionOffset TopicPartitionOffset => new TopicPartitionOffset(TopicPartition, 1); + public static MessageHandler.Message GenerateMessage(IConsumer consumer) => + new MessageHandler.Message(consumer, ConsumeResult); + public static ConsumeResult ConsumeResult => new ConsumeResult { diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index 9fc6bc2..6060dda 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -42,6 +42,7 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() { // arrange var cr = DataGenerator.ConsumeResult; + var message = DataGenerator.GenerateMessage(_consumer.Object); var cts = new CancellationTokenSource(); _consumer @@ -58,7 +59,7 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() // assert _topicPartitionQueue.Verify( - tpq => tpq.EnqueueAsync(cr.IsActual()), + tpq => tpq.EnqueueAsync(message.IsActual()), Times.Once()); } } diff --git a/src/KafkaConsumer/MessageHandler/IMessageHandler.cs b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs index 0fb022e..c74360a 100644 --- a/src/KafkaConsumer/MessageHandler/IMessageHandler.cs +++ b/src/KafkaConsumer/MessageHandler/IMessageHandler.cs @@ -1,5 +1,4 @@ using System.Threading.Tasks; -using Confluent.Kafka; namespace KafkaConsumer.MessageHandler { diff --git a/src/KafkaConsumer/MessageHandler/Message.cs b/src/KafkaConsumer/MessageHandler/Message.cs index 82e162b..d63d6bb 100644 --- a/src/KafkaConsumer/MessageHandler/Message.cs +++ b/src/KafkaConsumer/MessageHandler/Message.cs @@ -4,16 +4,25 @@ namespace KafkaConsumer.MessageHandler { public class Message { + private readonly IConsumer _consumer; + public ConsumeResult ConsumeResult { get; set; } + public Message(IConsumer consumer, ConsumeResult consumeResult) + { + _consumer = consumer; + + ConsumeResult = consumeResult; + } + public void StoreOffset() { - //TODO: call on IConsumer + _consumer.StoreOffset(ConsumeResult); } public void Commit() { - //TODO: call on IConsumer + _consumer.Commit(ConsumeResult); } } } \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs index 2876281..6d3cc10 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessor.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -38,7 +38,9 @@ public async Task ProcessMessagesAsync(CancellationToken ct) { var queue = _topicPartitionQueueSelector.Select(consumeResult.TopicPartition); - await queue.EnqueueAsync(consumeResult); + var message = new MessageHandler.Message(_consumer, consumeResult); + + await queue.EnqueueAsync(message); } } } diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs index 56aa80e..519b764 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs @@ -1,10 +1,10 @@ +using KafkaConsumer.MessageHandler; using System.Threading.Tasks; -using Confluent.Kafka; namespace KafkaConsumer.TopicPartitionQueue { public interface ITopicPartitionQueue { - Task EnqueueAsync(ConsumeResult consumeResult); + Task EnqueueAsync(Message consumeResult); } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs index d50990e..681a6cd 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using Confluent.Kafka; using KafkaConsumer.MessageHandler; namespace KafkaConsumer.TopicPartitionQueue @@ -14,7 +13,7 @@ public TopicPartitionQueue(IMessageHandler messageHandler) _messageHandler = messageHandler; } - public Task EnqueueAsync(ConsumeResult consumeResult) + public Task EnqueueAsync(Message consumeResult) { //TODO: use Dataflow blocks From 2e87bb6f982f9c4caf46d1d27cc1d03315d0170f Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 7 Sep 2020 23:17:27 +0300 Subject: [PATCH 08/22] Refactor, MessageHandler creation --- .../TopicPartitionQueueSelectorShould.cs | 23 ++++++++++++------ .../Processor/KafkaProcessorBuilder.cs | 24 ++++++++++++++++--- .../ITopicPartitionQueueFactory.cs | 4 +++- .../ITopicPartitionQueueSelector.cs | 3 ++- .../TopicPartitionQueueFactory.cs | 9 +++---- .../TopicPartitionQueueSelector.cs | 11 ++++----- 6 files changed, 49 insertions(+), 25 deletions(-) diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs index d902229..ca6a3a7 100644 --- a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs @@ -1,4 +1,5 @@ using KafkaConsumer.Exceptions; +using KafkaConsumer.MessageHandler; using KafkaConsumer.Tests.Extensions; using KafkaConsumer.TopicPartitionQueue; using Moq; @@ -10,10 +11,12 @@ public class TopicPartitionQueueSelectorShould { private readonly Mock> _topicPartitionQueueFactory; private readonly Mock> _topicPartitionQueue; + private readonly Mock> _messageHandler; private readonly TopicPartitionQueueSelector _topicPartitionQueueSelector; public TopicPartitionQueueSelectorShould() { + _messageHandler = new Mock>(); _topicPartitionQueue = new Mock>(); _topicPartitionQueueFactory = new Mock>(); @@ -28,10 +31,10 @@ public void SelectTopicPartitionQueue() var tp = DataGenerator.TopicPartition; _topicPartitionQueueFactory - .Setup(tpqf => tpqf.Create()) + .Setup(tpqf => tpqf.Create(It.IsAny>())) .Returns(_topicPartitionQueue.Object); - _topicPartitionQueueSelector.Fill(new[] { tp }); + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); // act var topicParitionQueue = _topicPartitionQueueSelector.Select(tp); @@ -41,17 +44,20 @@ public void SelectTopicPartitionQueue() } [Fact] - public void FillQueues() + public void AddQueues() { // arrange var tps = DataGenerator.GenerateTopicPartitions(10); _topicPartitionQueueFactory - .Setup(tpqf => tpqf.Create()) + .Setup(tpqf => tpqf.Create(It.IsAny>())) .Returns(_topicPartitionQueue.Object); // act - _topicPartitionQueueSelector.Fill(tps); + foreach (var tp in tps) + { + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); + } // assert foreach (var tp in tps) @@ -69,10 +75,13 @@ public void RemoveQueues() var tps = DataGenerator.GenerateTopicPartitions(10); _topicPartitionQueueFactory - .Setup(tpqf => tpqf.Create()) + .Setup(tpqf => tpqf.Create(It.IsAny>())) .Returns(_topicPartitionQueue.Object); - _topicPartitionQueueSelector.Fill(tps); + foreach (var tp in tps) + { + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); + } // act _topicPartitionQueueSelector.Remove(tps); diff --git a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs index 69c60a6..a830abf 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using Confluent.Kafka; +using KafkaConsumer.MessageHandler; using KafkaConsumer.Processor.Config; using KafkaConsumer.TopicPartitionQueue; using Microsoft.Extensions.Options; @@ -11,12 +12,12 @@ namespace KafkaConsumer.Processor public class KafkaProcessorBuilder { private ITopicPartitionQueueSelector _topicPartitionQueueSelector; + private Func> _handlerFactory; private ConsumerConfig _config; private string _topic; - public KafkaProcessorBuilder(TopicPartitionQueueSelector topicPartitionQueueSelector) + public KafkaProcessorBuilder() { - _topicPartitionQueueSelector = topicPartitionQueueSelector; } public KafkaProcessorBuilder WithConfig(ConsumerConfig config) @@ -32,8 +33,20 @@ public KafkaProcessorBuilder FromTopic(string topic) return this; } + + public KafkaProcessorBuilder WithHandlerFactory(Func> handlerFactory) + { + _handlerFactory = handlerFactory; + + return this; + } + public IKafkaProcessor Build() { + var queueFactory = new TopicPartitionQueueFactory(); + + _topicPartitionQueueSelector = new TopicPartitionQueueSelector(queueFactory); + var consumer = new ConsumerBuilder(_config) .SetPartitionsAssignedHandler(OnPartitionsAssigned) .SetPartitionsRevokedHandler(OnPartitionsRevoked) @@ -75,7 +88,12 @@ private IEnumerable OnPartitionsAssigned( IConsumer consumer, List partitions) { - _topicPartitionQueueSelector.Fill(partitions); + foreach (var partition in partitions) + { + var messageHandler = _handlerFactory.Invoke(partition); + + _topicPartitionQueueSelector.AddQueue(partition, messageHandler); + } return partitions.Select(p => new TopicPartitionOffset(p, Offset.Stored)); } diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs index 98f193b..4117aae 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs @@ -1,7 +1,9 @@ +using KafkaConsumer.MessageHandler; + namespace KafkaConsumer.TopicPartitionQueue { public interface ITopicPartitionQueueFactory { - ITopicPartitionQueue Create(); + ITopicPartitionQueue Create(IMessageHandler messageHandler); } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs index f5feb4c..98e9aa4 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueSelector.cs @@ -1,12 +1,13 @@ using System.Collections.Generic; using Confluent.Kafka; +using KafkaConsumer.MessageHandler; namespace KafkaConsumer.TopicPartitionQueue { public interface ITopicPartitionQueueSelector { ITopicPartitionQueue Select(TopicPartition topicPartition); - void Fill(IEnumerable topicPartitions); + void AddQueue(TopicPartition topicPartition, IMessageHandler messageHandler); void Remove(IEnumerable topicPartitions); } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs index a799dfc..a3537f5 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs @@ -4,16 +4,13 @@ namespace KafkaConsumer.TopicPartitionQueue { public class TopicPartitionQueueFactory : ITopicPartitionQueueFactory { - private readonly IMessageHandler _messageHandler; - - public TopicPartitionQueueFactory(IMessageHandler messageHandler) + public TopicPartitionQueueFactory() { - _messageHandler = messageHandler; } - public ITopicPartitionQueue Create() + public ITopicPartitionQueue Create(IMessageHandler messageHandler) { - return new TopicPartitionQueue(_messageHandler); + return new TopicPartitionQueue(messageHandler); } } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs index be7be1a..3bbc9d4 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -1,6 +1,7 @@ using System.Collections.Generic; using Confluent.Kafka; using KafkaConsumer.Exceptions; +using KafkaConsumer.MessageHandler; namespace KafkaConsumer.TopicPartitionQueue { @@ -9,8 +10,7 @@ public class TopicPartitionQueueSelector : ITopicPartitionQueueSel private readonly Dictionary> _queues; private readonly ITopicPartitionQueueFactory _topicPartitionQueueFactory; - public TopicPartitionQueueSelector( - ITopicPartitionQueueFactory topicPartitionQueueFactory) + public TopicPartitionQueueSelector(ITopicPartitionQueueFactory topicPartitionQueueFactory) { _queues = new Dictionary>(); _topicPartitionQueueFactory = topicPartitionQueueFactory; @@ -25,12 +25,9 @@ public ITopicPartitionQueue Select(TopicPartition topicPartition) $"TopicPartitionQueue not found for {topicPartition}"); } - public void Fill(IEnumerable topicPartitions) + public void AddQueue(TopicPartition topicPartition, IMessageHandler messageHandler) { - foreach (var tp in topicPartitions) - { - _queues.Add(tp, _topicPartitionQueueFactory.Create()); - } + _queues.Add(topicPartition, _topicPartitionQueueFactory.Create(messageHandler)); } public void Remove(IEnumerable topicPartitions) From cc4afacba53cdee6939d6c041fe9f66cd32b75ed Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 7 Sep 2020 23:48:30 +0300 Subject: [PATCH 09/22] KafkaProcessorBuilder updated --- .../Processor/KafkaProcessorBuilderShould.cs | 128 ++++++++++++++++++ .../Processor/KafkaProcessorBuilder.cs | 64 +++++++-- 2 files changed, 182 insertions(+), 10 deletions(-) create mode 100644 src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs new file mode 100644 index 0000000..bd41b4b --- /dev/null +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs @@ -0,0 +1,128 @@ +using KafkaConsumer.MessageHandler; +using KafkaConsumer.Processor; +using KafkaConsumer.TopicPartitionQueue; +using Moq; +using System; +using Xunit; + +namespace KafkaConsumer.Tests.Processor +{ + public class KafkaProcessorBuilderShould + { + private readonly Mock> _topicPartitionQueueSelector; + private readonly Mock> _messageHandler; + private readonly KafkaProcessorBuilder _kafkaProcessorBuilder; + + public KafkaProcessorBuilderShould() + { + _messageHandler = new Mock>(); + _topicPartitionQueueSelector = new Mock>(); + + _kafkaProcessorBuilder = new KafkaProcessorBuilder(_topicPartitionQueueSelector.Object); + } + + [Fact] + public void ThrowIfConsumerConfigNotSet() + { + // arrange + var topic = "topic"; + + _kafkaProcessorBuilder + .FromTopic(topic) + .WithHandlerFactory(_ => _messageHandler.Object); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.Build()); + + // assert + Assert.Equal("'consumerConfig' must be set!", exception.Message); + } + + [Fact] + public void ThrowIfTopicNotSet() + { + // arrange + var consumerConfig = new Confluent.Kafka.ConsumerConfig(); + + _kafkaProcessorBuilder + .WithConfig(consumerConfig) + .WithHandlerFactory(_ => _messageHandler.Object); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.Build()); + + // assert + Assert.Equal("'topic' must be set!", exception.Message); + } + + [Fact] + public void ThrowIfHandlerFactoryNotSet() + { + // arrange + var consumerConfig = new Confluent.Kafka.ConsumerConfig(); + var topic = "topic"; + + _kafkaProcessorBuilder + .WithConfig(consumerConfig) + .FromTopic(topic); + + // act + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.Build()); + + // assert + Assert.Equal("'handlerFactory' must be set!", exception.Message); + } + + [Fact] + public void ThrowIfConsumerConfigAlreadySet() + { + // arrange + var consumerConfig = new Confluent.Kafka.ConsumerConfig(); + + _kafkaProcessorBuilder.WithConfig(consumerConfig); + + // act + + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithConfig(consumerConfig)); + + // assert + Assert.Equal("'consumerConfig' was already set!", exception.Message); + } + + [Fact] + public void ThrowIfTopicAlreadySet() + { + // arrange + var topic = "topic"; + + _kafkaProcessorBuilder.FromTopic(topic); + + // act + + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.FromTopic(topic)); + + // assert + Assert.Equal("'topic' was already set!", exception.Message); + } + + [Fact] + public void ThrowIfHandlerFactoryAlreadySet() + { + // arrange + _kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object); + + // act + + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object)); + + // assert + Assert.Equal("'handlerFactory' was already set!", exception.Message); + } + } +} diff --git a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs index a830abf..66b22f8 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs @@ -11,31 +11,51 @@ namespace KafkaConsumer.Processor { public class KafkaProcessorBuilder { - private ITopicPartitionQueueSelector _topicPartitionQueueSelector; + private readonly ITopicPartitionQueueSelector _topicPartitionQueueSelector; private Func> _handlerFactory; - private ConsumerConfig _config; + private ConsumerConfig _consumerConfig; private string _topic; - public KafkaProcessorBuilder() + public KafkaProcessorBuilder(ITopicPartitionQueueSelector topicPartitionQueueSelector) { + _topicPartitionQueueSelector = topicPartitionQueueSelector; } - public KafkaProcessorBuilder WithConfig(ConsumerConfig config) + public static KafkaProcessorBuilder CreateDefault() { - _config = config; + var queueFactory = new TopicPartitionQueueFactory(); + + var topicPartitionQueueSelector = new TopicPartitionQueueSelector(queueFactory); + + return new KafkaProcessorBuilder(topicPartitionQueueSelector); + } + + public KafkaProcessorBuilder WithConfig(ConsumerConfig consumerConfig) + { + if (_consumerConfig != null) + throw new InvalidOperationException("'consumerConfig' was already set!"); + + _consumerConfig = consumerConfig; return this; } public KafkaProcessorBuilder FromTopic(string topic) { + if (_topic != null) + throw new InvalidOperationException("'topic' was already set!"); + _topic = topic; return this; } - public KafkaProcessorBuilder WithHandlerFactory(Func> handlerFactory) + public KafkaProcessorBuilder WithHandlerFactory( + Func> handlerFactory) { + if (_handlerFactory != null) + throw new InvalidOperationException("'handlerFactory' was already set!"); + _handlerFactory = handlerFactory; return this; @@ -43,17 +63,16 @@ public KafkaProcessorBuilder WithHandlerFactory(Func Build() { - var queueFactory = new TopicPartitionQueueFactory(); - - _topicPartitionQueueSelector = new TopicPartitionQueueSelector(queueFactory); + CheckIfConfigured(); - var consumer = new ConsumerBuilder(_config) + var consumer = new ConsumerBuilder(_consumerConfig) .SetPartitionsAssignedHandler(OnPartitionsAssigned) .SetPartitionsRevokedHandler(OnPartitionsRevoked) .Build(); //TODO: Value/Key deserializers for consumer + //TODO: expose WithProcessorConfig() method var config = Options.Create(new ProcessorConfig { Topic = _topic @@ -97,5 +116,30 @@ private IEnumerable OnPartitionsAssigned( return partitions.Select(p => new TopicPartitionOffset(p, Offset.Stored)); } + + private void CheckIfConfigured() + { + CheckIfConsumerConfigSet(); + CheckIfHandlerFactorySet(); + CheckIfTopicSet(); + } + + private void CheckIfConsumerConfigSet() + { + if (_consumerConfig == null) + throw new InvalidOperationException("'consumerConfig' must be set!"); + } + + private void CheckIfHandlerFactorySet() + { + if (_handlerFactory == null) + throw new InvalidOperationException("'handlerFactory' must be set!"); + } + + private void CheckIfTopicSet() + { + if (string.IsNullOrEmpty(_topic)) + throw new InvalidOperationException("'topic' must be set!"); + } } } \ No newline at end of file From b36d3e33b1dcf10a65652565503153e1551a4465 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 10 Sep 2020 22:55:49 +0300 Subject: [PATCH 10/22] Version updates --- src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj | 2 +- src/KafkaConsumer/KafkaConsumer.csproj | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj index 470ded8..b8f5f80 100644 --- a/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj +++ b/src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/KafkaConsumer/KafkaConsumer.csproj b/src/KafkaConsumer/KafkaConsumer.csproj index a33c1d7..b4962c4 100644 --- a/src/KafkaConsumer/KafkaConsumer.csproj +++ b/src/KafkaConsumer/KafkaConsumer.csproj @@ -1,12 +1,12 @@ - netstandard2.0 + netstandard2.1 - + From c5e12a3519fd64b04665b53e651f77340b704fdb Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 10 Sep 2020 23:17:18 +0300 Subject: [PATCH 11/22] TopicPartitionQueues with TPL Dataflow (WIP) --- .../Processor/KafkaProcessorShould.cs | 2 +- src/KafkaConsumer/KafkaConsumer.csproj | 3 +- src/KafkaConsumer/Processor/KafkaProcessor.cs | 7 +++- .../ITopicPartitionQueue.cs | 4 +- .../TopicPartitionQueue.cs | 42 +++++++++++++++++-- .../TopicPartitionQueueSelector.cs | 6 ++- 6 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index 6060dda..561fcab 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -59,7 +59,7 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() // assert _topicPartitionQueue.Verify( - tpq => tpq.EnqueueAsync(message.IsActual()), + tpq => tpq.TryEnqueueAsync(message.IsActual()), Times.Once()); } } diff --git a/src/KafkaConsumer/KafkaConsumer.csproj b/src/KafkaConsumer/KafkaConsumer.csproj index b4962c4..3f8142a 100644 --- a/src/KafkaConsumer/KafkaConsumer.csproj +++ b/src/KafkaConsumer/KafkaConsumer.csproj @@ -1,4 +1,4 @@ - + netstandard2.1 @@ -7,6 +7,7 @@ + diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs index 6d3cc10..6b8e7ed 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessor.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -40,7 +40,12 @@ public async Task ProcessMessagesAsync(CancellationToken ct) var message = new MessageHandler.Message(_consumer, consumeResult); - await queue.EnqueueAsync(message); + var isEnqueued = await queue.TryEnqueueAsync(message); + + if (!isEnqueued) + { + await queue.CompleteAsync(); + } } } } diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs index 519b764..3cb2689 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs @@ -5,6 +5,8 @@ namespace KafkaConsumer.TopicPartitionQueue { public interface ITopicPartitionQueue { - Task EnqueueAsync(Message consumeResult); + Task TryEnqueueAsync(Message consumeResult); + + Task CompleteAsync(); } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs index 681a6cd..6164019 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -1,5 +1,5 @@ -using System; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using KafkaConsumer.MessageHandler; namespace KafkaConsumer.TopicPartitionQueue @@ -8,16 +8,50 @@ public class TopicPartitionQueue : ITopicPartitionQueue _messageHandler; + private readonly BufferBlock> _bufferBlock; + private readonly ActionBlock> _actionBlock; + public TopicPartitionQueue(IMessageHandler messageHandler) { _messageHandler = messageHandler; + + _bufferBlock = new BufferBlock>( + new DataflowBlockOptions + { + BoundedCapacity = 1000 + }); + + _actionBlock = new ActionBlock>( + _messageHandler.HandleAsync, + new ExecutionDataflowBlockOptions + { + BoundedCapacity = 1 + }); + + _bufferBlock.LinkTo(_actionBlock); + + PropagateErrors(_actionBlock, _bufferBlock); } - public Task EnqueueAsync(Message consumeResult) + public async Task CompleteAsync() { - //TODO: use Dataflow blocks + _actionBlock.Complete(); - throw new NotImplementedException(); + await _actionBlock.Completion; + } + + public async Task TryEnqueueAsync(Message consumeResult) => + await _bufferBlock.SendAsync(consumeResult); + + private static void PropagateErrors(IDataflowBlock from, IDataflowBlock to) + { + from.Completion.ContinueWith((t) => + { + if (t.IsFaulted) + { + to.Fault(from.Completion.Exception.InnerException); + } + }); } } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs index 3bbc9d4..ba5cfa0 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -1,4 +1,6 @@ using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; using Confluent.Kafka; using KafkaConsumer.Exceptions; using KafkaConsumer.MessageHandler; @@ -32,10 +34,10 @@ public void AddQueue(TopicPartition topicPartition, IMessageHandler topicPartitions) { + Task.WaitAll(_queues.Select(q => q.Value.CompleteAsync()).ToArray()); + foreach (var tp in topicPartitions) { - //TODO: finalize each TopicPartitionQueue? - _queues.Remove(tp); } } From e64f37bf2077208fc09940d1256674a608f2a830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sat, 12 Sep 2020 19:06:43 +0300 Subject: [PATCH 12/22] Abort/Complete topic partition queues. --- src/KafkaConsumer/Processor/KafkaProcessor.cs | 2 +- .../ITopicPartitionQueue.cs | 4 ++-- .../TopicPartitionQueue.cs | 22 ++++++++++++++----- .../TopicPartitionQueueFactory.cs | 4 ---- .../TopicPartitionQueueSelector.cs | 1 + 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs index 6b8e7ed..efcbaee 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessor.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -44,7 +44,7 @@ public async Task ProcessMessagesAsync(CancellationToken ct) if (!isEnqueued) { - await queue.CompleteAsync(); + await queue.AbortAsync(); } } } diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs index 3cb2689..7d21aad 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs @@ -5,8 +5,8 @@ namespace KafkaConsumer.TopicPartitionQueue { public interface ITopicPartitionQueue { - Task TryEnqueueAsync(Message consumeResult); - + Task TryEnqueueAsync(Message message); Task CompleteAsync(); + Task AbortAsync(); } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs index 6164019..826618c 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -28,28 +28,38 @@ public TopicPartitionQueue(IMessageHandler messageHandler) BoundedCapacity = 1 }); - _bufferBlock.LinkTo(_actionBlock); + _bufferBlock.LinkTo(_actionBlock, new DataflowLinkOptions + { + PropagateCompletion = true + }); PropagateErrors(_actionBlock, _bufferBlock); } public async Task CompleteAsync() + { + _bufferBlock.Complete(); + + await _bufferBlock.Completion; + } + + public async Task AbortAsync() { _actionBlock.Complete(); await _actionBlock.Completion; } - public async Task TryEnqueueAsync(Message consumeResult) => - await _bufferBlock.SendAsync(consumeResult); + public async Task TryEnqueueAsync(Message message) => + await _bufferBlock.SendAsync(message); private static void PropagateErrors(IDataflowBlock from, IDataflowBlock to) { - from.Completion.ContinueWith((t) => + from.Completion.ContinueWith(task => { - if (t.IsFaulted) + if (task.IsFaulted) { - to.Fault(from.Completion.Exception.InnerException); + to.Fault(task.Exception.InnerException); } }); } diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs index a3537f5..349ad34 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs @@ -4,10 +4,6 @@ namespace KafkaConsumer.TopicPartitionQueue { public class TopicPartitionQueueFactory : ITopicPartitionQueueFactory { - public TopicPartitionQueueFactory() - { - } - public ITopicPartitionQueue Create(IMessageHandler messageHandler) { return new TopicPartitionQueue(messageHandler); diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs index ba5cfa0..a7a39f1 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -34,6 +34,7 @@ public void AddQueue(TopicPartition topicPartition, IMessageHandler topicPartitions) { + //TODO: Abort vs Complete Task.WaitAll(_queues.Select(q => q.Value.CompleteAsync()).ToArray()); foreach (var tp in topicPartitions) From 35dda3e5e4ba8f85342aa251500137627682e8ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sat, 12 Sep 2020 19:06:57 +0300 Subject: [PATCH 13/22] TopicPartitionQueue tests --- src/KafkaConsumer.Tests/DataGenerator.cs | 21 +++- .../MessageHandlerVerifier.cs | 54 +++++++++ .../TopicPartitionQueueShould.cs | 110 ++++++++++++++++++ 3 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 src/KafkaConsumer.Tests/MessageHandlerVerifier.cs create mode 100644 src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs diff --git a/src/KafkaConsumer.Tests/DataGenerator.cs b/src/KafkaConsumer.Tests/DataGenerator.cs index 2ddcfae..d0a4eeb 100644 --- a/src/KafkaConsumer.Tests/DataGenerator.cs +++ b/src/KafkaConsumer.Tests/DataGenerator.cs @@ -6,7 +6,6 @@ namespace KafkaConsumer.Tests { public static class DataGenerator { - public static IEnumerable GenerateTopicPartitions(int count) => Enumerable .Range(0, count) @@ -18,9 +17,16 @@ public static IEnumerable GenerateTopicPartitions(int count) => public static TopicPartitionOffset TopicPartitionOffset => new TopicPartitionOffset(TopicPartition, 1); - public static MessageHandler.Message GenerateMessage(IConsumer consumer) => + public static MessageHandler.Message GenerateMessage( + IConsumer consumer) => new MessageHandler.Message(consumer, ConsumeResult); + public static IEnumerable> GenerateMessages( + IConsumer consumer, + int count) => + GenerateConsumeResults(count) + .Select(cr => new MessageHandler.Message(consumer, cr)); + public static ConsumeResult ConsumeResult => new ConsumeResult { @@ -31,5 +37,16 @@ public static MessageHandler.Message GenerateMessage(IConsumer> GenerateConsumeResults(int count) => + Enumerable.Range(0, count).Select(i => new ConsumeResult + { + Message = new Message + { + Key = $"key_{i}", + Value = $"value_{i}" + }, + TopicPartitionOffset = new TopicPartitionOffset("test-topic", 0, i) + }); } } \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs b/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs new file mode 100644 index 0000000..77e9e8f --- /dev/null +++ b/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs @@ -0,0 +1,54 @@ +using System.Collections.Generic; +using System.Linq; +using FluentAssertions; +using KafkaConsumer.MessageHandler; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests +{ + + public class MessageHandlerVerifier + { + private int _index; + private List> _messages; + + public MessageHandlerVerifier(List> messages) + { + _messages = messages; + } + + public static void Verify( + Mock> messageHandler, + List> messages) + { + if(!messages.Any()) + { + messageHandler.Verify( + mh => mh.HandleAsync(It.IsAny>()), + Times.Never()); + + return; + } + + var collectionVerifier = new MessageHandlerVerifier(messages); + + messageHandler.Verify(mh => mh.HandleAsync( + It.Is>(msg => collectionVerifier.Validate(msg)))); + + messageHandler.VerifyNoOtherCalls(); + + Assert.Equal(messages.Count, collectionVerifier._index); + } + + + private bool Validate(Message message) + { + _messages[_index].Should().BeEquivalentTo(message); + + _index++; + + return true; + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs new file mode 100644 index 0000000..3f1c62f --- /dev/null +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs @@ -0,0 +1,110 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using KafkaConsumer.MessageHandler; +using KafkaConsumer.Tests.Extensions; +using KafkaConsumer.TopicPartitionQueue; +using Moq; +using Xunit; + +namespace KafkaConsumer.Tests.TopicPartitionQueue +{ + public class TopicPartitionQueueShould + { + private readonly Mock> _messageHandler; + private readonly TopicPartitionQueue _topicPartitionQueue; + private readonly Mock> _consumer; + + public TopicPartitionQueueShould() + { + _consumer = new Mock>(); + _messageHandler = new Mock>(); + + _topicPartitionQueue = new TopicPartitionQueue(_messageHandler.Object); + } + + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(500)] + [InlineData(1001)] + public async Task EnqueueAndInvokeMessageHandler(int count) + { + // arrange + var messages = DataGenerator.GenerateMessages(_consumer.Object, count).ToList(); + + _messageHandler + .Setup(m => m.HandleAsync(It.IsAny>())) + .Returns(Task.Delay(10)); + + // act + foreach (var message in messages) + { + var isEnqueued = await _topicPartitionQueue.TryEnqueueAsync(message); + + Assert.True(isEnqueued); + } + + await _topicPartitionQueue.CompleteAsync(); + + // assert + + MessageHandlerVerifier.Verify(_messageHandler, messages); + } + + [Fact] + public async Task PropagateErrorsFromMessageHandlerOnComplete() + { + // arrange + var message = DataGenerator.GenerateMessage(_consumer.Object); + + _messageHandler + .Setup(m => m.HandleAsync(message.IsActual())) + .Throws(); + + // act + bool isEnqueued; + do + { + isEnqueued = await _topicPartitionQueue.TryEnqueueAsync(message); + } + while(isEnqueued); + + // assert + await Assert.ThrowsAsync( + () => _topicPartitionQueue.CompleteAsync()); + + _messageHandler.Verify( + m => m.HandleAsync(message.IsActual()), + Times.Once()); + } + + [Fact] + public async Task PropagateErrorsFromMessageHandlerOnAbort() + { + // arrange + var message = DataGenerator.GenerateMessage(_consumer.Object); + + _messageHandler + .Setup(m => m.HandleAsync(message.IsActual())) + .Throws(); + + // act + bool isEnqueued; + do + { + isEnqueued = await _topicPartitionQueue.TryEnqueueAsync(message); + } + while(isEnqueued); + + // assert + await Assert.ThrowsAsync( + () => _topicPartitionQueue.AbortAsync()); + + _messageHandler.Verify( + m => m.HandleAsync(message.IsActual()), + Times.Once()); + } + } +} \ No newline at end of file From 8a2bd17ba267a03cf7dea83a5e89987ab475ced4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sat, 12 Sep 2020 19:31:07 +0300 Subject: [PATCH 14/22] Configurable queue capacity --- .../TopicPartitionQueueSelectorShould.cs | 17 ++++++++++++----- .../TopicPartitionQueueShould.cs | 2 +- .../Processor/KafkaProcessorBuilder.cs | 4 +++- .../ITopicPartitionQueueFactory.cs | 4 +++- .../TopicPartitionQueue/TopicPartitionQueue.cs | 4 ++-- .../TopicPartitionQueueFactory.cs | 6 ++++-- .../TopicPartitionQueueSelector.cs | 9 +++++++-- 7 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs index ca6a3a7..2b49372 100644 --- a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs @@ -21,7 +21,8 @@ public TopicPartitionQueueSelectorShould() _topicPartitionQueueFactory = new Mock>(); _topicPartitionQueueSelector = new TopicPartitionQueueSelector( - _topicPartitionQueueFactory.Object); + _topicPartitionQueueFactory.Object, + 1000); } [Fact] @@ -31,8 +32,10 @@ public void SelectTopicPartitionQueue() var tp = DataGenerator.TopicPartition; _topicPartitionQueueFactory - .Setup(tpqf => tpqf.Create(It.IsAny>())) - .Returns(_topicPartitionQueue.Object); + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) + .Returns(_topicPartitionQueue.Object); _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); @@ -50,7 +53,9 @@ public void AddQueues() var tps = DataGenerator.GenerateTopicPartitions(10); _topicPartitionQueueFactory - .Setup(tpqf => tpqf.Create(It.IsAny>())) + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) .Returns(_topicPartitionQueue.Object); // act @@ -75,7 +80,9 @@ public void RemoveQueues() var tps = DataGenerator.GenerateTopicPartitions(10); _topicPartitionQueueFactory - .Setup(tpqf => tpqf.Create(It.IsAny>())) + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) .Returns(_topicPartitionQueue.Object); foreach (var tp in tps) diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs index 3f1c62f..da2a0ff 100644 --- a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs @@ -21,7 +21,7 @@ public TopicPartitionQueueShould() _consumer = new Mock>(); _messageHandler = new Mock>(); - _topicPartitionQueue = new TopicPartitionQueue(_messageHandler.Object); + _topicPartitionQueue = new TopicPartitionQueue(_messageHandler.Object, 1000); } [Theory] diff --git a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs index 66b22f8..4bf66ab 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs @@ -25,7 +25,9 @@ public static KafkaProcessorBuilder CreateDefault() { var queueFactory = new TopicPartitionQueueFactory(); - var topicPartitionQueueSelector = new TopicPartitionQueueSelector(queueFactory); + var topicPartitionQueueSelector = new TopicPartitionQueueSelector( + queueFactory, + 1000); return new KafkaProcessorBuilder(topicPartitionQueueSelector); } diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs index 4117aae..b7144b1 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueueFactory.cs @@ -4,6 +4,8 @@ namespace KafkaConsumer.TopicPartitionQueue { public interface ITopicPartitionQueueFactory { - ITopicPartitionQueue Create(IMessageHandler messageHandler); + ITopicPartitionQueue Create( + IMessageHandler messageHandler, + int queueCapacity); } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs index 826618c..fc28b1b 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -11,14 +11,14 @@ public class TopicPartitionQueue : ITopicPartitionQueue> _bufferBlock; private readonly ActionBlock> _actionBlock; - public TopicPartitionQueue(IMessageHandler messageHandler) + public TopicPartitionQueue(IMessageHandler messageHandler, int queueCapacity) { _messageHandler = messageHandler; _bufferBlock = new BufferBlock>( new DataflowBlockOptions { - BoundedCapacity = 1000 + BoundedCapacity = queueCapacity }); _actionBlock = new ActionBlock>( diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs index 349ad34..60f1c2d 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueFactory.cs @@ -4,9 +4,11 @@ namespace KafkaConsumer.TopicPartitionQueue { public class TopicPartitionQueueFactory : ITopicPartitionQueueFactory { - public ITopicPartitionQueue Create(IMessageHandler messageHandler) + public ITopicPartitionQueue Create( + IMessageHandler messageHandler, + int queueCapacity) { - return new TopicPartitionQueue(messageHandler); + return new TopicPartitionQueue(messageHandler, queueCapacity); } } } \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs index a7a39f1..0e9cc6b 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -11,10 +11,15 @@ public class TopicPartitionQueueSelector : ITopicPartitionQueueSel { private readonly Dictionary> _queues; private readonly ITopicPartitionQueueFactory _topicPartitionQueueFactory; + private readonly int _queueCapacity; - public TopicPartitionQueueSelector(ITopicPartitionQueueFactory topicPartitionQueueFactory) + public TopicPartitionQueueSelector( + ITopicPartitionQueueFactory topicPartitionQueueFactory, + int queueCapacity) { _queues = new Dictionary>(); + + _queueCapacity = queueCapacity; _topicPartitionQueueFactory = topicPartitionQueueFactory; } @@ -29,7 +34,7 @@ public ITopicPartitionQueue Select(TopicPartition topicPartition) public void AddQueue(TopicPartition topicPartition, IMessageHandler messageHandler) { - _queues.Add(topicPartition, _topicPartitionQueueFactory.Create(messageHandler)); + _queues.Add(topicPartition, _topicPartitionQueueFactory.Create(messageHandler, _queueCapacity)); } public void Remove(IEnumerable topicPartitions) From 495d5febb3dbd1c000edaf6911d7ae48951ed82c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sat, 12 Sep 2020 19:40:42 +0300 Subject: [PATCH 15/22] Dataflow extensions --- .../Extensions/DataflowExtensions.cs | 18 ++++++++++++++++++ .../TopicPartitionQueue/TopicPartitionQueue.cs | 14 ++------------ 2 files changed, 20 insertions(+), 12 deletions(-) create mode 100644 src/KafkaConsumer/Extensions/DataflowExtensions.cs diff --git a/src/KafkaConsumer/Extensions/DataflowExtensions.cs b/src/KafkaConsumer/Extensions/DataflowExtensions.cs new file mode 100644 index 0000000..b814730 --- /dev/null +++ b/src/KafkaConsumer/Extensions/DataflowExtensions.cs @@ -0,0 +1,18 @@ +using System.Threading.Tasks.Dataflow; + +namespace KafkaConsumer.Extensions +{ + public static class DataflowExtensions + { + public static void PropagateErrorsTo(this IDataflowBlock from, IDataflowBlock to) + { + from.Completion.ContinueWith(task => + { + if (task.IsFaulted) + { + to.Fault(task.Exception.InnerException); + } + }); + } + } +} \ No newline at end of file diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs index fc28b1b..6346576 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -1,5 +1,6 @@ using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; +using KafkaConsumer.Extensions; using KafkaConsumer.MessageHandler; namespace KafkaConsumer.TopicPartitionQueue @@ -33,7 +34,7 @@ public TopicPartitionQueue(IMessageHandler messageHandler, int que PropagateCompletion = true }); - PropagateErrors(_actionBlock, _bufferBlock); + _actionBlock.PropagateErrorsTo(_bufferBlock); } public async Task CompleteAsync() @@ -52,16 +53,5 @@ public async Task AbortAsync() public async Task TryEnqueueAsync(Message message) => await _bufferBlock.SendAsync(message); - - private static void PropagateErrors(IDataflowBlock from, IDataflowBlock to) - { - from.Completion.ContinueWith(task => - { - if (task.IsFaulted) - { - to.Fault(task.Exception.InnerException); - } - }); - } } } \ No newline at end of file From 633cf322eccf9072e16aa88ff5de223095cc6e4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Kapl=C5=ABnas?= Date: Sat, 12 Sep 2020 19:50:58 +0300 Subject: [PATCH 16/22] Fixed dataflow block completion --- src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs index 6346576..5d45b6c 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -41,7 +41,7 @@ public async Task CompleteAsync() { _bufferBlock.Complete(); - await _bufferBlock.Completion; + await _actionBlock.Completion; } public async Task AbortAsync() From ced466ed139e547d05782deeea2b0129a8e7d6e8 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 14 Sep 2020 21:53:23 +0300 Subject: [PATCH 17/22] KafkaProcessor tests --- .../Processor/KafkaProcessorShould.cs | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index 561fcab..a10f666 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -62,5 +62,75 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() tpq => tpq.TryEnqueueAsync(message.IsActual()), Times.Once()); } + + [Fact] + public async Task CloseConsumerAfterStoppingToProcessMessages() + { + // arrange + var cr = DataGenerator.ConsumeResult; + var message = DataGenerator.GenerateMessage(_consumer.Object); + var cts = new CancellationTokenSource(); + + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Callback(_ => cts.Cancel()) + .Returns(cr); + + _topicPartitionQueueSelector + .Setup(t => t.Select(cr.TopicPartition.IsActual())) + .Returns(_topicPartitionQueue.Object); + + // act + await _kafkaProcessor.ProcessMessagesAsync(cts.Token); + + // assert + _consumer.Verify( + c => c.Close(), + Times.Once()); + } + + [Fact] + public async Task CloseConsumerIfExceptionWasThrown() + { + // arrange + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Throws(); + + // act + await Assert.ThrowsAsync( + () => _kafkaProcessor.ProcessMessagesAsync(CancellationToken.None)); + + // assert + _consumer.Verify( + c => c.Close(), + Times.Once()); + } + + [Fact] + public async Task SubscribeToTopic() + { + // arrange + var cr = DataGenerator.ConsumeResult; + var message = DataGenerator.GenerateMessage(_consumer.Object); + var cts = new CancellationTokenSource(); + + _consumer + .Setup(c => c.Consume(It.IsAny())) + .Callback(_ => cts.Cancel()) + .Returns(cr); + + _topicPartitionQueueSelector + .Setup(t => t.Select(cr.TopicPartition.IsActual())) + .Returns(_topicPartitionQueue.Object); + + // act + await _kafkaProcessor.ProcessMessagesAsync(cts.Token); + + // assert + _consumer.Verify( + c => c.Subscribe(_config.Topic), + Times.Once()); + } } } From 1fb22fb1da8d947ebdee80544f4c6fa01c4d5a4d Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 14 Sep 2020 22:07:57 +0300 Subject: [PATCH 18/22] Key/Value deserializers --- .../Processor/KafkaProcessorBuilderShould.cs | 43 +++++++++++++-- .../Processor/KafkaProcessorShould.cs | 1 - .../Processor/KafkaProcessorBuilder.cs | 53 ++++++++++++++++--- 3 files changed, 84 insertions(+), 13 deletions(-) diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs index bd41b4b..f23e4be 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs @@ -1,4 +1,5 @@ -using KafkaConsumer.MessageHandler; +using Confluent.Kafka; +using KafkaConsumer.MessageHandler; using KafkaConsumer.Processor; using KafkaConsumer.TopicPartitionQueue; using Moq; @@ -12,9 +13,13 @@ public class KafkaProcessorBuilderShould private readonly Mock> _topicPartitionQueueSelector; private readonly Mock> _messageHandler; private readonly KafkaProcessorBuilder _kafkaProcessorBuilder; + private readonly Mock> _keyDeserializer; + private readonly Mock> _valueDeserializer; public KafkaProcessorBuilderShould() { + _keyDeserializer = new Mock>(); + _valueDeserializer = new Mock>(); _messageHandler = new Mock>(); _topicPartitionQueueSelector = new Mock>(); @@ -43,7 +48,7 @@ public void ThrowIfConsumerConfigNotSet() public void ThrowIfTopicNotSet() { // arrange - var consumerConfig = new Confluent.Kafka.ConsumerConfig(); + var consumerConfig = new ConsumerConfig(); _kafkaProcessorBuilder .WithConfig(consumerConfig) @@ -61,7 +66,7 @@ public void ThrowIfTopicNotSet() public void ThrowIfHandlerFactoryNotSet() { // arrange - var consumerConfig = new Confluent.Kafka.ConsumerConfig(); + var consumerConfig = new ConsumerConfig(); var topic = "topic"; _kafkaProcessorBuilder @@ -80,7 +85,7 @@ public void ThrowIfHandlerFactoryNotSet() public void ThrowIfConsumerConfigAlreadySet() { // arrange - var consumerConfig = new Confluent.Kafka.ConsumerConfig(); + var consumerConfig = new ConsumerConfig(); _kafkaProcessorBuilder.WithConfig(consumerConfig); @@ -124,5 +129,35 @@ public void ThrowIfHandlerFactoryAlreadySet() // assert Assert.Equal("'handlerFactory' was already set!", exception.Message); } + + [Fact] + public void ThrowIfKeyDeserializerAlreadySet() + { + // arrange + _kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object); + + // act + + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object)); + + // assert + Assert.Equal("'keyDeserializer' was already set!", exception.Message); + } + + [Fact] + public void ThrowIfValueDeserializerAlreadySet() + { + // arrange + _kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object); + + // act + + var exception = Assert.Throws( + () => _kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object)); + + // assert + Assert.Equal("'valueDeserializer' was already set!", exception.Message); + } } } diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index a10f666..2c82b41 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -2,7 +2,6 @@ using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; -using FluentAssertions; using KafkaConsumer.Processor; using KafkaConsumer.Processor.Config; using KafkaConsumer.Tests.Extensions; diff --git a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs index 4bf66ab..3ef30de 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessorBuilder.cs @@ -12,9 +12,12 @@ namespace KafkaConsumer.Processor public class KafkaProcessorBuilder { private readonly ITopicPartitionQueueSelector _topicPartitionQueueSelector; + private Func> _handlerFactory; private ConsumerConfig _consumerConfig; private string _topic; + private IDeserializer _keyDeserializer; + private IDeserializer _valueDeserializer; public KafkaProcessorBuilder(ITopicPartitionQueueSelector topicPartitionQueueSelector) { @@ -63,19 +66,34 @@ public KafkaProcessorBuilder WithHandlerFactory( return this; } + public KafkaProcessorBuilder WithKeyDeserializer(IDeserializer keyDeserializer) + { + if (_keyDeserializer != null) + throw new InvalidOperationException("'keyDeserializer' was already set!"); + + _keyDeserializer = keyDeserializer; + + return this; + } + + public KafkaProcessorBuilder WithValueDeserializer(IDeserializer valueDeserializer) + { + if (_valueDeserializer != null) + throw new InvalidOperationException("'valueDeserializer' was already set!"); + + _valueDeserializer = valueDeserializer; + + return this; + } + public IKafkaProcessor Build() { CheckIfConfigured(); - var consumer = new ConsumerBuilder(_consumerConfig) - .SetPartitionsAssignedHandler(OnPartitionsAssigned) - .SetPartitionsRevokedHandler(OnPartitionsRevoked) - .Build(); - - //TODO: Value/Key deserializers for consumer + var consumer = BuildConsumer(); //TODO: expose WithProcessorConfig() method - var config = Options.Create(new ProcessorConfig + var processorConfig = Options.Create(new ProcessorConfig { Topic = _topic }); @@ -83,7 +101,26 @@ public IKafkaProcessor Build() return new KafkaProcessor( consumer, _topicPartitionQueueSelector, - config); + processorConfig); + } + + private IConsumer BuildConsumer() + { + var builder = new ConsumerBuilder(_consumerConfig) + .SetPartitionsAssignedHandler(OnPartitionsAssigned) + .SetPartitionsRevokedHandler(OnPartitionsRevoked); + + if (_keyDeserializer != null) + { + builder.SetKeyDeserializer(_keyDeserializer); + } + + if (_valueDeserializer != null) + { + builder.SetValueDeserializer(_valueDeserializer); + } + + return builder.Build(); } private IEnumerable OnPartitionsRevoked( From f1e5acd89eb815be5c7d3dcf8aec5faafb9b6be2 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 14 Sep 2020 22:15:26 +0300 Subject: [PATCH 19/22] Abort queues on partition revoked event --- .../TopicPartitionQueueSelectorShould.cs | 21 +++++++++++++++++++ .../TopicPartitionQueueSelector.cs | 3 +-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs index 2b49372..8f97f5a 100644 --- a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueSelectorShould.cs @@ -101,6 +101,27 @@ public void RemoveQueues() } } + [Fact] + public void AbortQueuesOnRemove() + { + // arrange + var tp = DataGenerator.TopicPartition; + + _topicPartitionQueueFactory + .Setup(tpqf => tpqf.Create( + It.IsAny>(), + It.IsAny())) + .Returns(_topicPartitionQueue.Object); + + _topicPartitionQueueSelector.AddQueue(tp, _messageHandler.Object); + + // act + _topicPartitionQueueSelector.Remove(new []{ tp }); + + // assert + _topicPartitionQueue.Verify(tpq => tpq.AbortAsync(), Times.Once()); + } + [Fact] public void ThrowIfTopicPartitionNotFound() { diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs index 0e9cc6b..13b2cf9 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueueSelector.cs @@ -39,8 +39,7 @@ public void AddQueue(TopicPartition topicPartition, IMessageHandler topicPartitions) { - //TODO: Abort vs Complete - Task.WaitAll(_queues.Select(q => q.Value.CompleteAsync()).ToArray()); + Task.WaitAll(_queues.Select(q => q.Value.AbortAsync()).ToArray()); foreach (var tp in topicPartitions) { From 9ac08b92d3caa18aeb43c7f01a373ca6937b8ed1 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 15 Sep 2020 22:08:18 +0300 Subject: [PATCH 20/22] Hide Abort method call in EnqueueAsync --- .../Processor/KafkaProcessorShould.cs | 2 +- src/KafkaConsumer/Processor/KafkaProcessor.cs | 7 +------ .../TopicPartitionQueue/ITopicPartitionQueue.cs | 1 + .../TopicPartitionQueue/TopicPartitionQueue.cs | 10 ++++++++++ 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs index 2c82b41..657ed9a 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorShould.cs @@ -58,7 +58,7 @@ public async Task EnqueueConsumedResultToTopicPartitionQueue() // assert _topicPartitionQueue.Verify( - tpq => tpq.TryEnqueueAsync(message.IsActual()), + tpq => tpq.EnqueueAsync(message.IsActual()), Times.Once()); } diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs index efcbaee..6d3cc10 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessor.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -40,12 +40,7 @@ public async Task ProcessMessagesAsync(CancellationToken ct) var message = new MessageHandler.Message(_consumer, consumeResult); - var isEnqueued = await queue.TryEnqueueAsync(message); - - if (!isEnqueued) - { - await queue.AbortAsync(); - } + await queue.EnqueueAsync(message); } } } diff --git a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs index 7d21aad..d435354 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/ITopicPartitionQueue.cs @@ -5,6 +5,7 @@ namespace KafkaConsumer.TopicPartitionQueue { public interface ITopicPartitionQueue { + Task EnqueueAsync(Message message); Task TryEnqueueAsync(Message message); Task CompleteAsync(); Task AbortAsync(); diff --git a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs index 5d45b6c..8a61461 100644 --- a/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs +++ b/src/KafkaConsumer/TopicPartitionQueue/TopicPartitionQueue.cs @@ -53,5 +53,15 @@ public async Task AbortAsync() public async Task TryEnqueueAsync(Message message) => await _bufferBlock.SendAsync(message); + + public async Task EnqueueAsync(Message message) + { + var isEnqueued = await _bufferBlock.SendAsync(message); + + if (!isEnqueued) + { + await AbortAsync(); + } + } } } \ No newline at end of file From f3310b18f0c186fcaed04f4fcf4d882e00d07a08 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 15 Sep 2020 22:15:33 +0300 Subject: [PATCH 21/22] Removed whitespace, cleanup --- src/KafkaConsumer.Tests/DataGenerator.cs | 2 +- .../Extensions/ObjectExtensions.cs | 2 +- .../MessageHandlerVerifier.cs | 4 ++-- .../Processor/KafkaProcessorBuilderShould.cs | 5 ----- .../TopicPartitionQueueShould.cs | 20 +++++++------------ .../Processor/Config/ProcessorConfig.cs | 1 - 6 files changed, 11 insertions(+), 23 deletions(-) diff --git a/src/KafkaConsumer.Tests/DataGenerator.cs b/src/KafkaConsumer.Tests/DataGenerator.cs index d0a4eeb..6b83d5c 100644 --- a/src/KafkaConsumer.Tests/DataGenerator.cs +++ b/src/KafkaConsumer.Tests/DataGenerator.cs @@ -12,7 +12,7 @@ public static IEnumerable GenerateTopicPartitions(int count) => .Select(i => new TopicPartition("test-topic", i)); public static TopicPartition TopicPartition => - new TopicPartition("test-topic", 1); + new TopicPartition("test-topic", 1); public static TopicPartitionOffset TopicPartitionOffset => new TopicPartitionOffset(TopicPartition, 1); diff --git a/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs index 15a65a1..07527b3 100644 --- a/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs +++ b/src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs @@ -5,7 +5,7 @@ namespace KafkaConsumer.Tests.Extensions { public static class ObjectExtensions { - public static bool IsEquivalentTo(this object actual, object expected) + public static bool IsEquivalentTo(this T actual, T expected) { actual.Should().BeEquivalentTo(expected); diff --git a/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs b/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs index 77e9e8f..38e9913 100644 --- a/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs +++ b/src/KafkaConsumer.Tests/MessageHandlerVerifier.cs @@ -7,11 +7,11 @@ namespace KafkaConsumer.Tests { - public class MessageHandlerVerifier { + private readonly List> _messages; + private int _index; - private List> _messages; public MessageHandlerVerifier(List> messages) { diff --git a/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs index f23e4be..0a65e5b 100644 --- a/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs +++ b/src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs @@ -90,7 +90,6 @@ public void ThrowIfConsumerConfigAlreadySet() _kafkaProcessorBuilder.WithConfig(consumerConfig); // act - var exception = Assert.Throws( () => _kafkaProcessorBuilder.WithConfig(consumerConfig)); @@ -107,7 +106,6 @@ public void ThrowIfTopicAlreadySet() _kafkaProcessorBuilder.FromTopic(topic); // act - var exception = Assert.Throws( () => _kafkaProcessorBuilder.FromTopic(topic)); @@ -122,7 +120,6 @@ public void ThrowIfHandlerFactoryAlreadySet() _kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object); // act - var exception = Assert.Throws( () => _kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object)); @@ -137,7 +134,6 @@ public void ThrowIfKeyDeserializerAlreadySet() _kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object); // act - var exception = Assert.Throws( () => _kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object)); @@ -152,7 +148,6 @@ public void ThrowIfValueDeserializerAlreadySet() _kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object); // act - var exception = Assert.Throws( () => _kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object)); diff --git a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs index da2a0ff..a3bf724 100644 --- a/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs +++ b/src/KafkaConsumer.Tests/TopicPartitionQueue/TopicPartitionQueueShould.cs @@ -49,7 +49,6 @@ public async Task EnqueueAndInvokeMessageHandler(int count) await _topicPartitionQueue.CompleteAsync(); // assert - MessageHandlerVerifier.Verify(_messageHandler, messages); } @@ -64,12 +63,7 @@ public async Task PropagateErrorsFromMessageHandlerOnComplete() .Throws(); // act - bool isEnqueued; - do - { - isEnqueued = await _topicPartitionQueue.TryEnqueueAsync(message); - } - while(isEnqueued); + await EnqueueWhileSuccessful(message); // assert await Assert.ThrowsAsync( @@ -91,12 +85,7 @@ public async Task PropagateErrorsFromMessageHandlerOnAbort() .Throws(); // act - bool isEnqueued; - do - { - isEnqueued = await _topicPartitionQueue.TryEnqueueAsync(message); - } - while(isEnqueued); + await EnqueueWhileSuccessful(message); // assert await Assert.ThrowsAsync( @@ -106,5 +95,10 @@ await Assert.ThrowsAsync( m => m.HandleAsync(message.IsActual()), Times.Once()); } + + private async Task EnqueueWhileSuccessful(MessageHandler.Message message) + { + while (await _topicPartitionQueue.TryEnqueueAsync(message)) { } + } } } \ No newline at end of file diff --git a/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs b/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs index 00633ac..22695ab 100644 --- a/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs +++ b/src/KafkaConsumer/Processor/Config/ProcessorConfig.cs @@ -3,6 +3,5 @@ namespace KafkaConsumer.Processor.Config public class ProcessorConfig { public string Topic { get; set; } - } } \ No newline at end of file From e9a1bfd1457b9da5c5d7550e7cf678ff7b8d1df3 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 15 Sep 2020 22:18:13 +0300 Subject: [PATCH 22/22] Added an important TODO --- src/KafkaConsumer/Processor/KafkaProcessor.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/KafkaConsumer/Processor/KafkaProcessor.cs b/src/KafkaConsumer/Processor/KafkaProcessor.cs index 6d3cc10..66050f4 100644 --- a/src/KafkaConsumer/Processor/KafkaProcessor.cs +++ b/src/KafkaConsumer/Processor/KafkaProcessor.cs @@ -47,6 +47,9 @@ public async Task ProcessMessagesAsync(CancellationToken ct) catch (OperationCanceledException) { } finally { + //TODO: Close() blocks indefinitely if an exception is thrown in PartitionsRevoked/Assigned handlers. + // https://github.com/confluentinc/confluent-kafka-dotnet/issues/1280 + _consumer.Close(); } }