Skip to content

Commit

Permalink
feat: add Ksql warning to KsqlResource response when CommandRunner de…
Browse files Browse the repository at this point in the history
…graded (confluentinc#6039)
  • Loading branch information
stevenpyzhang authored and sarwarbhuiyan committed Sep 4, 2020
1 parent 8725f55 commit b4eab6c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.server.ServerUtil;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.DistributingExecutor;
Expand All @@ -61,12 +63,14 @@
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.net.URL;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.regex.PatternSyntaxException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
Expand Down Expand Up @@ -276,6 +280,11 @@ public EndpointResponse handleKsqlStatements(
);

LOG.info("Processed successfully: " + request);
addCommandRunnerDegradedWarning(
entities,
errorHandler,
() -> commandRunner.checkCommandRunnerStatus()
== CommandRunner.CommandRunnerStatus.DEGRADED);
return EndpointResponse.ok(entities);
} catch (final KsqlRestException e) {
LOG.info("Processed unsuccessfully: " + request + ", reason: " + e.getMessage());
Expand Down Expand Up @@ -315,4 +324,17 @@ private static void ensureValidPatterns(final List<String> deleteTopicList) {
}
});
}

private static void addCommandRunnerDegradedWarning(
final KsqlEntityList entityList,
final Errors errorHandler,
final Supplier<Boolean> commandRunnerDegraded
) {
if (commandRunnerDegraded.get()) {
for (final KsqlEntity entity: entityList) {
entity.updateWarnings(Collections.singletonList(
new KsqlWarning(errorHandler.commandRunnerDegradedErrorMessage())));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
Expand Down Expand Up @@ -571,6 +572,34 @@ public void shouldShowQueriesExtended() {
QueryDescriptionFactory.forQueryMetadata(queryMetadata.get(1), queryHostState)));
}

@Test
public void shouldHaveKsqlWarningIfCommandRunnerDegraded() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(SystemColumns.ROWKEY_NAME, SqlTypes.STRING)
.valueColumn(ColumnName.of("FIELD1"), SqlTypes.BOOLEAN)
.valueColumn(ColumnName.of("FIELD2"), SqlTypes.STRING)
.build();

givenSource(
DataSourceType.KSTREAM, "new_stream", "new_topic",
schema);

// When:
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.DEGRADED);
when(errorsHandler.commandRunnerDegradedErrorMessage()).thenReturn(DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE);

final SourceDescriptionList descriptionList1 = makeSingleRequest(
"SHOW STREAMS EXTENDED;", SourceDescriptionList.class);
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING);
final SourceDescriptionList descriptionList2 = makeSingleRequest(
"SHOW STREAMS EXTENDED;", SourceDescriptionList.class);

assertThat(descriptionList1.getWarnings().size(), is(1));
assertThat(descriptionList1.getWarnings().get(0).getMessage(), is(DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE));
assertThat(descriptionList2.getWarnings().size(), is(0));
}

@Test
public void shouldDescribeStatement() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class DefaultErrorMessages implements ErrorMessages {

static String COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE =
public static final String COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE =
"The server has encountered an incompatible entry in its log "
+ "and cannot process further DDL statements."
+ System.lineSeparator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -61,7 +62,7 @@ public KsqlEntity(final String statementText) {

public KsqlEntity(final String statementText, final List<KsqlWarning> warnings) {
this.statementText = statementText;
this.warnings = warnings == null ? Collections.emptyList() : ImmutableList.copyOf(warnings);
this.warnings = warnings == null ? new ArrayList<>() : new ArrayList<>(warnings);
}

public String getStatementText() {
Expand All @@ -71,4 +72,8 @@ public String getStatementText() {
public List<KsqlWarning> getWarnings() {
return warnings;
}

public void updateWarnings(final List<KsqlWarning> warnings) {
this.warnings.addAll(warnings);
}
}

0 comments on commit b4eab6c

Please sign in to comment.