Skip to content

Commit

Permalink
revert to using vanilla Kafka producer
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Nov 14, 2019
1 parent 4972ee6 commit aad8942
Show file tree
Hide file tree
Showing 16 changed files with 226 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.query.id.HybridQueryIdGenerator;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
Expand Down Expand Up @@ -107,6 +109,7 @@
import javax.websocket.server.ServerEndpointConfig.Configurator;
import javax.ws.rs.core.Configurable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
Expand Down Expand Up @@ -274,7 +277,6 @@ private void initialize() {
processingLogContext.getConfig(),
ksqlConfigNoPort
);

maybeCreateProcessingLogStream(
processingLogContext.getConfig(),
ksqlConfigNoPort,
Expand Down Expand Up @@ -635,12 +637,14 @@ private static void maybeCreateProcessingLogStream(
return;
}

final TransactionalProducer transactionalProducer =
final Producer<CommandId, Command> transactionalProducer =
commandQueue.createTransactionalProducer();
try {
transactionalProducer.initTransactions();
transactionalProducer.beginTransaction();


// We don't wait for the commandRunner in this case since it hasn't been started yet.

final PreparedStatement<?> statement = ProcessingLogServerUtils
.processingLogStreamCreateStatement(config, ksqlConfig);
final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@

package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.rest.server.TransactionalProducer;

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.statement.ConfiguredStatement;

import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;

/**
* Represents a queue of {@link Command}s that must be distributed to all
Expand All @@ -44,7 +43,7 @@ public interface CommandQueue extends Closeable {
*/
QueuedCommandStatus enqueueCommand(
ConfiguredStatement<?> statement,
TransactionalProducer transactionalProducer
Producer<CommandId, Command> transactionalProducer
);

/**
Expand Down Expand Up @@ -81,11 +80,18 @@ void ensureConsumedPast(long seqNum, Duration timeout)
throws InterruptedException, TimeoutException;

/**
* Creates a transactional producer for producing to the command topic
* Creates a transactional producer for producing to the command topic.
*
* @return a TransactionalProducer
*/
TransactionalProducer createTransactionalProducer();
Producer<CommandId, Command> createTransactionalProducer();

/**
* Blocks until the command topic consumer has processed all records up to
* the current offset when this method is called.
*
*/
void waitForCommandConsumer();

/**
* @return whether or not there are any enqueued commands
Expand Down
Loading

0 comments on commit aad8942

Please sign in to comment.