diff --git a/.editorconfig b/.editorconfig index b517785..7e31598 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,5 +1,6 @@ root = true +# All files [*] trim_trailing_whitespace = true insert_final_newline = true @@ -8,12 +9,126 @@ indent_size = 4 indent_style = space end_of_line = lf +# Xml files +[*.xml] +indent_size = 2 + [*.cs] +#### Naming styles #### + +# Naming rules + +# Symbol specifications + +dotnet_naming_symbols.private_or_internal_field.applicable_kinds = field +dotnet_naming_symbols.private_or_internal_field.applicable_accessibilities = internal, private, private_protected +dotnet_naming_symbols.private_or_internal_field.required_modifiers = + +# Naming styles + +dotnet_naming_style.lowercase__begins_with__.required_prefix = _ +dotnet_naming_style.lowercase__begins_with__.required_suffix = +dotnet_naming_style.lowercase__begins_with__.word_separator = +dotnet_naming_style.lowercase__begins_with__.capitalization = all_lower +csharp_indent_labels = one_less_than_current +csharp_space_around_binary_operators = before_and_after +csharp_using_directive_placement = outside_namespace:silent +csharp_prefer_simple_using_statement = true:suggestion +csharp_prefer_braces = true:silent +csharp_style_namespace_declarations = file_scoped:silent +csharp_style_prefer_method_group_conversion = true:silent +csharp_style_prefer_top_level_statements = true:silent +csharp_style_prefer_primary_constructors = true:suggestion +csharp_style_expression_bodied_methods = true:silent +csharp_style_expression_bodied_constructors = true:silent +csharp_style_expression_bodied_operators = true:silent +csharp_style_expression_bodied_properties = true:silent +csharp_style_expression_bodied_indexers = true:silent +csharp_style_expression_bodied_accessors = true:silent +csharp_style_expression_bodied_lambdas = true:silent +csharp_style_expression_bodied_local_functions = true:silent +csharp_style_throw_expression = true:suggestion +csharp_style_prefer_null_check_over_type_check = true:suggestion +csharp_prefer_simple_default_expression = true:suggestion +csharp_style_prefer_local_over_anonymous_function = true:suggestion +csharp_style_prefer_index_operator = true:suggestion +csharp_style_prefer_range_operator = true:suggestion +csharp_style_implicit_object_creation_when_type_is_apparent = true:suggestion +csharp_style_prefer_tuple_swap = true:suggestion +csharp_style_prefer_utf8_string_literals = true:suggestion +csharp_style_inlined_variable_declaration = true:suggestion +csharp_style_deconstructed_variable_declaration = true:suggestion +csharp_style_unused_value_assignment_preference = discard_variable:suggestion +csharp_style_unused_value_expression_statement_preference = discard_variable:silent + resharper_csharp_place_expr_method_on_single_line = never -# Terraform files -[*.tf] -indent_size = 2 +[*.{cs,vb}] +#### Naming styles #### + +# Naming rules + +dotnet_naming_rule.interface_should_be_begins_with_i.severity = suggestion +dotnet_naming_rule.interface_should_be_begins_with_i.symbols = interface +dotnet_naming_rule.interface_should_be_begins_with_i.style = begins_with_i + +dotnet_naming_rule.types_should_be_pascal_case.severity = suggestion +dotnet_naming_rule.types_should_be_pascal_case.symbols = types +dotnet_naming_rule.types_should_be_pascal_case.style = pascal_case + +dotnet_naming_rule.non_field_members_should_be_pascal_case.severity = suggestion +dotnet_naming_rule.non_field_members_should_be_pascal_case.symbols = non_field_members +dotnet_naming_rule.non_field_members_should_be_pascal_case.style = pascal_case + +# Symbol specifications + +dotnet_naming_symbols.interface.applicable_kinds = interface +dotnet_naming_symbols.interface.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.interface.required_modifiers = + +dotnet_naming_symbols.types.applicable_kinds = class, struct, interface, enum +dotnet_naming_symbols.types.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.types.required_modifiers = + +dotnet_naming_symbols.non_field_members.applicable_kinds = property, event, method +dotnet_naming_symbols.non_field_members.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.non_field_members.required_modifiers = + +# Naming styles + +dotnet_naming_style.begins_with_i.required_prefix = I +dotnet_naming_style.begins_with_i.required_suffix = +dotnet_naming_style.begins_with_i.word_separator = +dotnet_naming_style.begins_with_i.capitalization = pascal_case + +dotnet_naming_style.pascal_case.required_prefix = +dotnet_naming_style.pascal_case.required_suffix = +dotnet_naming_style.pascal_case.word_separator = +dotnet_naming_style.pascal_case.capitalization = pascal_case + +dotnet_naming_style.pascal_case.required_prefix = +dotnet_naming_style.pascal_case.required_suffix = +dotnet_naming_style.pascal_case.word_separator = +dotnet_naming_style.pascal_case.capitalization = pascal_case +dotnet_style_operator_placement_when_wrapping = beginning_of_line +tab_width = 4 +indent_size = 4 +dotnet_style_coalesce_expression = true:suggestion +dotnet_style_null_propagation = true:suggestion +dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion +dotnet_style_prefer_auto_properties = true:silent +dotnet_style_object_initializer = true:suggestion +dotnet_style_collection_initializer = true:suggestion +dotnet_style_prefer_simplified_boolean_expressions = true:suggestion +dotnet_style_prefer_conditional_expression_over_assignment = true:silent +dotnet_style_prefer_conditional_expression_over_return = true:silent +dotnet_style_explicit_tuple_names = true:suggestion +dotnet_style_prefer_inferred_tuple_names = true:suggestion +dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion +dotnet_style_prefer_compound_assignment = true:suggestion +dotnet_style_prefer_simplified_interpolation = true:suggestion +dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion +dotnet_style_namespace_match_folder = true:suggestion [*.json] indent_size = 2 @@ -21,6 +136,5 @@ indent_size = 2 [*.{yaml,yml}] indent_size = 2 - [*.csproj] -indent_size = 2 +indent_size = 2 \ No newline at end of file diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj index 30f3939..f967ff9 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj @@ -13,10 +13,6 @@ - - - - diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs index 987a5af..7b76f15 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs @@ -1,25 +1,20 @@ -using System.Text.Json; -using Confluent.Kafka; +using Confluent.Kafka; using Dapper; -using Microsoft.Extensions.Options; using Npgsql; +using System.Text.Json; namespace KafkaFlow.Outbox.Postgres; -public class PostgresOutboxBackend : IOutboxBackend +public class PostgresOutboxBackend(NpgsqlDataSource connectionPool) : IOutboxBackend { - private readonly NpgsqlDataSource _connectionPool; - - public PostgresOutboxBackend(NpgsqlDataSource connectionPool) - { - _connectionPool = connectionPool; - } + private readonly NpgsqlDataSource _connectionPool = connectionPool; public async ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token = default) { - var sql = - @"INSERT INTO outbox.outbox(topic_name, partition, message_key, message_headers, message_body) - VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body)"; + var sql = """ + INSERT INTO outbox.outbox(topic_name, partition, message_key, message_headers, message_body) + VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body) + """; await using var conn = _connectionPool.CreateConnection(); @@ -41,27 +36,27 @@ public async ValueTask Store(TopicPartition topicPartition, Message Read(int batchSize, CancellationToken token = default) { - var sql = @" -DELETE FROM outbox.outbox -WHERE - sequence_id = ANY(ARRAY( - SELECT sequence_id FROM outbox.outbox - ORDER BY sequence_id - LIMIT @batch_size - FOR UPDATE - )) -RETURNING - sequence_id, - topic_name, - partition, - message_key, - message_headers, - message_body -"; + var sql = """ + DELETE FROM outbox.outbox + WHERE + sequence_id = ANY(ARRAY( + SELECT sequence_id FROM outbox.outbox + ORDER BY sequence_id + LIMIT @batch_size + FOR UPDATE + )) + RETURNING + sequence_id, + topic_name, + partition, + message_key, + message_headers, + message_body + """; await using var conn = _connectionPool.CreateConnection(); var result = await conn.QueryAsync(sql, new { batch_size = batchSize }); - return result?.Select(ToOutboxRecord).ToArray() ?? Array.Empty(); + return result?.Select(ToOutboxRecord).ToArray() ?? []; } private static OutboxRecord ToOutboxRecord(OutboxTableRow row) @@ -94,10 +89,12 @@ private static OutboxRecord ToOutboxRecord(OutboxTableRow row) internal sealed class OutboxTableRow { +#pragma warning disable IDE1006 // Naming Styles public long sequence_id { get; set; } public string topic_name { get; set; } = null!; public int? partition { get; set; } public byte[]? message_key { get; set; } public string? message_headers { get; set; } public byte[]? message_body { get; set; } +#pragma warning restore IDE1006 // Naming Styles } diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj index c3972e9..90e79f0 100644 --- a/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj @@ -16,10 +16,6 @@ - - - - diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs index 86e7e06..76126ec 100644 --- a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs @@ -7,12 +7,9 @@ namespace KafkaFlow.Outbox.SqlServer; -public class SqlServerOutboxBackend : IOutboxBackend +public class SqlServerOutboxBackend(IOptions options) : IOutboxBackend { - private readonly SqlServerBackendOptions _options; - - public SqlServerOutboxBackend(IOptions options) - => _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); public async ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token = default) { @@ -53,7 +50,7 @@ ORDER BY [sequence_id] using var conn = new SqlConnection(_options.ConnectionString); var result = await conn.QueryAsync(sql, new { batch_size = batchSize }); - return result?.Select(ToOutboxRecord).ToArray() ?? Array.Empty(); + return result?.Select(ToOutboxRecord).ToArray() ?? []; } private static OutboxRecord ToOutboxRecord(OutboxTableRow row) @@ -86,10 +83,12 @@ private static OutboxRecord ToOutboxRecord(OutboxTableRow row) internal sealed class OutboxTableRow { +#pragma warning disable IDE1006 // Naming Styles public long sequence_id { get; set; } public string topic_name { get; set; } = null!; public int? partition { get; set; } public byte[]? message_key { get; set; } public string? message_headers { get; set; } public byte[]? message_body { get; set; } +#pragma warning restore IDE1006 // Naming Styles } diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs b/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs index cf27cab..23208bf 100644 --- a/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs +++ b/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs @@ -1,24 +1,17 @@ -using System.Transactions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using System.Transactions; namespace KafkaFlow.Outbox; -internal sealed class OutboxDispatcherService : BackgroundService +internal sealed class OutboxDispatcherService( + ILogger logger, + IMessageProducer producer, + IOutboxBackend outboxBackend) : BackgroundService { - private readonly ILogger _logger; - private readonly IMessageProducer _producer; - private readonly IOutboxBackend _outboxBackend; - - public OutboxDispatcherService( - ILogger logger, - IMessageProducer producer, - IOutboxBackend outboxBackend) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _producer = producer ?? throw new ArgumentNullException(nameof(producer)); - _outboxBackend = outboxBackend ?? throw new ArgumentNullException(nameof(outboxBackend)); - } + private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + private readonly IMessageProducer _producer = producer ?? throw new ArgumentNullException(nameof(producer)); + private readonly IOutboxBackend _outboxBackend = outboxBackend ?? throw new ArgumentNullException(nameof(outboxBackend)); protected override async Task ExecuteAsync(CancellationToken stoppingToken) { @@ -48,13 +41,13 @@ private async Task DispatchNextBatchAsync(CancellationToken stoppingToken) } scope.Complete(); - return batch.Any(); + return batch.Length != 0; } private static TransactionScope BeginTransaction => new( scopeOption: TransactionScopeOption.RequiresNew, transactionOptions: new TransactionOptions - { IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) }, + { IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) }, asyncFlowOption: TransactionScopeAsyncFlowOption.Enabled); } diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxProducerDecorator.cs b/src/Contrib.KafkaFlow.Outbox/OutboxProducerDecorator.cs index 9c3dc5e..9b36563 100644 --- a/src/Contrib.KafkaFlow.Outbox/OutboxProducerDecorator.cs +++ b/src/Contrib.KafkaFlow.Outbox/OutboxProducerDecorator.cs @@ -2,16 +2,10 @@ namespace KafkaFlow.Outbox; -internal sealed class OutboxProducerDecorator : IProducer +internal sealed class OutboxProducerDecorator(IProducer inner, IOutboxBackend outbox) : IProducer { - private readonly IProducer _inner; - private readonly IOutboxBackend _outbox; - - public OutboxProducerDecorator(IProducer inner, IOutboxBackend outbox) - { - _inner = inner ?? throw new ArgumentNullException(nameof(inner)); - _outbox = outbox ?? throw new ArgumentNullException(nameof(outbox)); - } + private readonly IProducer _inner = inner ?? throw new ArgumentNullException(nameof(inner)); + private readonly IOutboxBackend _outbox = outbox ?? throw new ArgumentNullException(nameof(outbox)); public void Dispose() => _inner.Dispose(); @@ -43,16 +37,10 @@ public async Task> ProduceAsync( } public void Produce(string topic, Message message, - Action>? deliveryHandler = null) - { - ProduceAsync(topic, message).ConfigureAwait(false).GetAwaiter().GetResult(); - } + Action>? deliveryHandler = null) => ProduceAsync(topic, message).ConfigureAwait(false).GetAwaiter().GetResult(); public void Produce(TopicPartition topicPartition, Message message, - Action>? deliveryHandler = null) - { - ProduceAsync(topicPartition, message).ConfigureAwait(false).GetAwaiter().GetResult(); - } + Action>? deliveryHandler = null) => ProduceAsync(topicPartition, message).ConfigureAwait(false).GetAwaiter().GetResult(); public void SetSaslCredentials(string username, string password) { @@ -65,44 +53,16 @@ public void SetSaslCredentials(string username, string password) public void Flush(CancellationToken cancellationToken = default) => _inner.Flush(cancellationToken); - public void InitTransactions(TimeSpan timeout) - { - throw new InvalidOperationException("This producer does not support transactions"); - } + public void InitTransactions(TimeSpan timeout) => throw new InvalidOperationException("This producer does not support transactions"); - public void BeginTransaction() - { - throw new InvalidOperationException("This producer does not support transactions"); - } + public void BeginTransaction() => throw new InvalidOperationException("This producer does not support transactions"); - public void CommitTransaction(TimeSpan timeout) - { - throw new InvalidOperationException("This producer does not support transactions"); - } + public void CommitTransaction(TimeSpan timeout) => throw new InvalidOperationException("This producer does not support transactions"); - public void CommitTransaction() - { - throw new InvalidOperationException("This producer does not support transactions"); - } - - public void AbortTransaction(TimeSpan timeout) - { - throw new InvalidOperationException("This producer does not support transactions"); - } - - public void AbortTransaction() - { - throw new InvalidOperationException("This producer does not support transactions"); - } + public void CommitTransaction() => throw new InvalidOperationException("This producer does not support transactions"); - public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) - { - throw new InvalidOperationException("This producer does not support transactions"); - } + public void AbortTransaction(TimeSpan timeout) => throw new InvalidOperationException("This producer does not support transactions"); - public void SendOffsetsToTransaction(IEnumerable offsets, - IConsumerGroupMetadata groupMetadata, TimeSpan timeout) - { - throw new InvalidOperationException("This producer does not support transactions"); - } + public void AbortTransaction() => throw new InvalidOperationException("This producer does not support transactions"); + public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) => throw new InvalidOperationException("This producer does not support transactions"); } diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs b/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs index 40a6a80..037b603 100644 --- a/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs +++ b/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs @@ -61,16 +61,11 @@ public interface IOutboxProducerConfigurationBuilder IOutboxProducerConfigurationBuilder WithCompression(CompressionType compressionType, int? compressionLevel); } -internal sealed class OutboxProducerConfigurationBuilder : IOutboxProducerConfigurationBuilder +internal sealed class OutboxProducerConfigurationBuilder(IProducerConfigurationBuilder builder) : IOutboxProducerConfigurationBuilder { - private readonly IProducerConfigurationBuilder _builder; + private readonly IProducerConfigurationBuilder _builder = builder ?? throw new ArgumentNullException(nameof(builder)); private readonly ProducerConfig _producerConfig = new(); - public OutboxProducerConfigurationBuilder(IProducerConfigurationBuilder builder) - { - _builder = builder ?? throw new ArgumentNullException(nameof(builder)); - } - public IDependencyConfigurator DependencyConfigurator => _builder.DependencyConfigurator; public IOutboxProducerConfigurationBuilder WithPartitioner(Partitioner partitioner) @@ -96,7 +91,7 @@ public IOutboxProducerConfigurationBuilder WithStatisticsHandler(Action public IOutboxProducerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs) => WithBuilder(x => x.WithStatisticsIntervalMs(statisticsIntervalMs)); - private IOutboxProducerConfigurationBuilder WithBuilder(Action action) + private OutboxProducerConfigurationBuilder WithBuilder(Action action) { action(_builder); return this; diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj index 6f33620..443739b 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj @@ -13,10 +13,6 @@ - - - - diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs index fd0ea45..e4ab025 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs @@ -1,30 +1,25 @@ -using System.Text.Json; -using Dapper; +using Dapper; using Npgsql; +using System.Text.Json; namespace KafkaFlow.ProcessManagers.Postgres; -public sealed class PostgresProcessManagersStore : IProcessStateStore +public sealed class PostgresProcessManagersStore(NpgsqlDataSource connectionPool) : IProcessStateStore { - private readonly NpgsqlDataSource _connectionPool; - - public PostgresProcessManagersStore(NpgsqlDataSource connectionPool) - { - _connectionPool = connectionPool; - } + private readonly NpgsqlDataSource _connectionPool = connectionPool; public async ValueTask Persist(Type processType, Guid processId, VersionedState state) { - var sql = @" -INSERT INTO process_managers.processes(process_type, process_id, process_state) -VALUES (@process_type, @process_id, @process_state) -ON CONFLICT (process_type, process_id) DO -UPDATE -SET - process_state = EXCLUDED.process_state, - date_updated_utc = (now() AT TIME ZONE 'utc') -WHERE xmin = @version -"; + var sql = """ + INSERT INTO process_managers.processes(process_type, process_id, process_state) + VALUES (@process_type, @process_id, @process_state) + ON CONFLICT (process_type, process_id) DO + UPDATE + SET + process_state = EXCLUDED.process_state, + date_updated_utc = (now() AT TIME ZONE 'utc') + WHERE xmin = @version + """; await using var conn = _connectionPool.CreateConnection(); var result = await conn.ExecuteAsync(sql, new { @@ -35,16 +30,19 @@ ON CONFLICT (process_type, process_id) DO }); if (result == 0) + { throw new OptimisticConcurrencyException(processType, processId, $"Concurrency error when persisting state {processType.FullName}"); + } } public async ValueTask Load(Type processType, Guid processId) { - var sql = @" -SELECT process_state, xmin as version -FROM process_managers.processes -WHERE process_type = @process_type AND process_id = @process_id"; + var sql = """ + SELECT process_state, xmin as version + FROM process_managers.processes + WHERE process_type = @process_type AND process_id = @process_id + """; await using var conn = _connectionPool.CreateConnection(); var result = await conn.QueryAsync(sql, new @@ -55,7 +53,10 @@ FROM process_managers.processes var firstResult = result?.FirstOrDefault(); - if (firstResult == null) return VersionedState.Zero; + if (firstResult == null) + { + return VersionedState.Zero; + } var decoded = JsonSerializer.Deserialize(firstResult.process_state, processType); return new VersionedState(firstResult.version, decoded); @@ -63,26 +64,31 @@ FROM process_managers.processes public async ValueTask Delete(Type processType, Guid processId, int version) { - var sql = @" -DELETE FROM process_managers.processes -WHERE process_type = @process_type AND process_id = @process_id and xmin = @version"; + var sql = """ + DELETE FROM process_managers.processes + WHERE process_type = @process_type AND process_id = @process_id and xmin = @version + """; await using var conn = _connectionPool.CreateConnection(); var result = await conn.ExecuteAsync(sql, new { process_type = processType.FullName, process_id = processId, - version = version + version }); if (result == 0) + { throw new OptimisticConcurrencyException(processType, processId, $"Concurrency error when persisting state {processType.FullName}"); + } } private sealed class ProcessStateRow { +#pragma warning disable IDE1006 // Naming Styles public string process_state { get; set; } = null!; public int version { get; set; } +#pragma warning restore IDE1006 // Naming Styles } } diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj index bf370c7..ce5ca51 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj @@ -16,10 +16,6 @@ - - - - diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs index ccd29cf..b7519e6 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs @@ -6,12 +6,9 @@ namespace KafkaFlow.ProcessManagers.SqlServer; -public sealed class SqlServerProcessManagersStore : IProcessStateStore +public sealed class SqlServerProcessManagersStore(IOptions options) : IProcessStateStore { - private readonly SqlServerBackendOptions _options; - - public SqlServerProcessManagersStore(IOptions options) - => _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); public async ValueTask Persist(Type processType, Guid processId, VersionedState state) { @@ -95,7 +92,9 @@ DELETE FROM [process_managers].[processes] private sealed class ProcessStateRow { +#pragma warning disable IDE1006 // Naming Styles public required string process_state { get; set; } public required int version { get; set; } +#pragma warning restore IDE1006 // Naming Styles } } diff --git a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj index f7eb111..bd00a0c 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj @@ -9,7 +9,6 @@ - diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs index 9d3fb5c..f09880a 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs @@ -4,25 +4,20 @@ namespace KafkaFlow.ProcessManagers; -public sealed class ProcessManagerConfigurationBuilder +public sealed class ProcessManagerConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) { - private readonly IDependencyConfigurator _dependencyConfigurator; + private readonly IDependencyConfigurator _dependencyConfigurator = dependencyConfigurator ?? throw new ArgumentNullException(nameof(dependencyConfigurator)); private InstanceLifetime _serviceLifetime = InstanceLifetime.Transient; - private readonly List _processManagers = new(); + private readonly List _processManagers = []; private TransactionMode _transactionMode = TransactionMode.ForEachHandler; private Func _beginTransaction = () => new TransactionScope( scopeOption: TransactionScopeOption.Required, transactionOptions: new TransactionOptions - { IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) }, + { IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) }, asyncFlowOption: TransactionScopeAsyncFlowOption.Enabled); - public ProcessManagerConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) - { - _dependencyConfigurator = dependencyConfigurator ?? throw new ArgumentNullException(nameof(dependencyConfigurator)); - } - /// /// Specify how transactions should behave for process managers /// @@ -30,7 +25,9 @@ public ProcessManagerConfigurationBuilder(IDependencyConfigurator dependencyConf public ProcessManagerConfigurationBuilder WithTransactionMode(TransactionMode transactionMode) { if (!Enum.IsDefined(typeof(TransactionMode), transactionMode)) + { throw new InvalidEnumArgumentException(nameof(transactionMode), (int)transactionMode, typeof(TransactionMode)); + } _transactionMode = transactionMode; return this; @@ -41,8 +38,7 @@ public ProcessManagerConfigurationBuilder WithTransactionMode(TransactionMode tr /// public ProcessManagerConfigurationBuilder WithBeginTransaction(Func beginTransaction) { - if (beginTransaction == null) throw new ArgumentNullException(nameof(beginTransaction)); - _beginTransaction = beginTransaction; + _beginTransaction = beginTransaction ?? throw new ArgumentNullException(nameof(beginTransaction)); return this; } @@ -75,7 +71,9 @@ public ProcessManagerConfigurationBuilder AddProcessManager() where T : IProc public ProcessManagerConfigurationBuilder AddProcessManager(Type type) { if (!typeof(IProcessManager).IsAssignableFrom(type)) + { throw new InvalidOperationException($"Type {type.FullName} is not a ProcessManager"); + } _processManagers.Add(type); return this; @@ -114,12 +112,14 @@ from messageType in GetMessageTypes(processType) var mapping = new HandlerTypeMapping(maps.AsReadOnly()); foreach (var processType in _processManagers) + { _dependencyConfigurator.Add(processType, processType, _serviceLifetime); + } return new ProcessManagerConfiguration(_transactionMode, mapping, _beginTransaction); } - private List GetMessageTypes(Type processType) => + private static List GetMessageTypes(Type processType) => processType .GetInterfaces() .Where(x => x.IsGenericType && typeof(IProcessMessage).IsAssignableFrom(x)) diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs index 84b3ca2..ba972ae 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs @@ -3,21 +3,14 @@ namespace KafkaFlow.ProcessManagers; -internal sealed class ProcessManagerMiddleware : IMessageMiddleware +internal sealed class ProcessManagerMiddleware( + IDependencyResolver dependencyResolver, + IProcessStateStore stateStore, + ProcessManagerConfiguration configuration) : IMessageMiddleware { - private readonly IDependencyResolver _dependencyResolver; - private readonly IProcessStateStore _stateStore; - private readonly ProcessManagerConfiguration _configuration; - - public ProcessManagerMiddleware( - IDependencyResolver dependencyResolver, - IProcessStateStore stateStore, - ProcessManagerConfiguration configuration) - { - _dependencyResolver = dependencyResolver ?? throw new ArgumentNullException(nameof(dependencyResolver)); - _stateStore = stateStore ?? throw new ArgumentNullException(nameof(stateStore)); - _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); - } + private readonly IDependencyResolver _dependencyResolver = dependencyResolver ?? throw new ArgumentNullException(nameof(dependencyResolver)); + private readonly IProcessStateStore _stateStore = stateStore ?? throw new ArgumentNullException(nameof(stateStore)); + private readonly ProcessManagerConfiguration _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { diff --git a/src/Contrib.KafkaFlow.ProcessManagers/VersionedState.cs b/src/Contrib.KafkaFlow.ProcessManagers/VersionedState.cs index 624e5b7..7efd41c 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/VersionedState.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/VersionedState.cs @@ -2,5 +2,5 @@ namespace KafkaFlow.ProcessManagers; public readonly record struct VersionedState(int Version, object? State) { - public static VersionedState Zero = new VersionedState(0, null); + public static readonly VersionedState Zero = new(0, null); }; diff --git a/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj b/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj index b5f4601..54e5625 100644 --- a/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj @@ -10,7 +10,6 @@ - diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/LoggingProcessStateStore.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/LoggingProcessStateStore.cs index 4e823e7..3ec534d 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/LoggingProcessStateStore.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/LoggingProcessStateStore.cs @@ -1,18 +1,13 @@ namespace KafkaFlow.ProcessManagers.IntegrationTests.Fixture; -public sealed class LoggingProcessStateStore : IProcessStateStore +public sealed class LoggingProcessStateStore(IProcessStateStore innerStore) : IProcessStateStore { public enum ActionType { Persisted, Deleted } - private readonly IProcessStateStore _innerStore; - private readonly List<(ActionType, Type, Guid, VersionedState?)> _log = new(); - - public LoggingProcessStateStore(IProcessStateStore innerStore) - { - _innerStore = innerStore ?? throw new ArgumentNullException(nameof(innerStore)); - } + private readonly IProcessStateStore _innerStore = innerStore ?? throw new ArgumentNullException(nameof(innerStore)); + private readonly List<(ActionType, Type, Guid, VersionedState?)> _log = []; public IReadOnlyList<(ActionType, Type, Guid, VersionedState?)> Changes => _log.AsReadOnly(); @@ -24,10 +19,7 @@ public ValueTask Persist(Type processType, Guid processId, VersionedState state) return _innerStore.Persist(processType, processId, state); } - public ValueTask Load(Type processType, Guid processId) - { - return _innerStore.Load(processType, processId); - } + public ValueTask Load(Type processType, Guid processId) => _innerStore.Load(processType, processId); public async ValueTask Delete(Type processType, Guid processId, int version) { diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs index 5242304..0882a09 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs @@ -52,7 +52,7 @@ public PostgresKafkaFlowFixture() .UseMicrosoftLog() .AddCluster(cluster => cluster - .WithBrokers(new[] { "localhost:9092 " }) + .WithBrokers(["localhost:9092 "]) .CreateTopicIfNotExists(TopicName, 3, 1) .AddOutboxDispatcher(x => x.WithPartitioner(Partitioner.Murmur2Random)) .AddProducer(producer => @@ -100,27 +100,34 @@ public PostgresKafkaFlowFixture() public void Dispose() { - if (!_disposedAsync) - { - DisposeAsync().ConfigureAwait(false).GetAwaiter().GetResult(); - } + Dispose(disposing: true); + GC.SuppressFinalize(this); } - private bool _disposedAsync = false; - public async ValueTask DisposeAsync() { - _disposedAsync = true; + await DisposeAsyncCore().ConfigureAwait(false); - _fixtureCancellation.Cancel(); - _fixtureCancellation.Dispose(); - await _kafkaBus.StopAsync(); + Dispose(disposing: false); + GC.SuppressFinalize(this); + } + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _fixtureCancellation.Cancel(); + _fixtureCancellation.Dispose(); + } + } + + protected virtual async ValueTask DisposeAsyncCore() + { + await _kafkaBus.StopAsync(); foreach (var cons in ServiceProvider.GetRequiredService().All) { await cons.StopAsync(); } - await ServiceProvider.DisposeAsync(); } } diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/ServiceCollectionExtensions.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/ServiceCollectionExtensions.cs index ad0bcc8..29e9aab 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/ServiceCollectionExtensions.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/ServiceCollectionExtensions.cs @@ -26,17 +26,11 @@ public static IServiceCollection Decorate(this IServiceCol /// If the argument is null. public static IServiceCollection Decorate(this IServiceCollection services, Func decorate) { - if (decorate == null) - { - throw new ArgumentNullException(nameof(decorate)); - } + ArgumentNullException.ThrowIfNull(decorate); - if (decorate == null) - { - throw new ArgumentNullException(nameof(decorate)); - } - - return services == null + return decorate == null + ? throw new ArgumentNullException(nameof(decorate)) + : services == null ? throw new ArgumentNullException(nameof(services)) : services.DecorateDescriptors(typeof(TService), x => x.Decorate((provider, inner) => decorate(provider, (TService)inner)!)); } @@ -78,7 +72,7 @@ private static bool TryDecorateDescriptors(this IServiceCollection services, Typ } private static bool TryGetDescriptors(this IServiceCollection services, Type serviceType, out ICollection descriptors) => - (descriptors = services.Where(service => service.ServiceType == serviceType).ToArray()).Any(); + (descriptors = services.Where(service => service.ServiceType == serviceType).ToArray()).Count != 0; private static ServiceDescriptor Decorate(this ServiceDescriptor descriptor, Type decoratorType) => descriptor.WithFactory(provider => provider.CreateInstance(decoratorType, provider.GetInstance(descriptor))); @@ -94,21 +88,11 @@ private static ServiceDescriptor WithFactory(this ServiceDescriptor descriptor, ServiceDescriptor.Describe(descriptor.ServiceType, factory, descriptor.Lifetime); private static object GetInstance(this IServiceProvider provider, ServiceDescriptor descriptor) - { - if (descriptor.ImplementationInstance != null) - { - return descriptor.ImplementationInstance; - } - - if (descriptor.ImplementationType != null) - { - return provider.GetServiceOrCreateInstance(descriptor.ImplementationType); - } - - return descriptor.ImplementationFactory == null + => descriptor.ImplementationInstance ?? (descriptor.ImplementationType != null + ? provider.GetServiceOrCreateInstance(descriptor.ImplementationType) + : descriptor.ImplementationFactory == null ? throw new InvalidOperationException($"ImplementationFactory for '{descriptor.ServiceType.FullName}' is not set") - : descriptor.ImplementationFactory(provider); - } + : descriptor.ImplementationFactory(provider)); private static object GetServiceOrCreateInstance(this IServiceProvider provider, Type type) => ActivatorUtilities.GetServiceOrCreateInstance(provider, type); diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs index a841357..6f69b6e 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs @@ -39,7 +39,7 @@ public SqlServerKafkaFlowFixture() services .AddSingleton(config) .AddLogging(log => log.AddConsole().AddDebug()) - .AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")) + .AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")!) .Decorate() .AddSqlServerOutboxBackend() .AddKafka(kafka => @@ -47,7 +47,7 @@ public SqlServerKafkaFlowFixture() .UseMicrosoftLog() .AddCluster(cluster => cluster - .WithBrokers(new[] { "localhost:9092 " }) + .WithBrokers(["localhost:9092 "]) .CreateTopicIfNotExists(TopicName, 3, 1) .AddOutboxDispatcher(x => x.WithPartitioner(Partitioner.Murmur2Random)) .AddProducer(producer => @@ -95,27 +95,34 @@ public SqlServerKafkaFlowFixture() public void Dispose() { - if (!_disposedAsync) - { - DisposeAsync().ConfigureAwait(false).GetAwaiter().GetResult(); - } + Dispose(disposing: true); + GC.SuppressFinalize(this); } - private bool _disposedAsync = false; - public async ValueTask DisposeAsync() { - _disposedAsync = true; + await DisposeAsyncCore().ConfigureAwait(false); - _fixtureCancellation.Cancel(); - _fixtureCancellation.Dispose(); - await _kafkaBus.StopAsync(); + Dispose(disposing: false); + GC.SuppressFinalize(this); + } + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _fixtureCancellation.Cancel(); + _fixtureCancellation.Dispose(); + } + } + + protected virtual async ValueTask DisposeAsyncCore() + { + await _kafkaBus.StopAsync(); foreach (var cons in ServiceProvider.GetRequiredService().All) { await cons.StopAsync(); } - await ServiceProvider.DisposeAsync(); } } diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj index 4a429eb..4ac070c 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj @@ -1,53 +1,50 @@  - - net8.0 - enable - enable + + net8.0 + enable + enable - false - true - + false + true + - - - - - - - - - - - - - - - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - - + + + + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + - - - - - - - - - + + + + + + + + + - - - - Always - - + + + + Always + + diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs index 6dbe813..5be3b76 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs @@ -20,7 +20,7 @@ public async Task Should_write_update_and_delete_state() .Build(); var services = new ServiceCollection(); - services.AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")); + services.AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")!); var sp = services.BuildServiceProvider(); var store = sp.GetRequiredService(); diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifeCycleProcess.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifeCycleProcess.cs index 72a7057..3bcf413 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifeCycleProcess.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifeCycleProcess.cs @@ -1,5 +1,5 @@ -using System.Collections.Immutable; using Microsoft.Extensions.Logging; +using System.Collections.Immutable; namespace KafkaFlow.ProcessManagers.IntegrationTests.UserLifeCycle; @@ -12,19 +12,13 @@ public sealed record UserApproved(Guid UserId); public sealed record UserAccessGranted(Guid UserId); // ReSharper disable once UnusedType.Global -public class UserLifeCycleProcess : ProcessManager, +public class UserLifeCycleProcess(ILogger logger, IMessageProducer producer) : ProcessManager, IProcessMessage, IProcessMessage, IProcessMessage { - private readonly ILogger _logger; - private readonly IMessageProducer _producer; - - public UserLifeCycleProcess(ILogger logger, IMessageProducer producer) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _producer = producer ?? throw new ArgumentNullException(nameof(producer)); - } + private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + private readonly IMessageProducer _producer = producer ?? throw new ArgumentNullException(nameof(producer)); public Guid GetProcessId(UserRegistered message) => message.UserId; public Guid GetProcessId(UserAccessGranted message) => message.UserId; @@ -36,23 +30,26 @@ public async Task Handle(IMessageContext context, UserRegistered message) await _producer.ProduceAsync(message.UserId.ToString(), new UserApproved(message.UserId)); await _producer.ProduceAsync(message.UserId.ToString(), new UserAccessGranted(message.UserId)); - var newState = new TestState(DateTimeOffset.UtcNow, ImmutableList.Create("UserRegistered")); + var newState = new TestState(DateTimeOffset.UtcNow, ["UserRegistered"]); UpdateState(newState); } public async Task Handle(IMessageContext context, UserApproved message) { _logger.LogInformation("Received message: {Message}", message); - WithRequiredState(state => + + await WithRequiredStateAsync(state => { var newState = state with { Log = state.Log.Add("UserApproved") }; UpdateState(newState); + return Task.CompletedTask; }); } - public async Task Handle(IMessageContext context, UserAccessGranted message) + public Task Handle(IMessageContext context, UserAccessGranted message) { _logger.LogInformation("Received message: {Message}", message); FinishProcess(); + return Task.CompletedTask; } }