
Commit

test
stevenpyzhang committed Oct 24, 2019
1 parent 36c5891 commit ba7c7a2
Showing 19 changed files with 664 additions and 195 deletions.
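Summary (inferred from the diff below): this commit moves the command-topic producer out of CommandTopic into a new ProducerTransactionManager, so that writes to the command topic happen inside a Kafka transaction. KsqlRestApplication now builds a ProducerTransactionManagerFactory and threads it into KsqlResource and maybeCreateProcessingLogStream; CommandQueue.enqueueCommand takes the transaction manager as an extra argument, and the manager can block until the local CommandRunner has processed everything already in the topic before a write commits.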
CommandTopic.java
@@ -26,15 +26,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,36 +40,28 @@ public class CommandTopic {
private final TopicPartition commandTopicPartition;

private Consumer<CommandId, Command> commandConsumer = null;
private Producer<CommandId, Command> commandProducer = null;
private final String commandTopicName;

public CommandTopic(
final String commandTopicName,
final Map<String, Object> kafkaConsumerProperties,
final Map<String, Object> kafkaProducerProperties
final Map<String, Object> kafkaConsumerProperties
) {
this(
commandTopicName,
new KafkaConsumer<>(
Objects.requireNonNull(kafkaConsumerProperties, "kafkaConsumerProperties"),
InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true),
InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false)
),
new KafkaProducer<>(
Objects.requireNonNull(kafkaProducerProperties, "kafkaClientProperties"),
InternalTopicJsonSerdeUtil.getJsonSerializer(true),
InternalTopicJsonSerdeUtil.getJsonSerializer(false)
));
)
);
}

CommandTopic(
final String commandTopicName,
final Consumer<CommandId, Command> commandConsumer,
final Producer<CommandId, Command> commandProducer
final Consumer<CommandId, Command> commandConsumer
) {
this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer");
this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
}

@@ -86,24 +73,6 @@ public void start() {
commandConsumer.assign(Collections.singleton(commandTopicPartition));
}

public RecordMetadata send(final CommandId commandId, final Command command) {
final ProducerRecord<CommandId, Command> producerRecord = new ProducerRecord<>(
commandTopicName,
0,
Objects.requireNonNull(commandId, "commandId"),
Objects.requireNonNull(command, "command"));
try {
return commandProducer.send(producerRecord).get();
} catch (final ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException)e.getCause();
}
throw new RuntimeException(e.getCause());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}

public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
return commandConsumer.poll(timeout);
}
@@ -150,6 +119,5 @@ public void wakeup() {

public void close() {
commandConsumer.close();
commandProducer.close();
}
}
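With this change CommandTopic becomes consume-only: the producer field, the producer-side constructor parameters, and the synchronous send() method are removed, and their responsibilities move to the new ProducerTransactionManager (last file below), which wraps the same send logic in a Kafka transaction.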
KsqlRestApplication.java
@@ -143,6 +143,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig>
private final List<KsqlServerPrecondition> preconditions;
private final List<KsqlConfigurable> configurables;
private final Consumer<KsqlConfig> rocksDBConfigSetterHandler;
private final ProducerTransactionManagerFactory producerTransactionManagerFactory;

public static SourceName getCommandsStreamName() {
return COMMANDS_STREAM_NAME;
@@ -169,7 +170,8 @@ public static SourceName getCommandsStreamName() {
final ProcessingLogContext processingLogContext,
final List<KsqlServerPrecondition> preconditions,
final List<KsqlConfigurable> configurables,
final Consumer<KsqlConfig> rocksDBConfigSetterHandler
final Consumer<KsqlConfig> rocksDBConfigSetterHandler,
final ProducerTransactionManagerFactory producerTransactionManagerFactory
) {
super(config);

@@ -192,6 +194,8 @@ public static SourceName getCommandsStreamName() {
this.configurables = requireNonNull(configurables, "configurables");
this.rocksDBConfigSetterHandler =
requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler");
this.producerTransactionManagerFactory =
requireNonNull(producerTransactionManagerFactory, "producerTransactionManagerFactory");
}

@Override
@@ -278,7 +282,8 @@ private void initialize() {
processingLogContext.getConfig(),
ksqlConfigNoPort,
ksqlEngine,
commandStore
commandStore,
producerTransactionManagerFactory
);

commandRunner.processPriorCommands();
@@ -470,13 +475,12 @@ static KsqlRestApplication buildApplication(

UdfLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();

final String commandTopic = KsqlInternalTopicUtils.getTopicName(
final String commandTopicName = KsqlInternalTopicUtils.getTopicName(
ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX);

final CommandStore commandStore = CommandStore.Factory.create(
commandTopic,
restConfig.getCommandConsumerProperties(),
restConfig.getCommandProducerProperties());
commandTopicName,
restConfig.getCommandConsumerProperties());

final StatementExecutor statementExecutor =
new StatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator);
@@ -504,19 +508,12 @@ static KsqlRestApplication buildApplication(
authorizationValidator
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator
);

final List<String> managedTopics = new LinkedList<>();
managedTopics.add(commandTopic);
managedTopics.add(commandTopicName);
if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) {
managedTopics.add(ProcessingLogServerUtils.getTopicName(processingLogConfig, ksqlConfig));
}

final CommandRunner commandRunner = new CommandRunner(
statementExecutor,
commandStore,
@@ -525,6 +522,24 @@
serverState
);

final ProducerTransactionManagerFactory producerTransactionManagerFactory =
new ProducerTransactionManagerFactory(
commandTopicName,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
commandRunner,
restConfig.getCommandConsumerProperties(),
restConfig.getCommandProducerProperties()
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator,
producerTransactionManagerFactory
);

final List<KsqlServerPrecondition> preconditions = restConfig.getConfiguredInstances(
KsqlRestConfig.KSQL_SERVER_PRECONDITIONS,
KsqlServerPrecondition.class
@@ -557,7 +572,8 @@ static KsqlRestApplication buildApplication(
processingLogContext,
preconditions,
configurables,
rocksDBConfigSetterHandler
rocksDBConfigSetterHandler,
producerTransactionManagerFactory
);
}

@@ -624,13 +640,19 @@ private static void maybeCreateProcessingLogStream(
final ProcessingLogConfig config,
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final CommandQueue commandQueue
final CommandQueue commandQueue,
final ProducerTransactionManagerFactory producerTransactionManagerFactory
) {
if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)
|| !commandQueue.isEmpty()) {
return;
}
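// The transaction opened just below is committed after enqueueCommand at the
// end of this method; its writes stay invisible to read_committed consumers
// until then.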

final ProducerTransactionManager producerTransactionManager =
producerTransactionManagerFactory.createProducerTransactionManager();

producerTransactionManager.begin();

final PreparedStatement<?> statement = ProcessingLogServerUtils
.processingLogStreamCreateStatement(config, ksqlConfig);
final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
@@ -646,7 +668,8 @@ private static void maybeCreateProcessingLogStream(
return;
}

commandQueue.enqueueCommand(configured.get());
commandQueue.enqueueCommand(configured.get(), producerTransactionManager);
producerTransactionManager.commit();
}

/**
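For orientation, here is a minimal sketch of the write path these changes introduce. This is an editor's illustration, not code from the commit: it assumes the enqueueCommand(statement, producerTransactionManager) overload used above, and it adds an explicit abort on failure, which this diff does not show.

    final ProducerTransactionManager txnManager =
        producerTransactionManagerFactory.createProducerTransactionManager();

    txnManager.begin();  // assigns the consumer, then initTransactions() + beginTransaction()
    try {
      commandQueue.enqueueCommand(statement, txnManager);  // produced inside the transaction
      txnManager.commit();                                 // commitTransaction(), then close()
    } catch (final RuntimeException e) {
      txnManager.abort();                                  // abortTransaction(), then close()
      throw e;
    }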
ProducerTransactionManager.java (new file)
@@ -0,0 +1,161 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.util.InternalTopicJsonSerdeUtil;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

/**
* Handles transactional writes to the command topic.
*/
public class ProducerTransactionManager {

private final TopicPartition commandTopicPartition;
private final String commandTopicName;

private final Consumer<CommandId, Command> commandConsumer;
private final Producer<CommandId, Command> commandProducer;
private final CommandRunner commandRunner;

public ProducerTransactionManager(
final String commandTopicName,
final CommandRunner commandRunner,
final Map<String, Object> kafkaConsumerProperties,
final Map<String, Object> kafkaProducerProperties
) {
this.commandTopicPartition = new TopicPartition(
Objects.requireNonNull(commandTopicName, "commandTopicName"),
0
);

this.commandConsumer = new KafkaConsumer<>(
Objects.requireNonNull(kafkaConsumerProperties, "kafkaConsumerProperties"),
InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true),
InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false)
);

this.commandProducer = new KafkaProducer<>(
Objects.requireNonNull(kafkaProducerProperties, "kafkaProducerProperties"),
InternalTopicJsonSerdeUtil.getJsonSerializer(true),
InternalTopicJsonSerdeUtil.getJsonSerializer(false)
);
this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner");
}

@VisibleForTesting
ProducerTransactionManager(
final String commandTopicName,
final CommandRunner commandRunner,
final Consumer<CommandId, Command> commandConsumer,
final Producer<CommandId, Command> commandProducer
) {
this.commandTopicPartition = new TopicPartition(
Objects.requireNonNull(commandTopicName, "commandTopicName"),
0
);
this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer");
this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner");
}


/** Assigns the consumer and begins a new producer transaction. */
public void begin() {
commandConsumer.assign(Collections.singleton(commandTopicPartition));
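// initTransactions() registers the producer's transactional.id with the broker
// and fences any earlier producer instance using the same id. It must be called
// exactly once per producer, which holds here because each manager owns a
// freshly created producer.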
commandProducer.initTransactions();
commandProducer.beginTransaction();
}

/**
* Blocks until the local CommandRunner has processed up to the command topic's
* current end offset, polling once a second for at most a minute.
*/
public void waitForCommandRunner() {
final long endOffset = getEndOffset();

int retries = 0;
try {
while (commandRunner.getNumCommandProcessed() < endOffset) {
if (retries == 60) {
throw new RuntimeException("commandRunner has not processed all commands in topic");
}
Thread.sleep(1000);
retries++;
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(
"Interrupted while waiting for commandRunner to process command topic", e);
}
}

private long getEndOffset() {
return commandConsumer.endOffsets(Collections.singletonList(commandTopicPartition))
.get(commandTopicPartition);
}

public RecordMetadata send(final CommandId commandId, final Command command) {
final ProducerRecord<CommandId, Command> producerRecord = new ProducerRecord<>(
commandTopicName,
0,
Objects.requireNonNull(commandId, "commandId"),
Objects.requireNonNull(command, "command"));
try {
return commandProducer.send(producerRecord).get();
} catch (final ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
} catch (final KafkaException e) {
// A KafkaException thrown synchronously by send() invalidates the transaction,
// so abort it before rethrowing the original exception.
commandProducer.abortTransaction();
throw e;
}
}

public void abort() {
commandProducer.abortTransaction();
close();
}

public void commit() {
commandProducer.commitTransaction();
close();
}

public void close() {
commandConsumer.close();
commandProducer.close();
}
}
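A usage sketch (editor's illustration, not part of the commit). It assumes producer properties that set transactional.id, without which initTransactions() fails, plus matching consumer properties; commandRunner, commandId, and command stand in for real instances:

    final ProducerTransactionManager manager = new ProducerTransactionManager(
        "_command_topic",        // assumed topic name, for illustration only
        commandRunner,
        consumerProperties,
        producerProperties       // must include transactional.id
    );

    manager.begin();                  // initTransactions() + beginTransaction()
    manager.waitForCommandRunner();   // catch up with commands already in the topic
    manager.send(commandId, command); // synchronous transactional write to partition 0
    manager.commit();                 // or manager.abort() on failure; both close the clients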
(The remaining changed files in this commit were not loaded in this view.)
