diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index 869791b7bc3f..42e9fca96082 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -26,15 +26,10 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,13 +40,11 @@ public class CommandTopic { private final TopicPartition commandTopicPartition; private Consumer commandConsumer = null; - private Producer commandProducer = null; private final String commandTopicName; public CommandTopic( final String commandTopicName, - final Map kafkaConsumerProperties, - final Map kafkaProducerProperties + final Map kafkaConsumerProperties ) { this( commandTopicName, @@ -59,22 +52,16 @@ public CommandTopic( Objects.requireNonNull(kafkaConsumerProperties, "kafkaClientProperties"), InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true), InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false) - ), - new KafkaProducer<>( - Objects.requireNonNull(kafkaProducerProperties, "kafkaClientProperties"), - InternalTopicJsonSerdeUtil.getJsonSerializer(true), - InternalTopicJsonSerdeUtil.getJsonSerializer(false) - )); + ) + ); } CommandTopic( final String commandTopicName, - final Consumer commandConsumer, - final Producer commandProducer + final Consumer commandConsumer ) { this.commandTopicPartition = new TopicPartition(commandTopicName, 0); this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer"); - this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer"); this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); } @@ -86,24 +73,6 @@ public void start() { commandConsumer.assign(Collections.singleton(commandTopicPartition)); } - public RecordMetadata send(final CommandId commandId, final Command command) { - final ProducerRecord producerRecord = new ProducerRecord<>( - commandTopicName, - 0, - Objects.requireNonNull(commandId, "commandId"), - Objects.requireNonNull(command, "command")); - try { - return commandProducer.send(producerRecord).get(); - } catch (final ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException)e.getCause(); - } - throw new RuntimeException(e.getCause()); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - } - public Iterable> getNewCommands(final Duration timeout) { return commandConsumer.poll(timeout); } @@ -150,6 +119,5 @@ public void wakeup() { public void close() { commandConsumer.close(); - commandProducer.close(); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 
8244f9a9f92c..9aee97a7e9a5 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -143,6 +143,7 @@ public final class KsqlRestApplication extends Application imple private final List preconditions; private final List configurables; private final Consumer rocksDBConfigSetterHandler; + private final ProducerTransactionManagerFactory producerTransactionManagerFactory; public static SourceName getCommandsStreamName() { return COMMANDS_STREAM_NAME; @@ -169,7 +170,8 @@ public static SourceName getCommandsStreamName() { final ProcessingLogContext processingLogContext, final List preconditions, final List configurables, - final Consumer rocksDBConfigSetterHandler + final Consumer rocksDBConfigSetterHandler, + final ProducerTransactionManagerFactory producerTransactionManagerFactory ) { super(config); @@ -192,6 +194,8 @@ public static SourceName getCommandsStreamName() { this.configurables = requireNonNull(configurables, "configurables"); this.rocksDBConfigSetterHandler = requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler"); + this.producerTransactionManagerFactory = + requireNonNull(producerTransactionManagerFactory, "producerTransactionManagerFactory"); } @Override @@ -278,7 +282,8 @@ private void initialize() { processingLogContext.getConfig(), ksqlConfigNoPort, ksqlEngine, - commandStore + commandStore, + producerTransactionManagerFactory ); commandRunner.processPriorCommands(); @@ -470,13 +475,12 @@ static KsqlRestApplication buildApplication( UdfLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load(); - final String commandTopic = KsqlInternalTopicUtils.getTopicName( + final String commandTopicName = KsqlInternalTopicUtils.getTopicName( ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX); final CommandStore commandStore = CommandStore.Factory.create( - commandTopic, - restConfig.getCommandConsumerProperties(), - restConfig.getCommandProducerProperties()); + commandTopicName, + restConfig.getCommandConsumerProperties()); final StatementExecutor statementExecutor = new StatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator); @@ -504,19 +508,12 @@ static KsqlRestApplication buildApplication( authorizationValidator ); - final KsqlResource ksqlResource = new KsqlResource( - ksqlEngine, - commandStore, - Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), - versionChecker::updateLastRequestTime, - authorizationValidator - ); - final List managedTopics = new LinkedList<>(); - managedTopics.add(commandTopic); + managedTopics.add(commandTopicName); if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) { managedTopics.add(ProcessingLogServerUtils.getTopicName(processingLogConfig, ksqlConfig)); } + final CommandRunner commandRunner = new CommandRunner( statementExecutor, commandStore, @@ -525,6 +522,24 @@ static KsqlRestApplication buildApplication( serverState ); + final ProducerTransactionManagerFactory producerTransactionManagerFactory = + new ProducerTransactionManagerFactory( + commandTopicName, + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), + commandRunner, + restConfig.getCommandConsumerProperties(), + restConfig.getCommandProducerProperties() + ); + + final KsqlResource ksqlResource = new KsqlResource( + ksqlEngine, + commandStore, + Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), + 
versionChecker::updateLastRequestTime, + authorizationValidator, + producerTransactionManagerFactory + ); + final List preconditions = restConfig.getConfiguredInstances( KsqlRestConfig.KSQL_SERVER_PRECONDITIONS, KsqlServerPrecondition.class @@ -557,7 +572,8 @@ static KsqlRestApplication buildApplication( processingLogContext, preconditions, configurables, - rocksDBConfigSetterHandler + rocksDBConfigSetterHandler, + producerTransactionManagerFactory ); } @@ -624,13 +640,19 @@ private static void maybeCreateProcessingLogStream( final ProcessingLogConfig config, final KsqlConfig ksqlConfig, final KsqlEngine ksqlEngine, - final CommandQueue commandQueue + final CommandQueue commandQueue, + final ProducerTransactionManagerFactory producerTransactionManagerFactory ) { if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE) || !commandQueue.isEmpty()) { return; } + final ProducerTransactionManager producerTransactionManager = + producerTransactionManagerFactory.createProducerTransactionManager(); + + producerTransactionManager.begin(); + final PreparedStatement statement = ProcessingLogServerUtils .processingLogStreamCreateStatement(config, ksqlConfig); final Supplier> configured = () -> ConfiguredStatement.of( @@ -646,7 +668,8 @@ private static void maybeCreateProcessingLogStream( return; } - commandQueue.enqueueCommand(configured.get()); + commandQueue.enqueueCommand(configured.get(), producerTransactionManager); + producerTransactionManager.commit(); } /** diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManager.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManager.java new file mode 100644 index 000000000000..6c2fd30b4a8d --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManager.java @@ -0,0 +1,161 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.rest.server; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.rest.server.computation.CommandRunner; +import io.confluent.ksql.rest.util.InternalTopicJsonSerdeUtil; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +/** + * Used to handle transactional writes to the command topic + */ +public class ProducerTransactionManager { + + private final TopicPartition commandTopicPartition; + private final String commandTopicName; + + private final Consumer commandConsumer; + private final Producer commandProducer; + private final CommandRunner commandRunner; + + public ProducerTransactionManager( + final String commandTopicName, + final CommandRunner commandRunner, + final Map kafkaConsumerProperties, + final Map kafkaProducerProperties + ) { + this.commandTopicPartition = new TopicPartition( + Objects.requireNonNull(commandTopicName, "commandTopicName"), + 0 + ); + + this.commandConsumer = new KafkaConsumer<>( + Objects.requireNonNull(kafkaConsumerProperties, "kafkaConsumerProperties"), + InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true), + InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false) + ); + + this.commandProducer = new KafkaProducer<>( + Objects.requireNonNull(kafkaProducerProperties, "kafkaProducerProperties"), + InternalTopicJsonSerdeUtil.getJsonSerializer(true), + InternalTopicJsonSerdeUtil.getJsonSerializer(false) + ); + this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); + this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); + } + + @VisibleForTesting + ProducerTransactionManager( + final String commandTopicName, + final CommandRunner commandRunner, + final Consumer commandConsumer, + final Producer commandProducer + ) { + this.commandTopicPartition = new TopicPartition( + Objects.requireNonNull(commandTopicName, "commandTopicName"), + 0 + ); + this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer"); + this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer"); + this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); + this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); + } + + + /** begins transaction */ + public void begin() { + commandConsumer.assign(Collections.singleton(commandTopicPartition)); + commandProducer.initTransactions(); + commandProducer.beginTransaction(); + } + + public void waitForCommandRunner() { + final long endOffset = getEndOffset(); + + try { + int retries = 0; + while (commandRunner.getNumCommandProcessed() < endOffset) { + Thread.sleep(1000); + + if (retries == 60) { + throw new RuntimeException("commandRunner has not processed all commands in topic"); + } + retries++; + } + } catch (Exception exception) { + throw new RuntimeException( + "Error while waiting for commandRunner to 
process command topic:", + exception + ); + } + } + + private long getEndOffset() { + return commandConsumer.endOffsets(Collections.singletonList(commandTopicPartition)) + .get(commandTopicPartition); + } + + public RecordMetadata send(final CommandId commandId, final Command command) { + final ProducerRecord producerRecord = new ProducerRecord<>( + commandTopicName, + 0, + Objects.requireNonNull(commandId, "commandId"), + Objects.requireNonNull(command, "command")); + try { + return commandProducer.send(producerRecord).get(); + } catch (final ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw new RuntimeException(e.getCause()); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (KafkaException e) { + commandProducer.abortTransaction(); + throw new KafkaException(e); + } + } + + public void abort() { + commandProducer.abortTransaction(); + close(); + + } + + public void commit() { + commandProducer.commitTransaction(); + close(); + } + + public void close() { + commandConsumer.close(); + commandProducer.close(); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManagerFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManagerFactory.java new file mode 100644 index 000000000000..14be1df9664e --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManagerFactory.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.rest.server; + +import io.confluent.ksql.rest.server.computation.CommandRunner; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.clients.producer.ProducerConfig; + +public class ProducerTransactionManagerFactory { + private final String commandTopicName; + private final CommandRunner commandRunner; + private final Map kafkaConsumerProperties; + private final Map kafkaProducerProperties; + + public ProducerTransactionManagerFactory( + final String commandTopicName, + final String transactionId, + final CommandRunner commandRunner, + final Map kafkaConsumerProperties, + final Map kafkaProducerProperties + ) { + this.kafkaConsumerProperties = + Objects.requireNonNull(kafkaConsumerProperties, "kafkaConsumerProperties"); + this.kafkaProducerProperties = + Objects.requireNonNull(kafkaProducerProperties, "kafkaProducerProperties"); + this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); + this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); + + kafkaProducerProperties.put( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + Objects.requireNonNull(transactionId, "transactionId") + ); + } + + public ProducerTransactionManager createProducerTransactionManager() { + return new ProducerTransactionManager( + commandTopicName, + commandRunner, + kafkaConsumerProperties, + kafkaProducerProperties + ); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java index 54aa6c74ba7b..c83b18491b5c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.server.computation; +import io.confluent.ksql.rest.server.ProducerTransactionManager; import io.confluent.ksql.statement.ConfiguredStatement; import java.io.Closeable; import java.time.Duration; @@ -34,12 +35,15 @@ public interface CommandQueue extends Closeable { * it is guaranteed that the command has been persisted, without regard * for the {@link io.confluent.ksql.rest.entity.CommandStatus CommandStatus}. 
* - * @param statement The statement to be distributed - * + * @param statement The statement to be distributed + * @param producerTransactionManager The transaction manager for enqueueing command * @return an asynchronous tracker that can be used to determine the current * state of the command */ - QueuedCommandStatus enqueueCommand(ConfiguredStatement statement); + QueuedCommandStatus enqueueCommand( + ConfiguredStatement statement, + ProducerTransactionManager producerTransactionManager + ); /** * Polls the Queue for any commands that have been enqueued since the last diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 43dbddf96b1b..4fb44c47b27c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -59,6 +59,8 @@ public class CommandRunner implements Closeable { private final ClusterTerminator clusterTerminator; private final ServerState serverState; + private long numCommandProcessed; + public CommandRunner( final StatementExecutor statementExecutor, final CommandQueue commandStore, @@ -91,6 +93,7 @@ public CommandRunner( this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator"); this.executor = Objects.requireNonNull(executor, "executor"); this.serverState = Objects.requireNonNull(serverState, "serverState"); + this.numCommandProcessed = 0; } /** @@ -136,6 +139,9 @@ public void processPriorCommands() { WakeupException.class ) ); + + numCommandProcessed = restoreCommands.size(); + final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine(); ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start); } @@ -170,6 +176,7 @@ private void executeStatement(final QueuedCommand queuedCommand) { log.info("Execution aborted as system is closing down"); } else { statementExecutor.handleStatement(queuedCommand); + numCommandProcessed++; log.info("Executed statement: " + queuedCommand.getCommand().getStatement()); } }; @@ -204,6 +211,10 @@ private void terminateCluster(final Command command) { log.info("The KSQL server was terminated."); } + public long getNumCommandProcessed() { + return numCommandProcessed; + } + private class Runner implements Runnable { @Override diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index 1bc1dab1eda0..5818c20fff03 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -19,6 +19,7 @@ import com.google.common.collect.Maps; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.CommandTopic; +import io.confluent.ksql.rest.server.ProducerTransactionManager; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; import java.io.Closeable; @@ -53,11 +54,10 @@ private Factory() { public static CommandStore create( final String commandTopicName, - final Map kafkaConsumerProperties, - final Map kafkaProducerProperties + final Map kafkaConsumerProperties ) { return new CommandStore( - new CommandTopic(commandTopicName, kafkaConsumerProperties, kafkaProducerProperties), + new 
CommandTopic(commandTopicName, kafkaConsumerProperties), new CommandIdAssigner(), new SequenceNumberFutureStore() ); @@ -98,7 +98,10 @@ public void close() { } @Override - public QueuedCommandStatus enqueueCommand(final ConfiguredStatement statement) { + public QueuedCommandStatus enqueueCommand( + final ConfiguredStatement statement, + final ProducerTransactionManager producerTransactionManager + ) { final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement()); // new commands that generate queries will use the new query id generation method from now on @@ -126,7 +129,7 @@ public QueuedCommandStatus enqueueCommand(final ConfiguredStatement statement ); try { final RecordMetadata recordMetadata = - commandTopic.send(commandId, command); + producerTransactionManager.send(commandId, command); return new QueuedCommandStatus(recordMetadata.offset(), statusFuture); } catch (final Exception e) { commandStatusMap.remove(commandId); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 61ba067b87c3..cb6ce357be73 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -20,6 +20,7 @@ import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.server.ProducerTransactionManager; import io.confluent.ksql.rest.server.execution.StatementExecutor; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.ServiceContext; @@ -45,6 +46,8 @@ public class DistributingExecutor implements StatementExecutor { private final BiFunction injectorFactory; private final KsqlAuthorizationValidator authorizationValidator; + private ProducerTransactionManager producerTransactionManager; + public DistributingExecutor( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, @@ -64,7 +67,8 @@ public Optional execute( final ConfiguredStatement statement, final Map mutableScopedProperties, final KsqlExecutionContext executionContext, - final ServiceContext serviceContext) { + final ServiceContext serviceContext + ) { final ConfiguredStatement injected = injectorFactory .apply(executionContext, serviceContext) .inject(statement); @@ -72,10 +76,17 @@ public Optional execute( checkAuthorization(injected, serviceContext, executionContext); try { - final QueuedCommandStatus queuedCommandStatus = commandQueue.enqueueCommand(injected); + if (producerTransactionManager == null) { + throw new RuntimeException("Transaction manager for distributing executor not set"); + } + + final QueuedCommandStatus queuedCommandStatus = + commandQueue.enqueueCommand(injected, producerTransactionManager); + final CommandStatus commandStatus = queuedCommandStatus .tryWaitForFinalStatus(distributedCmdResponseTimeout); + producerTransactionManager = null; return Optional.of(new CommandStatusEntity( injected.getStatementText(), queuedCommandStatus.getCommandId(), @@ -89,6 +100,10 @@ public Optional execute( } } + public void setTransactionManager(final ProducerTransactionManager producerTransactionManager) { + this.producerTransactionManager = producerTransactionManager; + } + private void checkAuthorization( final ConfiguredStatement 
configured, final ServiceContext userServiceContext, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java index 008a8c73f403..c610adfa2eca 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java @@ -23,6 +23,7 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; +import io.confluent.ksql.rest.server.ProducerTransactionManager; import io.confluent.ksql.rest.server.computation.DistributingExecutor; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; @@ -73,14 +74,20 @@ public RequestHandler( public KsqlEntityList execute( final ServiceContext serviceContext, final List statements, - final Map propertyOverrides + final Map propertyOverrides, + final ProducerTransactionManager producerTransactionManager ) { final Map scopedPropertyOverrides = new HashMap<>(propertyOverrides); final KsqlEntityList entities = new KsqlEntityList(); for (ParsedStatement parsed : statements) { final PreparedStatement prepared = ksqlEngine.prepare(parsed); if (prepared.getStatement() instanceof RunScript) { - final KsqlEntityList result = executeRunScript(serviceContext, prepared, propertyOverrides); + final KsqlEntityList result = executeRunScript( + serviceContext, + prepared, + propertyOverrides, + producerTransactionManager + ); if (!result.isEmpty()) { // This is to maintain backwards compatibility until we deprecate // RunScript in the next major release - the expected behavior was @@ -90,8 +97,13 @@ public KsqlEntityList execute( } else { final ConfiguredStatement configured = ConfiguredStatement.of( prepared, scopedPropertyOverrides, ksqlConfig); - executeStatement(serviceContext, configured, scopedPropertyOverrides, entities) - .ifPresent(entities::add); + executeStatement( + serviceContext, + configured, + scopedPropertyOverrides, + entities, + producerTransactionManager + ).ifPresent(entities::add); } } return entities; @@ -102,7 +114,8 @@ private Optional executeStatement( final ServiceContext serviceContext, final ConfiguredStatement configured, final Map mutableScopedProperties, - final KsqlEntityList entities + final KsqlEntityList entities, + final ProducerTransactionManager producerTransactionManager ) { final Class statementClass = configured.getStatement().getClass(); commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass); @@ -110,6 +123,10 @@ private Optional executeStatement( final StatementExecutor executor = (StatementExecutor) customExecutors.getOrDefault(statementClass, distributor); + if (executor instanceof DistributingExecutor) { + ((DistributingExecutor) executor).setTransactionManager(producerTransactionManager); + } + return executor.execute( configured, mutableScopedProperties, @@ -121,7 +138,9 @@ private Optional executeStatement( private KsqlEntityList executeRunScript( final ServiceContext serviceContext, final PreparedStatement statement, - final Map propertyOverrides) { + final Map propertyOverrides, + final ProducerTransactionManager producerTransactionManager + ) { final String sql = (String) propertyOverrides .get(KsqlConstants.LEGACY_RUN_SCRIPT_STATEMENTS_CONTENT); @@ -130,6 +149,11 @@ private KsqlEntityList executeRunScript( "Request is 
missing script content", statement.getStatementText()); } - return execute(serviceContext, ksqlEngine.parse(sql), propertyOverrides); + return execute( + serviceContext, + ksqlEngine.parse(sql), + propertyOverrides, + producerTransactionManager + ); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 780d44e24de2..17e0da951aca 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -34,6 +34,8 @@ import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.Versions; +import io.confluent.ksql.rest.server.ProducerTransactionManager; +import io.confluent.ksql.rest.server.ProducerTransactionManagerFactory; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.computation.DistributingExecutor; import io.confluent.ksql.rest.server.execution.CustomExecutors; @@ -98,6 +100,7 @@ public class KsqlResource implements KsqlConfigurable { private final ActivenessRegistrar activenessRegistrar; private final BiFunction injectorFactory; private final KsqlAuthorizationValidator authorizationValidator; + private final ProducerTransactionManagerFactory producerTransactionManagerFactory; private RequestValidator validator; private RequestHandler handler; @@ -107,7 +110,8 @@ public KsqlResource( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, - final KsqlAuthorizationValidator authorizationValidator + final KsqlAuthorizationValidator authorizationValidator, + final ProducerTransactionManagerFactory producerTransactionManagerFactory ) { this( ksqlEngine, @@ -115,7 +119,8 @@ public KsqlResource( distributedCmdResponseTimeout, activenessRegistrar, Injectors.DEFAULT, - authorizationValidator + authorizationValidator, + producerTransactionManagerFactory ); } @@ -125,7 +130,8 @@ public KsqlResource( final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, final BiFunction injectorFactory, - final KsqlAuthorizationValidator authorizationValidator + final KsqlAuthorizationValidator authorizationValidator, + final ProducerTransactionManagerFactory producerTransactionManagerFactory ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); @@ -136,6 +142,8 @@ public KsqlResource( this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory"); this.authorizationValidator = Objects .requireNonNull(authorizationValidator, "authorizationValidator"); + this.producerTransactionManagerFactory = Objects.requireNonNull( + producerTransactionManagerFactory, "producerTransactionManagerFactory"); } @Override @@ -182,7 +190,12 @@ public Response terminateCluster( ensureValidPatterns(request.getDeleteTopicList()); try { return Response.ok( - handler.execute(serviceContext, TERMINATE_CLUSTER, request.getStreamsProperties()) + handler.execute( + serviceContext, + TERMINATE_CLUSTER, + request.getStreamsProperties(), + producerTransactionManagerFactory.createProducerTransactionManager() + ) ).build(); } catch (final Exception e) { return Errors.serverErrorForStatement( @@ -208,6 +221,13 @@ public Response 
handleKsqlStatements( distributedCmdResponseTimeout); final List statements = ksqlEngine.parse(request.getKsql()); + + final ProducerTransactionManager producerTransactionManager = + producerTransactionManagerFactory.createProducerTransactionManager(); + + producerTransactionManager.begin(); + producerTransactionManager.waitForCommandRunner(); + validator.validate( SandboxedServiceContext.create(serviceContext), statements, @@ -218,8 +238,11 @@ public Response handleKsqlStatements( final KsqlEntityList entities = handler.execute( serviceContext, statements, - request.getStreamsProperties() + request.getStreamsProperties(), + producerTransactionManager ); + + producerTransactionManager.commit(); return Response.ok(entities).build(); } catch (final KsqlRestException e) { throw e; @@ -229,6 +252,7 @@ public Response handleKsqlStatements( return ErrorResponseUtil.generateResponse( e, Errors.badRequest(e)); } catch (final Exception e) { + e.printStackTrace(); return ErrorResponseUtil.generateResponse( e, Errors.serverErrorForStatement(e, request.getKsql())); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java index 49924445d084..8bce7743833d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java @@ -34,13 +34,10 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.Before; @@ -59,8 +56,6 @@ public class CommandTopicTest { private static final String COMMAND_TOPIC_NAME = "foo"; @Mock private Consumer commandConsumer; - @Mock - private Producer commandProducer; private CommandTopic commandTopic; @@ -90,12 +85,10 @@ public class CommandTopicTest { private final static TopicPartition TOPIC_PARTITION = new TopicPartition(COMMAND_TOPIC_NAME, 0); - @Before @SuppressWarnings("unchecked") public void setup() { - commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandProducer); - when(commandProducer.send(any(ProducerRecord.class))).thenReturn(future); + commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer); } @Test @@ -108,52 +101,6 @@ public void shouldAssignCorrectPartitionToConsumer() { .assign(eq(Collections.singleton(new TopicPartition(COMMAND_TOPIC_NAME, 0)))); } - @Test - public void shouldSendCommandCorrectly() throws Exception { - // When - commandTopic.send(commandId1, command1); - - // Then - verify(commandProducer).send(new ProducerRecord<>(COMMAND_TOPIC_NAME, 0, commandId1, command1)); - verify(future).get(); - } - - @Test - public void shouldThrowExceptionIfSendIsNotSuccessful() throws Exception { - // Given: - when(future.get()) - .thenThrow(new ExecutionException(new RuntimeException("Send was unsuccessful!"))); - expectedException.expect(RuntimeException.class); - expectedException.expectMessage("Send was unsuccessful!"); - - // When - commandTopic.send(commandId1, command1); - } - - @Test - public void 
shouldThrowRuntimeExceptionIfSendCausesNonRuntimeException() throws Exception { - // Given: - when(future.get()).thenThrow(new ExecutionException( - new Exception("Send was unsuccessful because of non RunTime exception!"))); - expectedException.expect(RuntimeException.class); - expectedException.expectMessage( - "java.lang.Exception: Send was unsuccessful because of non RunTime exception!"); - - // When - commandTopic.send(commandId1, command1); - } - - @Test - public void shouldThrowRuntimeExceptionIfSendThrowsInterruptedException() throws Exception { - // Given: - when(future.get()).thenThrow(new InterruptedException("InterruptedException")); - expectedException.expect(RuntimeException.class); - expectedException.expectMessage("InterruptedException"); - - // When - commandTopic.send(commandId1, command1); - } - @Test public void shouldGetNewCommandsIteratorCorrectly() { // Given: @@ -279,7 +226,6 @@ public void shouldCloseAllResources() { //Then: verify(commandConsumer).close(); - verify(commandProducer).close(); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 48ec795fdfda..d3e26c661cce 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -126,6 +126,10 @@ public class KsqlRestApplicationTest { private PreparedStatement preparedStatement; @Mock private Consumer rocksDBConfigSetterHandler; + @Mock + private ProducerTransactionManagerFactory producerTransactionManagerFactory; + @Mock + private ProducerTransactionManager producerTransactionManager; private PreparedStatement logCreateStatement; private KsqlRestApplication app; @@ -147,13 +151,15 @@ public void setUp() { when(ksqlEngine.prepare(any())).thenReturn((PreparedStatement)preparedStatement); when(commandQueue.isEmpty()).thenReturn(true); - when(commandQueue.enqueueCommand(any())) + when(commandQueue.enqueueCommand(any(), any(ProducerTransactionManager.class))) .thenReturn(queuedCommandStatus); when(commandQueue.getCommandTopicName()).thenReturn(CMD_TOPIC_NAME); when(serviceContext.getTopicClient()).thenReturn(topicClient); when(topicClient.isTopicExists(CMD_TOPIC_NAME)).thenReturn(false); when(precondition1.checkPrecondition(any(), any())).thenReturn(Optional.empty()); when(precondition2.checkPrecondition(any(), any())).thenReturn(Optional.empty()); + when(producerTransactionManagerFactory.createProducerTransactionManager()). 
+ thenReturn(producerTransactionManager); logCreateStatement = ProcessingLogServerUtils.processingLogStreamCreateStatement( processingLogConfig, @@ -178,7 +184,8 @@ public void setUp() { processingLogContext, ImmutableList.of(precondition1, precondition2), ImmutableList.of(ksqlResource, streamedQueryResource), - rocksDBConfigSetterHandler + rocksDBConfigSetterHandler, + producerTransactionManagerFactory ); } @@ -238,7 +245,8 @@ public void shouldCreateLogStream() { argThat(configured(equalTo(logCreateStatement))) ); verify(commandQueue).enqueueCommand( - argThat(configured(equalTo(logCreateStatement), Collections.emptyMap(), ksqlConfig))); + argThat(configured(equalTo(logCreateStatement), Collections.emptyMap(), ksqlConfig)), + any(ProducerTransactionManager.class)); } @Test @@ -251,7 +259,7 @@ public void shouldNotCreateLogStreamIfAutoCreateNotConfigured() { app.startKsql(); // Then: - verify(commandQueue, never()).enqueueCommand(any()); + verify(commandQueue, never()).enqueueCommand(any(), any(ProducerTransactionManager.class)); } @Test @@ -263,7 +271,7 @@ public void shouldOnlyCreateLogStreamIfCommandTopicEmpty() { app.startKsql(); // Then: - verify(commandQueue, never()).enqueueCommand(any()); + verify(commandQueue, never()).enqueueCommand(any(), any(ProducerTransactionManager.class)); } @Test @@ -275,7 +283,7 @@ public void shouldNotCreateLogStreamIfValidationFails() { app.startKsql(); // Then: - verify(commandQueue, never()).enqueueCommand(any()); + verify(commandQueue, never()).enqueueCommand(any(), any(ProducerTransactionManager.class)); } @Test @@ -286,7 +294,9 @@ public void shouldStartCommandStoreBeforeEnqueuingLogStream() { // Then: final InOrder inOrder = Mockito.inOrder(commandQueue); inOrder.verify(commandQueue).start(); - inOrder.verify(commandQueue).enqueueCommand(argThat(configured(equalTo(logCreateStatement)))); + inOrder.verify(commandQueue).enqueueCommand( + argThat(configured(equalTo(logCreateStatement))), + any(ProducerTransactionManager.class)); } @Test @@ -297,7 +307,9 @@ public void shouldCreateLogTopicBeforeEnqueuingLogStream() { // Then: final InOrder inOrder = Mockito.inOrder(topicClient, commandQueue); inOrder.verify(topicClient).createTopic(eq(LOG_TOPIC_NAME), anyInt(), anyShort()); - inOrder.verify(commandQueue).enqueueCommand(argThat(configured(equalTo(logCreateStatement)))); + inOrder.verify(commandQueue).enqueueCommand( + argThat(configured(equalTo(logCreateStatement))), + any(ProducerTransactionManager.class)); } @Test @@ -330,7 +342,9 @@ public void shouldEnqueueLogStreamBeforeSettingReady() { // Then: final InOrder inOrder = Mockito.inOrder(commandQueue, serverState); - inOrder.verify(commandQueue).enqueueCommand(argThat(configured(equalTo(logCreateStatement)))); + inOrder.verify(commandQueue).enqueueCommand( + argThat(configured(equalTo(logCreateStatement))), + any(ProducerTransactionManager.class)); inOrder.verify(serverState).setReady(); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ProducerTransactionManagerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ProducerTransactionManagerTest.java new file mode 100644 index 000000000000..30b48dab74b2 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ProducerTransactionManagerTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.rest.server.computation.CommandRunner; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ProducerTransactionManagerTest { + + private static final String COMMAND_TOPIC_NAME = "foo"; + @Mock + private Consumer commandConsumer; + @Mock + private Producer commandProducer; + @Mock + private CommandRunner commandRunner; + + private ProducerTransactionManager producerTransactionManager; + + @Mock + private Future future; + + @Mock + private CommandId commandId1; + @Mock + private Command command1; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final static TopicPartition TOPIC_PARTITION = new TopicPartition(COMMAND_TOPIC_NAME, 0); + + @Before + @SuppressWarnings("unchecked") + public void setup() { + producerTransactionManager = new ProducerTransactionManager( + COMMAND_TOPIC_NAME, + commandRunner, + commandConsumer, + commandProducer + ); + when(commandProducer.send(any(ProducerRecord.class))).thenReturn(future); + } + + @Test + public void shouldAssignCorrectPartitionToConsumerAndBeginTransaction() { + // When: + producerTransactionManager.begin(); + + // Then: + verify(commandConsumer) + .assign(eq(Collections.singleton(new TopicPartition(COMMAND_TOPIC_NAME, 0)))); + verify(commandProducer).initTransactions(); + verify(commandProducer).beginTransaction(); + } + + @Test + public void shouldCloseAllResources() { + // When: + producerTransactionManager.close(); + + //Then: + verify(commandProducer).abortTransaction(); + verify(commandProducer).close(); + verify(commandConsumer).close(); + } + + @Test + public void shouldSendCommandCorrectly() throws Exception { + // When + producerTransactionManager.send(commandId1, command1); + + // Then + verify(commandProducer).send(new ProducerRecord<>(COMMAND_TOPIC_NAME, 0, commandId1, command1)); + verify(future).get(); + } + + @Test + public void shouldThrowExceptionIfSendIsNotSuccessful() throws Exception { + // Given: + when(future.get()) + .thenThrow(new ExecutionException(new RuntimeException("Send was unsuccessful!"))); + 
expectedException.expect(RuntimeException.class); + expectedException.expectMessage("Send was unsuccessful!"); + + // When + producerTransactionManager.send(commandId1, command1); + } + + @Test + public void shouldThrowRuntimeExceptionIfSendCausesNonRuntimeException() throws Exception { + // Given: + when(future.get()).thenThrow(new ExecutionException( + new Exception("Send was unsuccessful because of non RunTime exception!"))); + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "java.lang.Exception: Send was unsuccessful because of non RunTime exception!"); + + // When + producerTransactionManager.send(commandId1, command1); + } + + @Test + public void shouldThrowRuntimeExceptionIfSendThrowsInterruptedException() throws Exception { + // Given: + when(future.get()).thenThrow(new InterruptedException("InterruptedException")); + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("InterruptedException"); + + // When + producerTransactionManager.send(commandId1, command1); + } + + @Test + public void shouldCommitTransaction() { + // When: + producerTransactionManager.commit(); + + //Then: + verify(commandProducer).commitTransaction(); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java index 5223a2613e35..50df1870e26f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java @@ -35,6 +35,7 @@ import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.server.CommandTopic; +import io.confluent.ksql.rest.server.ProducerTransactionManager; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -92,6 +93,9 @@ public class CommandStoreTest { private Statement statement; @Mock private CommandIdAssigner commandIdAssigner; + @Mock + private ProducerTransactionManager producerTransactionManager; + private ConfiguredStatement configured; private final CommandId commandId = @@ -109,7 +113,7 @@ public void setUp() { .thenAnswer(invocation -> new CommandId( CommandId.Type.STREAM, "foo" + COUNTER.getAndIncrement(), CommandId.Action.CREATE)); - when(commandTopic.send(any(), any())).thenReturn(recordMetadata); + when(producerTransactionManager.send(any(), any())).thenReturn(recordMetadata); when(commandTopic.getNewCommands(any())).thenReturn(buildRecords(commandId, command)); @@ -131,44 +135,44 @@ public void setUp() { public void shouldFailEnqueueIfCommandWithSameIdRegistered() { // Given: when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); expectedException.expect(IllegalStateException.class); // When: - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); } @Test public void shouldCleanupCommandStatusOnProduceError() { // Given: - when(commandTopic.send(any(), any())) + when(producerTransactionManager.send(any(), any())) .thenThrow(new RuntimeException("oops")) .thenReturn(recordMetadata); expectedException.expect(KsqlException.class); expectedException.expectMessage("Could not write the statement 
'test-statement' into the command topic."); - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); // When: - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); } @Test public void shouldEnqueueNewAfterHandlingExistingCommand() { // Given: when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); commandStore.getNewCommands(NEW_CMDS_TIMEOUT); // Should: - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); } @Test public void shouldRegisterBeforeDistributeAndReturnStatusOnGetNewCommands() { // Given: when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - when(commandTopic.send(any(), any())).thenAnswer( + when(producerTransactionManager.send(any(), any())).thenAnswer( invocation -> { final QueuedCommand queuedCommand = commandStore.getNewCommands(NEW_CMDS_TIMEOUT).get(0); assertThat(queuedCommand.getCommandId(), equalTo(commandId)); @@ -182,10 +186,10 @@ public void shouldRegisterBeforeDistributeAndReturnStatusOnGetNewCommands() { ); // When: - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); // Then: - verify(commandTopic).send(any(), any()); + verify(producerTransactionManager).send(any(), any()); } @Test @@ -209,20 +213,20 @@ public void shouldFilterNullCommands() { @Test public void shouldDistributeCommand() { when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - when(commandTopic.send(any(), any())).thenReturn(recordMetadata); + when(producerTransactionManager.send(any(), any())).thenReturn(recordMetadata); // When: - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); // Then: - verify(commandTopic).send(same(commandId), any()); + verify(producerTransactionManager).send(same(commandId), any()); } @Test public void shouldIncludeCommandSequenceNumberInSuccessfulQueuedCommandStatus() { // When: final QueuedCommandStatus commandStatus = - commandStore.enqueueCommand(configured); + commandStore.enqueueCommand(configured, producerTransactionManager); // Then: assertThat(commandStatus.getCommandSequenceNumber(), equalTo(recordMetadata.offset())); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index aa753cd7a0b5..6d79c45151ff 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -39,6 +39,7 @@ import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatus.Status; import io.confluent.ksql.rest.entity.CommandStatusEntity; +import io.confluent.ksql.rest.server.ProducerTransactionManager; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; @@ -84,6 +85,7 @@ public class DistributingExecutorTest { @Mock KsqlAuthorizationValidator authorizationValidator; @Mock KsqlExecutionContext executionContext; @Mock MetaStore metaStore; + @Mock ProducerTransactionManager 
producerTransactionManager; private DistributingExecutor distributor; private AtomicLong scnCounter; @@ -93,7 +95,7 @@ public void setUp() throws InterruptedException { scnCounter = new AtomicLong(); when(schemaInjector.inject(any())).thenAnswer(inv -> inv.getArgument(0)); when(topicInjector.inject(any())).thenAnswer(inv -> inv.getArgument(0)); - when(queue.enqueueCommand(any())).thenReturn(status); + when(queue.enqueueCommand(any(), any(ProducerTransactionManager.class))).thenReturn(status); when(status.tryWaitForFinalStatus(any())).thenReturn(SUCCESS_STATUS); when(status.getCommandId()).thenReturn(CS_COMMAND); when(status.getCommandSequenceNumber()).thenAnswer(inv -> scnCounter.incrementAndGet()); @@ -106,6 +108,7 @@ public void setUp() throws InterruptedException { (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), authorizationValidator ); + distributor.setTransactionManager(producerTransactionManager); } @Test @@ -114,7 +117,7 @@ public void shouldEnqueueSuccessfulCommand() throws InterruptedException { distributor.execute(EMPTY_STATEMENT, ImmutableMap.of(), executionContext, serviceContext); // Then: - verify(queue, times(1)).enqueueCommand(eq(EMPTY_STATEMENT)); + verify(queue, times(1)).enqueueCommand(eq(EMPTY_STATEMENT), any()); } @Test @@ -148,7 +151,7 @@ public void shouldReturnCommandStatus() { public void shouldThrowExceptionOnFailureToEnqueue() { // Given: final KsqlException cause = new KsqlException("fail"); - when(queue.enqueueCommand(any())).thenThrow(cause); + when(queue.enqueueCommand(any(), any(ProducerTransactionManager.class))).thenThrow(cause); final PreparedStatement preparedStatement = PreparedStatement.of("x", new ListProperties(Optional.empty())); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index d705ae0132bf..49c94461c122 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -41,6 +42,8 @@ import io.confluent.ksql.rest.entity.CommandId.Action; import io.confluent.ksql.rest.entity.CommandId.Type; import io.confluent.ksql.rest.entity.KsqlRequest; +import io.confluent.ksql.rest.server.ProducerTransactionManager; +import io.confluent.ksql.rest.server.ProducerTransactionManagerFactory; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; @@ -72,6 +75,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; public class RecoveryTest { @@ -85,9 +89,24 @@ public class RecoveryTest { private final HybridQueryIdGenerator hybridQueryIdGenerator = new HybridQueryIdGenerator(); private final ServiceContext serviceContext = TestServiceContext.create(topicClient); + @Mock + private final ProducerTransactionManagerFactory producerTransactionManagerFactory = + mock(ProducerTransactionManagerFactory.class); + @Mock + private final ProducerTransactionManager producerTransactionManager = + mock(ProducerTransactionManager.class); + private final 
KsqlServer server1 = new KsqlServer(commands); private final KsqlServer server2 = new KsqlServer(commands); + + @Before + public void setup() { + when(producerTransactionManagerFactory.createProducerTransactionManager()).thenReturn( + producerTransactionManager + ); + } + @After public void tearDown() { server1.close(); @@ -116,7 +135,7 @@ private static class FakeCommandQueue implements CommandQueue { } @Override - public QueuedCommandStatus enqueueCommand(final ConfiguredStatement statement) { + public QueuedCommandStatus enqueueCommand(final ConfiguredStatement statement, final ProducerTransactionManager producerTransactionManager) { final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement()); final long commandSequenceNumber = commandLog.size(); commandLog.add( @@ -177,14 +196,6 @@ private class KsqlServer { this.fakeCommandQueue = new FakeCommandQueue(commandLog); serverState = new ServerState(); serverState.setReady(); - this.ksqlResource = new KsqlResource( - ksqlEngine, - fakeCommandQueue, - Duration.ofMillis(0), - ()->{}, - (sc, metastore, statement) -> { - } - ); this.statementExecutor = new StatementExecutor( serviceContext, @@ -200,6 +211,15 @@ private class KsqlServer { serverState ); + this.ksqlResource = new KsqlResource( + ksqlEngine, + fakeCommandQueue, + Duration.ofMillis(0), + ()->{}, + (sc, metastore, statement) -> { }, + producerTransactionManagerFactory + ); + this.statementExecutor.configure(ksqlConfig); this.ksqlResource.configure(ksqlConfig); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java index 3e54152011c8..65a4dbce0d64 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java @@ -42,6 +42,7 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; +import io.confluent.ksql.rest.server.ProducerTransactionManager; import io.confluent.ksql.rest.server.computation.DistributingExecutor; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; @@ -71,6 +72,7 @@ public class RequestHandlerTest { @Mock DistributingExecutor distributor; @Mock KsqlEntity entity; @Mock CommandQueueSync sync; + @Mock ProducerTransactionManager producerTransactionManager; private MetaStore metaStore; private RequestHandler handler; @@ -98,7 +100,7 @@ public void shouldUseCustomExecutor() { // When final List statements = new DefaultKsqlParser().parse(SOME_STREAM_SQL); - final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of()); + final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of(), producerTransactionManager); // Then assertThat(entities, contains(entity)); @@ -121,7 +123,7 @@ public void shouldDefaultToDistributor() { // When final List statements = new DefaultKsqlParser().parse(SOME_STREAM_SQL); - final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of()); + final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of(), producerTransactionManager); // Then assertThat(entities, contains(entity)); @@ -147,7 +149,8 @@ public void shouldDistributeProperties() { final KsqlEntityList entities = 
handler.execute( serviceContext, statements, - ImmutableMap.of("x", "y") + ImmutableMap.of("x", "y"), + producerTransactionManager ); // Then @@ -184,7 +187,7 @@ public void shouldWaitForDistributedStatements() { ); // When - handler.execute(serviceContext, statements, ImmutableMap.of()); + handler.execute(serviceContext, statements, ImmutableMap.of(), producerTransactionManager); // Then verify(sync).waitFor(argThat(hasItems(entity1, entity2)), any()); @@ -208,7 +211,7 @@ public void shouldInlineRunScriptStatements() { // When: final List statements = new DefaultKsqlParser() .parse("RUN SCRIPT '/some/script.sql';" ); - handler.execute(serviceContext, statements, props); + handler.execute(serviceContext, statements, props, producerTransactionManager); // Then: verify(customExecutor, times(1)) @@ -239,7 +242,7 @@ public void shouldOnlyReturnLastInRunScript() { .parse("RUN SCRIPT '/some/script.sql';" ); // When: - final KsqlEntityList result = handler.execute(serviceContext, statements, props); + final KsqlEntityList result = handler.execute(serviceContext, statements, props, producerTransactionManager); // Then: assertThat(result, contains(entity2)); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index cd6a41c3d0f1..270648137f1b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -114,6 +114,8 @@ import io.confluent.ksql.rest.entity.StreamsList; import io.confluent.ksql.rest.entity.TablesList; import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.rest.server.ProducerTransactionManager; +import io.confluent.ksql.rest.server.ProducerTransactionManagerFactory; import io.confluent.ksql.rest.server.computation.CommandStatusFuture; import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.computation.QueuedCommandStatus; @@ -235,6 +237,8 @@ public class KsqlResourceTest { .valueColumn(ColumnName.of("f1"), SqlTypes.STRING) .build(); + private static final String COMMAND_TOPIC_NAME = "command-topic"; + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -263,6 +267,10 @@ public class KsqlResourceTest { private Injector sandboxTopicInjector; @Mock private KsqlAuthorizationValidator authorizationValidator; + @Mock + private ProducerTransactionManagerFactory producerTransactionManagerFactory; + @Mock + private ProducerTransactionManager producerTransactionManager; private KsqlResource ksqlResource; private SchemaRegistryClient schemaRegistryClient; @@ -298,12 +306,15 @@ public void setUp() throws IOException, RestClientException { metaStore ); + when(producerTransactionManagerFactory.createProducerTransactionManager()) + .thenReturn(producerTransactionManager); + ksqlEngine = realEngine; when(sandbox.getMetaStore()).thenAnswer(inv -> metaStore.copy()); addTestTopicAndSources(); - when(commandStore.enqueueCommand(any())) + when(commandStore.enqueueCommand(any(), any(ProducerTransactionManager.class))) .thenReturn(commandStatus) .thenReturn(commandStatus1) .thenReturn(commandStatus2); @@ -354,7 +365,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - authorizationValidator + authorizationValidator, + 
producerTransactionManagerFactory ); // Then: @@ -382,7 +394,8 @@ public void shouldThrowOnHandleTerminateIfNotConfigured() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - authorizationValidator + authorizationValidator, + producerTransactionManagerFactory ); // Then: @@ -688,7 +701,9 @@ public void shouldDistributePersistentQuery() { configured( preparedStatement( "CREATE STREAM S AS SELECT * FROM test_stream;", - CreateStreamAsSelect.class))))); + CreateStreamAsSelect.class))) + ), any(ProducerTransactionManager.class) + ); } @Test @@ -698,7 +713,7 @@ public void shouldDistributeWithConfig() { // Then: verify(commandStore).enqueueCommand( - argThat(configured(VALID_EXECUTABLE_REQUEST.getStreamsProperties(), ksqlConfig))); + argThat(configured(VALID_EXECUTABLE_REQUEST.getStreamsProperties(), ksqlConfig)), any(ProducerTransactionManager.class)); } @Test @@ -807,7 +822,8 @@ public void shouldDistributeAvoCreateStatementWithColumns() { argThat(is(configured(preparedStatement( "CREATE STREAM S (foo INT) WITH(VALUE_FORMAT='AVRO', KAFKA_TOPIC='orders-topic');", CreateStream.class) - )))); + ))), any(ProducerTransactionManager.class) + ); } @Test @@ -832,7 +848,7 @@ public void shouldSupportTopicInferenceInVerification() { // Then: verify(sandbox).execute(any(SandboxedServiceContext.class), eq(configuredStatement)); - verify(commandStore).enqueueCommand(argThat(configured(preparedStatementText(sql)))); + verify(commandStore).enqueueCommand(argThat(configured(preparedStatementText(sql))), any(ProducerTransactionManager.class)); } @Test @@ -856,7 +872,7 @@ public void shouldSupportTopicInferenceInExecution() { makeRequest(sql); // Then: - verify(commandStore).enqueueCommand(eq(configured)); + verify(commandStore).enqueueCommand(eq(configured), any(ProducerTransactionManager.class)); } @Test @@ -914,7 +930,7 @@ public void shouldSupportSchemaInference() { // Then: verify(sandbox).execute(any(SandboxedServiceContext.class), eq(CFG_0_WITH_SCHEMA)); - verify(commandStore).enqueueCommand(eq(CFG_1_WITH_SCHEMA)); + verify(commandStore).enqueueCommand(eq(CFG_1_WITH_SCHEMA), any(ProducerTransactionManager.class)); } @Test @@ -1119,7 +1135,7 @@ public void shouldFailMultipleStatementsAtomically() { ); // Then: - verify(commandStore, never()).enqueueCommand(any()); + verify(commandStore, never()).enqueueCommand(any(), any(ProducerTransactionManager.class)); } @Test @@ -1137,7 +1153,8 @@ public void shouldDistributeTerminateQuery() { // Then: verify(commandStore) .enqueueCommand( - argThat(is(configured(preparedStatement(terminateSql, TerminateQuery.class))))); + argThat(is(configured(preparedStatement(terminateSql, TerminateQuery.class)))), + any(ProducerTransactionManager.class)); assertThat(result.getStatementText(), is(terminateSql)); } @@ -1296,7 +1313,8 @@ public void shouldSetProperty() { argThat(is(configured( preparedStatementText(csas), ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), - ksqlConfig)))); + ksqlConfig))), + any(ProducerTransactionManager.class)); assertThat(results, hasSize(1)); assertThat(results.get(0).getStatementText(), is(csas)); @@ -1318,7 +1336,8 @@ public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() { argThat(is(configured( preparedStatementText(csas), ImmutableMap.of(), - ksqlConfig)))); + ksqlConfig))), + any(ProducerTransactionManager.class)); } @Test @@ -1365,7 +1384,8 @@ public void shouldUnsetProperty() { // Then: verify(commandStore).enqueueCommand( - 
argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig)))); + argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig))), + any(ProducerTransactionManager.class)); assertThat(result.getStatementText(), is(csas)); } @@ -1396,7 +1416,8 @@ public void shouldScopeSetPropertyToSingleRequest() { // Then: verify(commandStore).enqueueCommand( - argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig)))); + argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig))), + any(ProducerTransactionManager.class)); } @Test @@ -1442,7 +1463,7 @@ public void shouldFailAllCommandsIfWouldReachActivePersistentQueriesLimit() { containsString("would cause the number of active, persistent queries " + "to exceed the configured limit")); - verify(commandStore, never()).enqueueCommand(any()); + verify(commandStore, never()).enqueueCommand(any(), any(ProducerTransactionManager.class)); } @Test @@ -1672,13 +1693,15 @@ public void shouldHandleTerminateRequestCorrectly() { preparedStatementText(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT), Collections.singletonMap( ClusterTerminateRequest.DELETE_TOPIC_LIST_PROP, ImmutableList.of("Foo")), - ksqlConfig)))); + ksqlConfig))), + any(ProducerTransactionManager.class)); } @Test public void shouldFailIfCannotWriteTerminateCommand() { // Given: - when(commandStore.enqueueCommand(any())).thenThrow(new KsqlException("")); + when(commandStore.enqueueCommand(any(), any(ProducerTransactionManager.class))) + .thenThrow(new KsqlException("")); // When: final Response response = ksqlResource.terminateCluster( @@ -1721,7 +1744,7 @@ public void shouldNeverEnqueueIfErrorIsThrown() { Code.BAD_REQUEST); // Then: - verify(commandStore, never()).enqueueCommand(any()); + verify(commandStore, never()).enqueueCommand(any(), any(ProducerTransactionManager.class)); } @Test @@ -1811,7 +1834,8 @@ public void shouldInlineRunScriptStatements() { // Then: verify(commandStore).enqueueCommand( - argThat(is(configured(preparedStatement(instanceOf(CreateStreamAsSelect.class)))))); + argThat(is(configured(preparedStatement(instanceOf(CreateStreamAsSelect.class))))), + any(ProducerTransactionManager.class)); } @Test @@ -1829,7 +1853,8 @@ public void shouldThrowOnRunScriptStatementMissingScriptContent() { @Test public void shouldThrowServerErrorOnFailedToDistribute() { // Given: - when(commandStore.enqueueCommand(any())).thenThrow(new KsqlException("blah")); + when(commandStore.enqueueCommand(any(), any(ProducerTransactionManager.class))) + .thenThrow(new KsqlException("blah")); final String statement = "CREATE STREAM " + streamName + " AS SELECT * FROM test_stream;"; // Expect: @@ -2048,7 +2073,8 @@ private void setUpKsqlResource() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - authorizationValidator + authorizationValidator, + producerTransactionManagerFactory ); ksqlResource.configure(ksqlConfig); diff --git a/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java b/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java index cb60edf80129..10f8f451f7bd 100644 --- a/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java +++ b/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java @@ -470,6 +470,10 @@ private Properties buildBrokerConfig(final String logDir) { config.put(KafkaConfig.LogRetentionTimeMillisProp(), -1); // Stop 
logs marked for deletion from being deleted config.put(KafkaConfig.LogDeleteDelayMsProp(), Long.MAX_VALUE); + // Set to 1: the transaction state topic cannot be replicated beyond the single broker + config.put(KafkaConfig.TransactionsTopicReplicationFactorProp(), (short) 1); + // Set to 1: min ISR for the transaction state topic is likewise limited by the single broker + config.put(KafkaConfig.TransactionsTopicMinISRProp(), 1); return config; }
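
Note for reviewers: the tests above only mock ProducerTransactionManager and ProducerTransactionManagerFactory; their implementations are not part of these hunks. Below is a minimal sketch of the shape such a manager could take, assuming it wraps a transactional Kafka producer for the command topic. The class and method names (TransactionalCommandProducer, begin, sendCommand, commit, abort) and the generic key/value types are illustrative assumptions, not taken from the change itself.

    import java.util.Objects;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Illustrative sketch only: groups command-topic writes into a single Kafka transaction.
    // The real ProducerTransactionManager in this change may be shaped differently.
    public class TransactionalCommandProducer<K, V> {

      private final String commandTopicName;
      private final Producer<K, V> producer;

      public TransactionalCommandProducer(
          final String commandTopicName,
          final Producer<K, V> producer
      ) {
        this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
        this.producer = Objects.requireNonNull(producer, "producer");
        // Requires transactional.id to be set in the producer config.
        this.producer.initTransactions();
      }

      public void begin() {
        producer.beginTransaction();
      }

      public void sendCommand(final K commandId, final V command) {
        // Asynchronous; commit() below flushes and surfaces any send failure.
        producer.send(new ProducerRecord<>(commandTopicName, 0, commandId, command));
      }

      public void commit() {
        producer.commitTransaction();
      }

      public void abort() {
        producer.abortTransaction();
      }

      public void close() {
        producer.close();
      }
    }

The createProducerTransactionManager() stubs in RecoveryTest and KsqlResourceTest suggest one manager is created per request, so each enqueueCommand(statement, producerTransactionManager) call runs within its own transaction scope; that reading is inferred from the test setup rather than stated in the diff.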
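
The two broker overrides added to EmbeddedSingleNodeKafkaCluster are what make transactional producers usable in tests: Kafka's internal __transaction_state topic defaults to a replication factor of 3 and a min ISR of 2, which a one-broker cluster can never satisfy, so initTransactions() would otherwise retry until it times out. The sketch below shows the equivalent raw property names together with the minimal producer-side transactional configuration they enable; the transactional id and serializers are illustrative assumptions.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public final class SingleBrokerTransactionsExample {

      // Broker-side overrides matching the KafkaConfig constants used in the diff.
      static Properties brokerOverrides() {
        final Properties config = new Properties();
        config.put("transaction.state.log.replication.factor", (short) 1); // default is 3
        config.put("transaction.state.log.min.isr", 1);                    // default is 2
        return config;
      }

      // Minimal transactional producer that exercises those broker settings.
      static KafkaProducer<String, String> transactionalProducer(final String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "ksql-command-producer"); // illustrative id
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // required once transactions are on
        final KafkaProducer<String, String> producer =
            new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        // Times out on a single-broker cluster unless the overrides above are in place.
        producer.initTransactions();
        return producer;
      }
    }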