Skip to content

Commit

Permalink
feat: better error message when transaction to command topic fails to…
Browse files Browse the repository at this point in the history
… initialize by timeout (#4486)

* fix: add better error message when producer fails to initiate transaction to command topic

* feat: better error message when transaction to command topic fails to initialize by timeout
  • Loading branch information
stevenpyzhang authored Feb 7, 2020
1 parent 7c4daab commit a5fed3b
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
Expand All @@ -40,6 +41,7 @@
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

/**
* A {@code StatementExecutor} that encapsulates a command queue and will
Expand All @@ -55,14 +57,16 @@ public class DistributingExecutor {
private final ValidatedCommandFactory validatedCommandFactory;
private final CommandIdAssigner commandIdAssigner;
private final ReservedInternalTopics internalTopics;
private final Errors errorHandler;

public DistributingExecutor(
final KsqlConfig ksqlConfig,
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final ValidatedCommandFactory validatedCommandFactory
final ValidatedCommandFactory validatedCommandFactory,
final Errors errorHandler
) {
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.distributedCmdResponseTimeout =
Expand All @@ -77,6 +81,7 @@ public DistributingExecutor(
this.commandIdAssigner = new CommandIdAssigner();
this.internalTopics =
new ReservedInternalTopics(Objects.requireNonNull(ksqlConfig, "ksqlConfig"));
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
}

/**
Expand Down Expand Up @@ -109,8 +114,18 @@ public Optional<KsqlEntity> execute(

final Producer<CommandId, Command> transactionalProducer =
commandQueue.createTransactionalProducer();

try {
transactionalProducer.initTransactions();
} catch (final TimeoutException e) {
throw new KsqlServerException(errorHandler.transactionInitTimeoutErrorMessage(e), e);
} catch (final Exception e) {
throw new KsqlServerException(String.format(
"Could not write the statement '%s' into the command topic: " + e.getMessage(),
statement.getStatementText()), e);
}

try {
transactionalProducer.beginTransaction();
commandQueue.waitForCommandConsumer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void configure(final KsqlConfig config) {
distributedCmdResponseTimeout,
injectorFactory,
authorizationValidator,
new ValidatedCommandFactory(config)
new ValidatedCommandFactory(config),
errorHandler
),
ksqlEngine,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandId.Action;
import io.confluent.ksql.rest.entity.CommandId.Type;
Expand All @@ -66,6 +67,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -128,6 +130,8 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json")
private Producer<CommandId, Command> transactionalProducer;
@Mock
private Command command;
@Mock
private Errors errorHandler;

private DistributingExecutor distributor;
private AtomicLong scnCounter;
Expand Down Expand Up @@ -157,7 +161,8 @@ public void setUp() throws InterruptedException {
DURATION_10_MS,
(ec, sc) -> InjectorChain.of(schemaInjector, topicInjector),
Optional.of(authorizationValidator),
validatedCommandFactory
validatedCommandFactory,
errorHandler
);
}

Expand All @@ -184,6 +189,19 @@ public void shouldEnqueueSuccessfulCommandTransactionally() {
inOrder.verify(transactionalProducer).close();
}

@Test
public void shouldNotAbortTransactionIfInitTransactionFails() {
// Given:
doThrow(TimeoutException.class).when(transactionalProducer).initTransactions();

// Expect:
expectedException.expect(KsqlServerException.class);

// Then:
distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext);
verify(transactionalProducer, times(0)).abortTransaction();
}

@Test
public void shouldInferSchemas() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ public String kafkaAuthorizationErrorMessage(final Exception e) {
return ErrorMessageUtil.buildErrorMessage(e);
}

@Override
public String transactionInitTimeoutErrorMessage(final Exception e) {
return "Timeout while initializing transaction to the KSQL command topic."
+ System.lineSeparator()
+ "If you're running a single Kafka broker, "
+ "ensure that the following Kafka configs are set to 1:"
+ System.lineSeparator()
+ "- transaction.state.log.replication.factor"
+ System.lineSeparator()
+ "- transaction.state.log.min.isr"
+ System.lineSeparator()
+ "- offsets.topic.replication.factor";
}

@Override
public String schemaRegistryUnconfiguredErrorMessage(final Exception e) {
return ErrorMessageUtil.buildErrorMessage(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ public interface ErrorMessages {

String kafkaAuthorizationErrorMessage(Exception e);

String transactionInitTimeoutErrorMessage(Exception e);

String schemaRegistryUnconfiguredErrorMessage(Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public String kafkaAuthorizationErrorMessage(final Exception e) {
return errorMessages.kafkaAuthorizationErrorMessage(e);
}

public String transactionInitTimeoutErrorMessage(final Exception e) {
return errorMessages.transactionInitTimeoutErrorMessage(e);
}

public Response generateResponse(
final Exception e,
final Response defaultResponse
Expand Down

0 comments on commit a5fed3b

Please sign in to comment.