Skip to content

Commit

Permalink
rename the transaction manager to TransactionalProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Oct 28, 2019
1 parent 0c95bb7 commit 5d0a69f
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -648,10 +648,10 @@ private static void maybeCreateProcessingLogStream(
return;
}

final ProducerTransactionManager producerTransactionManager =
final TransactionalProducer transactionalProducer =
producerTransactionManagerFactory.createProducerTransactionManager();

producerTransactionManager.begin();
transactionalProducer.begin();

final PreparedStatement<?> statement = ProcessingLogServerUtils
.processingLogStreamCreateStatement(config, ksqlConfig);
Expand All @@ -668,8 +668,8 @@ private static void maybeCreateProcessingLogStream(
return;
}

commandQueue.enqueueCommand(configured.get(), producerTransactionManager);
producerTransactionManager.commit();
commandQueue.enqueueCommand(configured.get(), transactionalProducer);
transactionalProducer.commit();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public ProducerTransactionManagerFactory(
);
}

public ProducerTransactionManager createProducerTransactionManager() {
return new ProducerTransactionManager(
public TransactionalProducer createProducerTransactionManager() {
return new TransactionalProducer(
commandTopicName,
commandRunner,
kafkaConsumerProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/**
* Used to handle transactional writes to the command topic
*/
public class ProducerTransactionManager {
public class TransactionalProducer {

private final TopicPartition commandTopicPartition;
private final String commandTopicName;
Expand All @@ -45,7 +45,7 @@ public class ProducerTransactionManager {
private final Producer<CommandId, Command> commandProducer;
private final CommandRunner commandRunner;

public ProducerTransactionManager(
public TransactionalProducer(
final String commandTopicName,
final CommandRunner commandRunner,
final Map<String, Object> kafkaConsumerProperties,
Expand All @@ -72,7 +72,7 @@ public ProducerTransactionManager(
}

@VisibleForTesting
ProducerTransactionManager(
TransactionalProducer(
final String commandTopicName,
final CommandRunner commandRunner,
final Consumer<CommandId, Command> commandConsumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.io.Closeable;
import java.time.Duration;
Expand All @@ -36,13 +36,13 @@ public interface CommandQueue extends Closeable {
* for the {@link io.confluent.ksql.rest.entity.CommandStatus CommandStatus}.
*
* @param statement The statement to be distributed
* @param producerTransactionManager The transaction manager for enqueueing command
* @param transactionalProducer The transaction manager for enqueueing command
* @return an asynchronous tracker that can be used to determine the current
* state of the command
*/
QueuedCommandStatus enqueueCommand(
ConfiguredStatement<?> statement,
ProducerTransactionManager producerTransactionManager
TransactionalProducer transactionalProducer
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.google.common.collect.Maps;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.CommandTopic;
import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import java.io.Closeable;
Expand Down Expand Up @@ -100,7 +100,7 @@ public void close() {
@Override
public QueuedCommandStatus enqueueCommand(
final ConfiguredStatement<?> statement,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement());

Expand Down Expand Up @@ -129,7 +129,7 @@ public QueuedCommandStatus enqueueCommand(
);
try {
final RecordMetadata recordMetadata =
producerTransactionManager.send(commandId, command);
transactionalProducer.send(commandId, command);
return new QueuedCommandStatus(recordMetadata.offset(), statusFuture);
} catch (final Exception e) {
commandStatusMap.remove(commandId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.rest.server.execution.StatementExecutor;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.ServiceContext;
Expand All @@ -46,7 +46,7 @@ public class DistributingExecutor implements StatementExecutor<Statement> {
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final KsqlAuthorizationValidator authorizationValidator;

private ProducerTransactionManager producerTransactionManager;
private TransactionalProducer transactionalProducer;

public DistributingExecutor(
final CommandQueue commandQueue,
Expand Down Expand Up @@ -76,17 +76,17 @@ public Optional<KsqlEntity> execute(
checkAuthorization(injected, serviceContext, executionContext);

try {
if (producerTransactionManager == null) {
if (transactionalProducer == null) {
throw new RuntimeException("Transaction manager for distributing executor not set");
}

final QueuedCommandStatus queuedCommandStatus =
commandQueue.enqueueCommand(injected, producerTransactionManager);
commandQueue.enqueueCommand(injected, transactionalProducer);

final CommandStatus commandStatus = queuedCommandStatus
.tryWaitForFinalStatus(distributedCmdResponseTimeout);

producerTransactionManager = null;
transactionalProducer = null;
return Optional.of(new CommandStatusEntity(
injected.getStatementText(),
queuedCommandStatus.getCommandId(),
Expand All @@ -100,8 +100,8 @@ public Optional<KsqlEntity> execute(
}
}

public void setTransactionManager(final ProducerTransactionManager producerTransactionManager) {
this.producerTransactionManager = producerTransactionManager;
public void setTransactionManager(final TransactionalProducer transactionalProducer) {
this.transactionalProducer = transactionalProducer;
}

private void checkAuthorization(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.rest.server.computation.DistributingExecutor;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
Expand Down Expand Up @@ -75,7 +75,7 @@ public KsqlEntityList execute(
final ServiceContext serviceContext,
final List<ParsedStatement> statements,
final Map<String, Object> propertyOverrides,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final Map<String, Object> scopedPropertyOverrides = new HashMap<>(propertyOverrides);
final KsqlEntityList entities = new KsqlEntityList();
Expand All @@ -86,7 +86,7 @@ public KsqlEntityList execute(
serviceContext,
prepared,
propertyOverrides,
producerTransactionManager
transactionalProducer
);
if (!result.isEmpty()) {
// This is to maintain backwards compatibility until we deprecate
Expand All @@ -102,7 +102,7 @@ public KsqlEntityList execute(
configured,
scopedPropertyOverrides,
entities,
producerTransactionManager
transactionalProducer
).ifPresent(entities::add);
}
}
Expand All @@ -115,7 +115,7 @@ private <T extends Statement> Optional<KsqlEntity> executeStatement(
final ConfiguredStatement<T> configured,
final Map<String, Object> mutableScopedProperties,
final KsqlEntityList entities,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final Class<? extends Statement> statementClass = configured.getStatement().getClass();
commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass);
Expand All @@ -124,7 +124,7 @@ private <T extends Statement> Optional<KsqlEntity> executeStatement(
customExecutors.getOrDefault(statementClass, distributor);

if (executor instanceof DistributingExecutor) {
((DistributingExecutor) executor).setTransactionManager(producerTransactionManager);
((DistributingExecutor) executor).setTransactionManager(transactionalProducer);
}

return executor.execute(
Expand All @@ -139,7 +139,7 @@ private KsqlEntityList executeRunScript(
final ServiceContext serviceContext,
final PreparedStatement<?> statement,
final Map<String, Object> propertyOverrides,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final String sql = (String) propertyOverrides
.get(KsqlConstants.LEGACY_RUN_SCRIPT_STATEMENTS_CONTENT);
Expand All @@ -153,7 +153,7 @@ private KsqlEntityList executeRunScript(
serviceContext,
ksqlEngine.parse(sql),
propertyOverrides,
producerTransactionManager
transactionalProducer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.rest.server.ProducerTransactionManagerFactory;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.computation.DistributingExecutor;
Expand Down Expand Up @@ -222,11 +222,11 @@ public Response handleKsqlStatements(

final List<ParsedStatement> statements = ksqlEngine.parse(request.getKsql());

final ProducerTransactionManager producerTransactionManager =
final TransactionalProducer transactionalProducer =
producerTransactionManagerFactory.createProducerTransactionManager();

producerTransactionManager.begin();
producerTransactionManager.waitForCommandRunner();
transactionalProducer.begin();
transactionalProducer.waitForCommandRunner();

validator.validate(
SandboxedServiceContext.create(serviceContext),
Expand All @@ -239,10 +239,10 @@ public Response handleKsqlStatements(
serviceContext,
statements,
request.getStreamsProperties(),
producerTransactionManager
transactionalProducer
);

producerTransactionManager.commit();
transactionalProducer.commit();
return Response.ok(entities).build();
} catch (final KsqlRestException e) {
throw e;
Expand All @@ -252,7 +252,6 @@ public Response handleKsqlStatements(
return ErrorResponseUtil.generateResponse(
e, Errors.badRequest(e));
} catch (final Exception e) {
e.printStackTrace();
return ErrorResponseUtil.generateResponse(
e, Errors.serverErrorForStatement(e, request.getKsql()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public class KsqlRestApplicationTest {
@Mock
private ProducerTransactionManagerFactory producerTransactionManagerFactory;
@Mock
private ProducerTransactionManager producerTransactionManager;
private TransactionalProducer transactionalProducer;
private PreparedStatement<?> logCreateStatement;
private KsqlRestApplication app;

Expand All @@ -151,15 +151,15 @@ public void setUp() {
when(ksqlEngine.prepare(any())).thenReturn((PreparedStatement)preparedStatement);

when(commandQueue.isEmpty()).thenReturn(true);
when(commandQueue.enqueueCommand(any(), any(ProducerTransactionManager.class)))
when(commandQueue.enqueueCommand(any(), any(TransactionalProducer.class)))
.thenReturn(queuedCommandStatus);
when(commandQueue.getCommandTopicName()).thenReturn(CMD_TOPIC_NAME);
when(serviceContext.getTopicClient()).thenReturn(topicClient);
when(topicClient.isTopicExists(CMD_TOPIC_NAME)).thenReturn(false);
when(precondition1.checkPrecondition(any(), any())).thenReturn(Optional.empty());
when(precondition2.checkPrecondition(any(), any())).thenReturn(Optional.empty());
when(producerTransactionManagerFactory.createProducerTransactionManager()).
thenReturn(producerTransactionManager);
thenReturn(transactionalProducer);

logCreateStatement = ProcessingLogServerUtils.processingLogStreamCreateStatement(
processingLogConfig,
Expand Down Expand Up @@ -246,7 +246,7 @@ public void shouldCreateLogStream() {
);
verify(commandQueue).enqueueCommand(
argThat(configured(equalTo(logCreateStatement), Collections.emptyMap(), ksqlConfig)),
any(ProducerTransactionManager.class));
any(TransactionalProducer.class));
}

@Test
Expand All @@ -259,7 +259,7 @@ public void shouldNotCreateLogStreamIfAutoCreateNotConfigured() {
app.startKsql();

// Then:
verify(commandQueue, never()).enqueueCommand(any(), any(ProducerTransactionManager.class));
verify(commandQueue, never()).enqueueCommand(any(), any(TransactionalProducer.class));
}

@Test
Expand All @@ -271,7 +271,7 @@ public void shouldOnlyCreateLogStreamIfCommandTopicEmpty() {
app.startKsql();

// Then:
verify(commandQueue, never()).enqueueCommand(any(), any(ProducerTransactionManager.class));
verify(commandQueue, never()).enqueueCommand(any(), any(TransactionalProducer.class));
}

@Test
Expand All @@ -283,7 +283,7 @@ public void shouldNotCreateLogStreamIfValidationFails() {
app.startKsql();

// Then:
verify(commandQueue, never()).enqueueCommand(any(), any(ProducerTransactionManager.class));
verify(commandQueue, never()).enqueueCommand(any(), any(TransactionalProducer.class));
}

@Test
Expand All @@ -296,7 +296,7 @@ public void shouldStartCommandStoreBeforeEnqueuingLogStream() {
inOrder.verify(commandQueue).start();
inOrder.verify(commandQueue).enqueueCommand(
argThat(configured(equalTo(logCreateStatement))),
any(ProducerTransactionManager.class));
any(TransactionalProducer.class));
}

@Test
Expand All @@ -309,7 +309,7 @@ public void shouldCreateLogTopicBeforeEnqueuingLogStream() {
inOrder.verify(topicClient).createTopic(eq(LOG_TOPIC_NAME), anyInt(), anyShort());
inOrder.verify(commandQueue).enqueueCommand(
argThat(configured(equalTo(logCreateStatement))),
any(ProducerTransactionManager.class));
any(TransactionalProducer.class));
}

@Test
Expand Down Expand Up @@ -344,7 +344,7 @@ public void shouldEnqueueLogStreamBeforeSettingReady() {
final InOrder inOrder = Mockito.inOrder(commandQueue, serverState);
inOrder.verify(commandQueue).enqueueCommand(
argThat(configured(equalTo(logCreateStatement))),
any(ProducerTransactionManager.class));
any(TransactionalProducer.class));
inOrder.verify(serverState).setReady();
}

Expand Down
Loading

0 comments on commit 5d0a69f

Please sign in to comment.