Skip to content

Commit

Permalink
Merge pull request #1 from Rytisk/dev
Browse files Browse the repository at this point in the history
Backbone for kafka processor
  • Loading branch information
Rytisk authored Sep 15, 2020
2 parents 590eac2 + e9a1bfd commit 0f632a1
Show file tree
Hide file tree
Showing 25 changed files with 1,281 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
##
## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore

.vscode/

# User-specific files
*.rsuser
*.suo
Expand Down
52 changes: 52 additions & 0 deletions src/KafkaConsumer.Tests/DataGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

namespace KafkaConsumer.Tests
{
public static class DataGenerator
{
public static IEnumerable<TopicPartition> GenerateTopicPartitions(int count) =>
Enumerable
.Range(0, count)
.Select(i => new TopicPartition("test-topic", i));

public static TopicPartition TopicPartition =>
new TopicPartition("test-topic", 1);

public static TopicPartitionOffset TopicPartitionOffset =>
new TopicPartitionOffset(TopicPartition, 1);

public static MessageHandler.Message<string, string> GenerateMessage(
IConsumer<string, string> consumer) =>
new MessageHandler.Message<string, string>(consumer, ConsumeResult);

public static IEnumerable<MessageHandler.Message<string, string>> GenerateMessages(
IConsumer<string, string> consumer,
int count) =>
GenerateConsumeResults(count)
.Select(cr => new MessageHandler.Message<string, string>(consumer, cr));

public static ConsumeResult<string, string> ConsumeResult =>
new ConsumeResult<string, string>
{
Message = new Message<string, string>
{
Key = "key",
Value = "value"
},
TopicPartitionOffset = TopicPartitionOffset
};

public static IEnumerable<ConsumeResult<string, string>> GenerateConsumeResults(int count) =>
Enumerable.Range(0, count).Select(i => new ConsumeResult<string, string>
{
Message = new Message<string, string>
{
Key = $"key_{i}",
Value = $"value_{i}"
},
TopicPartitionOffset = new TopicPartitionOffset("test-topic", 0, i)
});
}
}
20 changes: 20 additions & 0 deletions src/KafkaConsumer.Tests/Extensions/ObjectExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using FluentAssertions;
using Moq;

namespace KafkaConsumer.Tests.Extensions
{
public static class ObjectExtensions
{
public static bool IsEquivalentTo<T>(this T actual, T expected)
{
actual.Should().BeEquivalentTo(expected);

return true;
}

public static T IsActual<T>(this T expected)
{
return It.Is<T>(actual => actual.IsEquivalentTo(expected));
}
}
}
22 changes: 22 additions & 0 deletions src/KafkaConsumer.Tests/KafkaConsumer.Tests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.8" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="Moq" Version="4.14.5" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\KafkaConsumer\KafkaConsumer.csproj" />
</ItemGroup>

</Project>
54 changes: 54 additions & 0 deletions src/KafkaConsumer.Tests/MessageHandlerVerifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System.Collections.Generic;
using System.Linq;
using FluentAssertions;
using KafkaConsumer.MessageHandler;
using Moq;
using Xunit;

namespace KafkaConsumer.Tests
{
public class MessageHandlerVerifier
{
private readonly List<Message<string, string>> _messages;

private int _index;

public MessageHandlerVerifier(List<Message<string, string>> messages)
{
_messages = messages;
}

public static void Verify(
Mock<IMessageHandler<string, string>> messageHandler,
List<Message<string, string>> messages)
{
if(!messages.Any())
{
messageHandler.Verify(
mh => mh.HandleAsync(It.IsAny<Message<string, string>>()),
Times.Never());

return;
}

var collectionVerifier = new MessageHandlerVerifier(messages);

messageHandler.Verify(mh => mh.HandleAsync(
It.Is<Message<string, string>>(msg => collectionVerifier.Validate(msg))));

messageHandler.VerifyNoOtherCalls();

Assert.Equal(messages.Count, collectionVerifier._index);
}


private bool Validate(Message<string, string> message)
{
_messages[_index].Should().BeEquivalentTo(message);

_index++;

return true;
}
}
}
158 changes: 158 additions & 0 deletions src/KafkaConsumer.Tests/Processor/KafkaProcessorBuilderShould.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
using Confluent.Kafka;
using KafkaConsumer.MessageHandler;
using KafkaConsumer.Processor;
using KafkaConsumer.TopicPartitionQueue;
using Moq;
using System;
using Xunit;

namespace KafkaConsumer.Tests.Processor
{
public class KafkaProcessorBuilderShould
{
private readonly Mock<ITopicPartitionQueueSelector<string, string>> _topicPartitionQueueSelector;
private readonly Mock<IMessageHandler<string, string>> _messageHandler;
private readonly KafkaProcessorBuilder<string, string> _kafkaProcessorBuilder;
private readonly Mock<IDeserializer<string>> _keyDeserializer;
private readonly Mock<IDeserializer<string>> _valueDeserializer;

public KafkaProcessorBuilderShould()
{
_keyDeserializer = new Mock<IDeserializer<string>>();
_valueDeserializer = new Mock<IDeserializer<string>>();
_messageHandler = new Mock<IMessageHandler<string, string>>();
_topicPartitionQueueSelector = new Mock<ITopicPartitionQueueSelector<string, string>>();

_kafkaProcessorBuilder = new KafkaProcessorBuilder<string, string>(_topicPartitionQueueSelector.Object);
}

[Fact]
public void ThrowIfConsumerConfigNotSet()
{
// arrange
var topic = "topic";

_kafkaProcessorBuilder
.FromTopic(topic)
.WithHandlerFactory(_ => _messageHandler.Object);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.Build());

// assert
Assert.Equal("'consumerConfig' must be set!", exception.Message);
}

[Fact]
public void ThrowIfTopicNotSet()
{
// arrange
var consumerConfig = new ConsumerConfig();

_kafkaProcessorBuilder
.WithConfig(consumerConfig)
.WithHandlerFactory(_ => _messageHandler.Object);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.Build());

// assert
Assert.Equal("'topic' must be set!", exception.Message);
}

[Fact]
public void ThrowIfHandlerFactoryNotSet()
{
// arrange
var consumerConfig = new ConsumerConfig();
var topic = "topic";

_kafkaProcessorBuilder
.WithConfig(consumerConfig)
.FromTopic(topic);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.Build());

// assert
Assert.Equal("'handlerFactory' must be set!", exception.Message);
}

[Fact]
public void ThrowIfConsumerConfigAlreadySet()
{
// arrange
var consumerConfig = new ConsumerConfig();

_kafkaProcessorBuilder.WithConfig(consumerConfig);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.WithConfig(consumerConfig));

// assert
Assert.Equal("'consumerConfig' was already set!", exception.Message);
}

[Fact]
public void ThrowIfTopicAlreadySet()
{
// arrange
var topic = "topic";

_kafkaProcessorBuilder.FromTopic(topic);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.FromTopic(topic));

// assert
Assert.Equal("'topic' was already set!", exception.Message);
}

[Fact]
public void ThrowIfHandlerFactoryAlreadySet()
{
// arrange
_kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.WithHandlerFactory(_ => _messageHandler.Object));

// assert
Assert.Equal("'handlerFactory' was already set!", exception.Message);
}

[Fact]
public void ThrowIfKeyDeserializerAlreadySet()
{
// arrange
_kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.WithKeyDeserializer(_keyDeserializer.Object));

// assert
Assert.Equal("'keyDeserializer' was already set!", exception.Message);
}

[Fact]
public void ThrowIfValueDeserializerAlreadySet()
{
// arrange
_kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object);

// act
var exception = Assert.Throws<InvalidOperationException>(
() => _kafkaProcessorBuilder.WithValueDeserializer(_valueDeserializer.Object));

// assert
Assert.Equal("'valueDeserializer' was already set!", exception.Message);
}
}
}
Loading

0 comments on commit 0f632a1

Please sign in to comment.