From d51287c10ebe88e3274c4a9dddf02e8f54e46a62 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Thu, 20 Aug 2020 13:15:48 -0700 Subject: [PATCH] feat: add Ksql warning to KsqlResource response when CommandRunner degraded --- .../rest/server/resources/KsqlResource.java | 22 ++++++++++++++ .../server/resources/KsqlResourceTest.java | 29 +++++++++++++++++++ .../ksql/rest/DefaultErrorMessages.java | 2 +- .../ksql/rest/entity/KsqlEntity.java | 9 ++++-- 4 files changed, 59 insertions(+), 3 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 2f850295a7c4..421240cd1439 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -34,8 +34,10 @@ import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; +import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlRequest; +import io.confluent.ksql.rest.entity.KsqlWarning; import io.confluent.ksql.rest.server.ServerUtil; import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.DistributingExecutor; @@ -61,12 +63,14 @@ import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.net.URL; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; +import java.util.function.Supplier; import java.util.regex.PatternSyntaxException; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; @@ -276,6 +280,11 @@ public EndpointResponse handleKsqlStatements( ); LOG.info("Processed successfully: " + request); + addCommandRunnerDegradedWarning( + entities, + errorHandler, + () -> commandRunner.checkCommandRunnerStatus() + == CommandRunner.CommandRunnerStatus.DEGRADED); return EndpointResponse.ok(entities); } catch (final KsqlRestException e) { LOG.info("Processed unsuccessfully: " + request + ", reason: " + e.getMessage()); @@ -315,4 +324,17 @@ private static void ensureValidPatterns(final List deleteTopicList) { } }); } + + private static void addCommandRunnerDegradedWarning( + final KsqlEntityList entityList, + final Errors errorHandler, + final Supplier commandRunnerDegraded + ) { + if (commandRunnerDegraded.get()) { + for (final KsqlEntity entity: entityList) { + entity.updateWarnings(Collections.singletonList( + new KsqlWarning(errorHandler.commandRunnerDegradedErrorMessage()))); + } + } + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index ea82266d0ac3..2bd0092e7ab7 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -99,6 +99,7 @@ import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.properties.DenyListPropertyValidator; +import io.confluent.ksql.rest.DefaultErrorMessages; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; @@ -572,6 +573,34 @@ public void shouldShowQueriesExtended() { QueryDescriptionFactory.forQueryMetadata(queryMetadata.get(1), queryHostState))); } + @Test + public void shouldHaveKsqlWarningIfCommandRunnerDegraded() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(SystemColumns.ROWKEY_NAME, SqlTypes.STRING) + .valueColumn(ColumnName.of("FIELD1"), SqlTypes.BOOLEAN) + .valueColumn(ColumnName.of("FIELD2"), SqlTypes.STRING) + .build(); + + givenSource( + DataSourceType.KSTREAM, "new_stream", "new_topic", + schema); + + // When: + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.DEGRADED); + when(errorsHandler.commandRunnerDegradedErrorMessage()).thenReturn(DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE); + + final SourceDescriptionList descriptionList1 = makeSingleRequest( + "SHOW STREAMS EXTENDED;", SourceDescriptionList.class); + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING); + final SourceDescriptionList descriptionList2 = makeSingleRequest( + "SHOW STREAMS EXTENDED;", SourceDescriptionList.class); + + assertThat(descriptionList1.getWarnings().size(), is(1)); + assertThat(descriptionList1.getWarnings().get(0).getMessage(), is(DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE)); + assertThat(descriptionList2.getWarnings().size(), is(0)); + } + @Test public void shouldDescribeStatement() { // Given: diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java index 8ca0df8a9f79..c4e4211c3dec 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java @@ -19,7 +19,7 @@ public class DefaultErrorMessages implements ErrorMessages { - static String COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE = + public static final String COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE = "The server has encountered an incompatible entry in its log " + "and cannot process further DDL statements." + System.lineSeparator() diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index c3d8b67db3c9..8d5ebf7e61c8 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -18,7 +18,8 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -60,7 +61,7 @@ public KsqlEntity(final String statementText) { public KsqlEntity(final String statementText, final List warnings) { this.statementText = statementText; - this.warnings = warnings == null ? Collections.emptyList() : ImmutableList.copyOf(warnings); + this.warnings = warnings == null ? new ArrayList<>() : new ArrayList<>(warnings); } public String getStatementText() { @@ -70,4 +71,8 @@ public String getStatementText() { public List getWarnings() { return warnings; } + + public void updateWarnings(final List warnings) { + this.warnings.addAll(warnings); + } }