From 298067a97d4662b1390bd8209c8c583f4661ad07 Mon Sep 17 00:00:00 2001 From: blindspotbounty <127803250+blindspotbounty@users.noreply.github.com> Date: Fri, 26 Apr 2024 19:01:28 +0300 Subject: [PATCH] Wrap rd_kafka_consumer_poll into iterator (use librdkafka embedded backpressure) (#158) * remove message sequence * test consumer with implicit rebalance * misc + format * remove artefact * don't check a lot of messages * fix typo * slow down first consumer to lower message to fit CI timeout * remove helpers * use exact benchmark version to avoid missing thresholds error (as no thresholds so far) * add deprecated marks for backpressure, change comment for future dev * address comments --- .../KafkaConsumerConfiguration.swift | 32 -- Sources/Kafka/KafkaConsumer.swift | 373 +++--------------- Sources/Kafka/RDKafka/RDKafkaClient.swift | 38 -- Tests/IntegrationTests/KafkaTests.swift | 161 +++++++- 4 files changed, 213 insertions(+), 391 deletions(-) diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index 2b3b9bfc..2d76e2da 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import Crdkafka import struct Foundation.UUID public struct KafkaConsumerConfiguration { @@ -23,37 +22,6 @@ public struct KafkaConsumerConfiguration { /// Default: `.milliseconds(100)` public var pollInterval: Duration = .milliseconds(100) - /// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``. - public struct BackPressureStrategy: Sendable, Hashable { - enum _BackPressureStrategy: Sendable, Hashable { - case watermark(low: Int, high: Int) - } - - let _internal: _BackPressureStrategy - - private init(backPressureStrategy: _BackPressureStrategy) { - self._internal = backPressureStrategy - } - - /// A back pressure strategy based on high and low watermarks. - /// - /// The consumer maintains a buffer size between a low watermark and a high watermark - /// to control the flow of incoming messages. - /// - /// - Parameter low: The lower threshold for the buffer size (low watermark). - /// - Parameter high: The upper threshold for the buffer size (high watermark). - public static func watermark(low: Int, high: Int) -> BackPressureStrategy { - return .init(backPressureStrategy: .watermark(low: low, high: high)) - } - } - - /// The backpressure strategy to be used for message consumption. - /// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information. - public var backPressureStrategy: BackPressureStrategy = .watermark( - low: 10, - high: 50 - ) - /// A struct representing the different Kafka message consumption strategies. public struct ConsumptionStrategy: Sendable, Hashable { enum _ConsumptionStrategy: Sendable, Hashable { diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index d7666650..2fdead66 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -34,23 +34,6 @@ extension KafkaConsumerEventsDelegate: NIOAsyncSequenceProducerDelegate { } } -// MARK: - KafkaConsumerMessagesDelegate - -/// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerMessages``. -internal struct KafkaConsumerMessagesDelegate: Sendable { - let stateMachine: NIOLockedValueBox -} - -extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { - func produceMore() { - self.stateMachine.withLockedValue { $0.produceMore() } - } - - func didTerminate() { - self.stateMachine.withLockedValue { $0.finishMessageConsumption() } - } -} - // MARK: - KafkaConsumerEvents /// `AsyncSequence` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka. @@ -78,60 +61,62 @@ public struct KafkaConsumerEvents: Sendable, AsyncSequence { /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct KafkaConsumerMessages: Sendable, AsyncSequence { - let stateMachine: NIOLockedValueBox + typealias LockedMachine = NIOLockedValueBox + + let stateMachine: LockedMachine + let pollInterval: Duration public typealias Element = KafkaConsumerMessage - typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark - typealias WrappedSequence = NIOThrowingAsyncSequenceProducer< - Result, - Error, - BackPressureStrategy, - KafkaConsumerMessagesDelegate - > - let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct AsyncIterator: AsyncIteratorProtocol { - let stateMachine: NIOLockedValueBox - var wrappedIterator: WrappedSequence.AsyncIterator? + private let stateMachineHolder: MachineHolder + let pollInterval: Duration - public mutating func next() async throws -> Element? { - guard let result = try await self.wrappedIterator?.next() else { - self.deallocateIterator() - return nil + private final class MachineHolder: Sendable { // only for deinit + let stateMachine: LockedMachine + + init(stateMachine: LockedMachine) { + self.stateMachine = stateMachine } - switch result { - case .success(let message): - let action = self.stateMachine.withLockedValue { $0.storeOffset() } + deinit { + self.stateMachine.withLockedValue { $0.finishMessageConsumption() } + } + } + + init(stateMachine: LockedMachine, pollInterval: Duration) { + self.stateMachineHolder = .init(stateMachine: stateMachine) + self.pollInterval = pollInterval + } + + public func next() async throws -> Element? { + // swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165 + // Currently use Task.sleep() if no new messages, should use task executor preference when implemented: + // https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md + while !Task.isCancelled { + let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } + switch action { - case .storeOffset(let client): - do { - try client.storeMessageOffset(message) - } catch { - self.deallocateIterator() - throw error + case .poll(let client): + if let message = try client.consumerPoll() { // non-blocking call + return message } - return message - case .terminateConsumerSequence: - self.deallocateIterator() + try await Task.sleep(for: self.pollInterval) + case .suspendPollLoop: + try await Task.sleep(for: self.pollInterval) // not started yet + case .terminatePollLoop: return nil } - case .failure(let error): - self.deallocateIterator() - throw error } - } - - private mutating func deallocateIterator() { - self.wrappedIterator = nil + return nil } } public func makeAsyncIterator() -> AsyncIterator { return AsyncIterator( stateMachine: self.stateMachine, - wrappedIterator: self.wrappedSequence.makeAsyncIterator() + pollInterval: self.pollInterval ) } } @@ -140,13 +125,6 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { /// A ``KafkaConsumer `` can be used to consume messages from a Kafka cluster. public final class KafkaConsumer: Sendable, Service { - typealias Producer = NIOThrowingAsyncSequenceProducer< - Result, - Error, - NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, - KafkaConsumerMessagesDelegate - > - /// The configuration object of the consumer client. private let configuration: KafkaConsumerConfiguration /// A logger. @@ -178,30 +156,14 @@ public final class KafkaConsumer: Sendable, Service { self.stateMachine = stateMachine self.logger = logger - let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( - elementType: Result.self, - backPressureStrategy: { - switch configuration.backPressureStrategy._internal { - case .watermark(let lowWatermark, let highWatermark): - return NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark( - lowWatermark: lowWatermark, - highWatermark: highWatermark - ) - } - }(), - finishOnDeinit: true, - delegate: KafkaConsumerMessagesDelegate(stateMachine: self.stateMachine) - ) - self.messages = KafkaConsumerMessages( stateMachine: self.stateMachine, - wrappedSequence: sourceAndSequence.sequence + pollInterval: configuration.pollInterval ) self.stateMachine.withLockedValue { $0.initialize( - client: client, - source: sourceAndSequence.source + client: client ) } } @@ -364,20 +326,7 @@ public final class KafkaConsumer: Sendable, Service { case .group(groupID: _, topics: let topics): try self.subscribe(topics: topics) } - - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await self.eventRunLoop() - } - - group.addTask { - try await self.messageRunLoop() - } - - // Throw when one of the two child task throws - try await group.next() - try await group.next() - } + try await self.eventRunLoop() } /// Run loop polling Kafka for new events. @@ -403,83 +352,6 @@ public final class KafkaConsumer: Sendable, Service { } } - /// Run loop polling Kafka for new consumer messages. - private func messageRunLoop() async throws { - while !Task.isCancelled { - let nextAction = self.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } - switch nextAction { - case .pollForAndYieldMessages(let client, let source): - // Poll for new consumer messages. - let messageResults = self.batchConsumerPoll(client: client) - if messageResults.isEmpty { - self.stateMachine.withLockedValue { $0.waitForNewMessages() } - } else { - let yieldResult = source.yield(contentsOf: messageResults) - switch yieldResult { - case .produceMore: - break - case .stopProducing: - self.stateMachine.withLockedValue { $0.stopProducing() } - case .dropped: - return - } - } - case .pollForMessagesIfAvailable(let client, let source): - let messageResults = self.batchConsumerPoll(client: client) - if messageResults.isEmpty { - // Still no new messages, so sleep. - try await Task.sleep(for: self.configuration.pollInterval) - } else { - // New messages were produced to the partition that we previously finished reading. - let yieldResult = source.yield(contentsOf: messageResults) - switch yieldResult { - case .produceMore: - break - case .stopProducing: - self.stateMachine.withLockedValue { $0.stopProducing() } - case .dropped: - return - } - } - case .suspendPollLoop: - try await Task.sleep(for: self.configuration.pollInterval) - case .terminatePollLoop: - return - } - } - } - - /// Read `maxMessages` consumer messages from Kafka. - /// - /// - Parameters: - /// - client: Client used for handling the connection to the Kafka cluster. - /// - maxMessages: Maximum amount of consumer messages to read in this invocation. - private func batchConsumerPoll( - client: RDKafkaClient, - maxMessages: Int = 100 - ) -> [Result] { - var messageResults = [Result]() - messageResults.reserveCapacity(maxMessages) - - for _ in 0..? - do { - if let message = try client.consumerPoll() { - result = .success(message) - } - } catch { - result = .failure(error) - } - - guard let result else { - return messageResults - } - messageResults.append(result) - } - - return messageResults - } - /// Mark all messages up to the passed message in the topic as read. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. @@ -572,25 +444,6 @@ public final class KafkaConsumer: Sendable, Service { extension KafkaConsumer { /// State machine representing the state of the ``KafkaConsumer``. struct StateMachine: Sendable { - /// State of the event loop fetching new consumer messages. - enum MessagePollLoopState { - /// The sequence can take more messages. - /// - /// - Parameter source: The source for yielding new messages. - case running(source: Producer.Source) - /// Sequence suspended due to back pressure. - /// - /// - Parameter source: The source for yielding new messages. - case suspended(source: Producer.Source) - /// We have read to the end of a partition and are now waiting for new messages - /// to be produced. - /// - /// - Parameter source: The source for yielding new messages. - case waitingForMessages(source: Producer.Source) - /// The sequence has finished, and no more messages will be produced. - case finished - } - /// The state of the ``StateMachine``. enum State: Sendable { /// The state machine has been initialized with init() but is not yet Initialized @@ -602,14 +455,13 @@ extension KafkaConsumer { /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: The source for yielding new messages. case initializing( - client: RDKafkaClient, - source: Producer.Source + client: RDKafkaClient ) /// The ``KafkaConsumer`` is consuming messages. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter state: State of the event loop fetching new consumer messages. - case running(client: RDKafkaClient, messagePollLoopState: MessagePollLoopState) + case running(client: RDKafkaClient) /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. /// We are now in the process of commiting our last state to the broker. /// @@ -625,15 +477,13 @@ extension KafkaConsumer { /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are /// not yet available when the normal initialization occurs. mutating func initialize( - client: RDKafkaClient, - source: Producer.Source + client: RDKafkaClient ) { guard case .uninitialized = self.state else { fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") } self.state = .initializing( - client: client, - source: source + client: client ) } @@ -657,7 +507,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): + case .running(let client): return .pollForEvents(client: client) case .finishing(let client): if client.isConsumerClosed { @@ -676,20 +526,7 @@ extension KafkaConsumer { /// Poll for a new ``KafkaConsumerMessage``. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForAndYieldMessages( - client: RDKafkaClient, - source: Producer.Source - ) - /// Poll for a new ``KafkaConsumerMessage`` or sleep for ``KafkaConsumerConfiguration/pollInterval`` - /// if there are no new messages to read from the partition. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForMessagesIfAvailable( - client: RDKafkaClient, - source: Producer.Source - ) + case poll(client: RDKafkaClient) /// Sleep for ``KafkaConsumerConfiguration/pollInterval``. case suspendPollLoop /// Terminate the poll loop. @@ -705,18 +542,9 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, let consumerState): - switch consumerState { - case .running(let source): - return .pollForAndYieldMessages(client: client, source: source) - case .suspended(source: _): - return .suspendPollLoop - case .waitingForMessages(let source): - return .pollForMessagesIfAvailable(client: client, source: source) - case .finished: - return .terminatePollLoop - } + return .suspendPollLoop + case .running(let client): + return .poll(client: client) case .finishing, .finished: return .terminatePollLoop } @@ -738,8 +566,8 @@ extension KafkaConsumer { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let client, let source): - self.state = .running(client: client, messagePollLoopState: .running(source: source)) + case .initializing(let client): + self.state = .running(client: client) return .setUpConnection(client: client) case .running: fatalError("\(#function) should not be invoked more than once") @@ -750,30 +578,6 @@ extension KafkaConsumer { } } - /// Action to take when wanting to store a message offset (to be auto-committed by `librdkafka`). - enum StoreOffsetAction { - /// Store the message offset with the given `client`. - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case storeOffset(client: RDKafkaClient) - /// The consumer is in the process of `.finishing` or even `.finished`. - /// Stop yielding new elements and terminate the asynchronous sequence. - case terminateConsumerSequence - } - - /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`). - func storeOffset() -> StoreOffsetAction { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): - return .storeOffset(client: client) - case .finishing, .finished: - return .terminateConsumerSequence - } - } - /// Action to be taken when wanting to do a commit. enum CommitAction { /// Do a commit. @@ -794,7 +598,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): + case .running(let client): return .commit(client: client) case .finishing, .finished: return .throwClosedError @@ -819,7 +623,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): + case .running(let client): self.state = .finishing(client: client) return .triggerGracefulShutdown(client: client) case .finishing, .finished: @@ -827,73 +631,6 @@ extension KafkaConsumer { } } - // MARK: - Consumer Messages Poll Loop Actions - - /// The partition that was previously finished reading has got new messages produced to it. - mutating func newMessagesProduced() { - guard case .running(let client, let consumerState) = self.state else { - fatalError("\(#function) invoked while still in state \(self.state)") - } - - switch consumerState { - case .running, .suspended, .finished: - fatalError("\(#function) should not be invoked in state \(self.state)") - case .waitingForMessages(let source): - self.state = .running(client: client, messagePollLoopState: .running(source: source)) - } - } - - /// The consumer has read to the end of a partition and shall now go into a sleep loop until new messages are produced. - mutating func waitForNewMessages() { - guard case .running(let client, let consumerState) = self.state else { - fatalError("\(#function) invoked while still in state \(self.state)") - } - - switch consumerState { - case .running(let source): - self.state = .running(client: client, messagePollLoopState: .waitingForMessages(source: source)) - case .suspended, .waitingForMessages: - fatalError("\(#function) should not be invoked in state \(self.state)") - case .finished: - break // ok, skip action - } - } - - /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages. - mutating func produceMore() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - break // This case can be triggered by the KafkaConsumerMessagesDeletgate - case .running(let client, let consumerState): - switch consumerState { - case .running, .waitingForMessages, .finished: - break - case .suspended(let source): - self.state = .running(client: client, messagePollLoopState: .running(source: source)) - } - case .finishing, .finished: - break - } - } - - /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to temporarily stop producing messages. - mutating func stopProducing() { - guard case .running(let client, let consumerState) = self.state else { - fatalError("\(#function) invoked while still in state \(self.state)") - } - - switch consumerState { - case .suspended, .finished: - break - case .running(let source): - self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) - case .waitingForMessages(let source): - self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) - } - } - /// The ``KafkaConsumerMessages`` asynchronous sequence was terminated. mutating func finishMessageConsumption() { switch self.state { @@ -901,8 +638,8 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: self.state = .finished - case .running(let client, _): - self.state = .running(client: client, messagePollLoopState: .finished) + case .running: + self.state = .finished case .finishing, .finished: break } diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 78dc1182..9e62f945 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -68,10 +68,6 @@ public final class RDKafkaClient: Sendable { // Manually override some of the configuration options // Handle logs in event queue try RDKafkaConfig.set(configPointer: rdConfig, key: "log.queue", value: "true") - // KafkaConsumer is manually storing read offsets - if type == .consumer { - try RDKafkaConfig.set(configPointer: rdConfig, key: "enable.auto.offset.store", value: "false") - } RDKafkaConfig.setEvents(configPointer: rdConfig, events: events) let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) @@ -493,40 +489,6 @@ public final class RDKafkaClient: Sendable { } } - /// Store `message`'s offset for next auto-commit. - /// - /// - Important: `enable.auto.offset.store` must be set to `false` when using this API. - func storeMessageOffset(_ message: KafkaConsumerMessage) throws { - // The offset committed is always the offset of the next requested message. - // Thus, we increase the offset of the current message by one before committing it. - // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 - let changesList = RDKafkaTopicPartitionList() - changesList.setOffset( - topic: message.topic, - partition: message.partition, - offset: Int64(message.offset.rawValue + 1) - ) - - let error = changesList.withListPointer { listPointer in - rd_kafka_offsets_store( - self.kafkaHandle, - listPointer - ) - } - - if error != RD_KAFKA_RESP_ERR_NO_ERROR { - // Ignore RD_KAFKA_RESP_ERR__STATE error. - // RD_KAFKA_RESP_ERR__STATE indicates an attempt to commit to an unassigned partition, - // which can occur during rebalancing or when the consumer is shutting down. - // See "Upgrade considerations" for more details: https://github.com/confluentinc/librdkafka/releases/tag/v1.9.0 - // Since Kafka Consumers are designed for at-least-once processing, failing to commit here is acceptable. - if error == RD_KAFKA_RESP_ERR__STATE { - return - } - throw KafkaError.rdKafkaError(wrapping: error) - } - } - /// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index 525b9461..d23942c5 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import Atomics import struct Foundation.UUID @testable import Kafka @_spi(Internal) import Kafka @@ -448,7 +449,7 @@ final class KafkaTests: XCTestCase { try await group.next() // Verify that we receive the first message - var consumerIterator = consumer.messages.makeAsyncIterator() + let consumerIterator = consumer.messages.makeAsyncIterator() let consumedMessage = try await consumerIterator.next() XCTAssertEqual(testMessages.first!.topic, consumedMessage!.topic) @@ -602,6 +603,159 @@ final class KafkaTests: XCTestCase { } } + func testDuplicatedMessagesOnRebalance() async throws { + let partitionsNumber: Int32 = 12 + do { + var basicConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: "no-group", topics: []), + bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] + ) + basicConfig.broker.addressFamily = .v4 + + // TODO: ok to block here? How to make setup async? + let client = try RDKafkaClient.makeClient( + type: .consumer, + configDictionary: basicConfig.dictionary, + events: [], + logger: .kafkaTest + ) + // cleanup default test topic and create with 12 partitions + try client._deleteTopic(self.uniqueTestTopic, timeout: 10 * 1000) + self.uniqueTestTopic = try client._createUniqueTopic(partitions: partitionsNumber, timeout: 10 * 1000) + } + + let numOfMessages: UInt = 1000 + let testMessages = Self.createTestMessages(topic: uniqueTestTopic, count: numOfMessages) + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: .kafkaTest) + + let producerServiceGroupConfiguration = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest) + let producerServiceGroup = ServiceGroup(configuration: producerServiceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await producerServiceGroup.run() + } + + // Producer Task + group.addTask { + try await Self.sendAndAcknowledgeMessages( + producer: producer, + events: acks, + messages: testMessages, + skipConsistencyCheck: true + ) + } + + // Wait for Producer Task to complete + try await group.next() + // Shutdown the serviceGroup + await producerServiceGroup.triggerGracefulShutdown() + } + + // MARK: Consumer + + let uniqueGroupID = UUID().uuidString + + var consumer1Config = KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [bootstrapBrokerAddress] + ) + consumer1Config.autoOffsetReset = .beginning + consumer1Config.broker.addressFamily = .v4 + consumer1Config.pollInterval = .milliseconds(1) + consumer1Config.isAutoCommitEnabled = false + + let consumer1 = try KafkaConsumer( + configuration: consumer1Config, + logger: .kafkaTest + ) + + var consumer2Config = KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [bootstrapBrokerAddress] + ) + consumer2Config.autoOffsetReset = .beginning + consumer2Config.broker.addressFamily = .v4 + consumer2Config.pollInterval = .milliseconds(1) + consumer2Config.isAutoCommitEnabled = false + + let consumer2 = try KafkaConsumer( + configuration: consumer2Config, + logger: .kafkaTest + ) + + let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [consumer1], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest) + let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1) + + let serviceGroupConfiguration2 = ServiceGroupConfiguration(services: [consumer2], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest) + let serviceGroup2 = ServiceGroup(configuration: serviceGroupConfiguration2) + + let sharedCtr = ManagedAtomic(0) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task for 1st consumer + group.addTask { + try await serviceGroup1.run() + } + // Run Task for 2nd consumer + group.addTask { + try await Task.sleep(for: .seconds(20)) // wait a bit that first consumer would form a queue + try await serviceGroup2.run() + } + + // First Consumer Task + group.addTask { + // 6 partitions + for try await record in consumer1.messages { + sharedCtr.wrappingIncrement(ordering: .relaxed) + + try consumer1.scheduleCommit(record) // commit time to time + try await Task.sleep(for: .milliseconds(100)) // don't read all messages before 2nd consumer + } + } + + // Second Consumer Task + group.addTask { + // 6 partitions + for try await record in consumer2.messages { + sharedCtr.wrappingIncrement(ordering: .relaxed) + + try consumer2.scheduleCommit(record) // commit time to time + } + } + + // Monitoring task + group.addTask { + while true { + let currentCtr = sharedCtr.load(ordering: .relaxed) + guard currentCtr >= numOfMessages else { + try await Task.sleep(for: .seconds(5)) // wait if new messages come here + continue + } + try await Task.sleep(for: .seconds(5)) // wait for extra messages + await serviceGroup1.triggerGracefulShutdown() + await serviceGroup2.triggerGracefulShutdown() + break + } + } + + try await group.next() + try await group.next() + try await group.next() + + // Wait for second Consumer Task to complete + let totalCtr = sharedCtr.load(ordering: .relaxed) + XCTAssertEqual(totalCtr, Int(numOfMessages)) + } + } + // MARK: - Helpers private static func createTestMessages( @@ -615,8 +769,9 @@ final class KafkaTests: XCTestCase { private static func sendAndAcknowledgeMessages( producer: KafkaProducer, events: KafkaProducerEvents, - messages: [KafkaProducerMessage] + messages: [KafkaProducerMessage], + skipConsistencyCheck: Bool = false ) async throws { - return try await _sendAndAcknowledgeMessages(producer: producer, events: events, messages: messages) + return try await _sendAndAcknowledgeMessages(producer: producer, events: events, messages: messages, skipConsistencyCheck: skipConsistencyCheck) } }