Skip to content

Commit

Permalink
feat: expose query ID in CommandStatusEntity (MINOR) (#5814)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Jul 15, 2020
1 parent 53ca76f commit bb29185
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ public void testPrintCommandStatus() {
new CommandStatusEntity(
"e",
CommandId.fromString("topic/1/create"),
new CommandStatus(CommandStatus.Status.SUCCESS, "Success Message"),
new CommandStatus(
CommandStatus.Status.SUCCESS,
"Success Message",
Optional.of(new QueryId("CSAS_0"))
),
0L)
));

Expand All @@ -248,7 +252,8 @@ public void testPrintCommandStatus() {
+ " \"commandId\" : \"topic/1/create\"," + NEWLINE
+ " \"commandStatus\" : {" + NEWLINE
+ " \"status\" : \"SUCCESS\"," + NEWLINE
+ " \"message\" : \"Success Message\"" + NEWLINE
+ " \"message\" : \"Success Message\"," + NEWLINE
+ " \"queryId\" : \"CSAS_0\"" + NEWLINE
+ " }," + NEWLINE
+ " \"commandSequenceNumber\" : 0," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* A query id.
*
* <p>For backwards compatibility reasons query ids must preserve their case, as their text
* representation is used, amoung other things, for internal topic naming.
* representation is used, among other things, for internal topic naming.
*
* <p>However, two ids with the same text, with different case, should compare equal. This is needed
* so that look ups against query ids are not case-sensitive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ private void executePlan(
}
}
final String successMessage = getSuccessMessage(result);
final Optional<QueryId> queryId = result.getQuery().map(QueryMetadata::getQueryId);
final CommandStatus successStatus =
new CommandStatus(CommandStatus.Status.SUCCESS, successMessage);
new CommandStatus(CommandStatus.Status.SUCCESS, successMessage, queryId);
putFinalStatus(commandId, commandStatusFuture, successStatus);
}

Expand All @@ -269,6 +270,7 @@ private void executeStatement(
final long offset
) {
String successMessage = "";
Optional<QueryId> queryId = Optional.empty();
if (statement.getStatement() instanceof ExecutableDdlStatement) {
successMessage = executeDdlStatement(statement, command);
} else if (statement.getStatement() instanceof CreateAsSelect) {
Expand All @@ -277,9 +279,11 @@ private void executeStatement(
successMessage = statement.getStatement() instanceof CreateTableAsSelect
? "Table " + name + " created and running" : "Stream " + name + " created and running";
successMessage += ". Created by query with query ID: " + query.getQueryId();
queryId = Optional.of(query.getQueryId());
} else if (statement.getStatement() instanceof InsertInto) {
final PersistentQueryMetadata query = startQuery(statement, command, mode, offset);
successMessage = "Insert Into query is running with query ID: " + query.getQueryId();
queryId = Optional.of(query.getQueryId());
} else if (statement.getStatement() instanceof TerminateQuery) {
terminateQuery((PreparedStatement<TerminateQuery>) statement);
successMessage = "Query terminated.";
Expand All @@ -291,7 +295,7 @@ private void executeStatement(
}

final CommandStatus successStatus =
new CommandStatus(CommandStatus.Status.SUCCESS, successMessage);
new CommandStatus(CommandStatus.Status.SUCCESS, successMessage, queryId);

putFinalStatus(commandId, commandStatusFuture, successStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
Expand Down Expand Up @@ -94,7 +93,7 @@

@RunWith(MockitoJUnitRunner.class)
public class InteractiveStatementExecutorTest {
private static final String CREATE_STREAM_FOO_STATMENT = "CREATE STREAM foo ("
private static final String CREATE_STREAM_FOO_STATEMENT = "CREATE STREAM foo ("
+ "biz bigint,"
+ " baz varchar) "
+ "WITH (kafka_topic = 'foo', "
Expand All @@ -115,8 +114,6 @@ public class InteractiveStatementExecutorTest {
@Mock
private KsqlEngine mockEngine;
@Mock
private MetaStore mockMetaStore;
@Mock
private PersistentQueryMetadata mockQueryMetadata;
@Mock
private QueuedCommand queuedCommand;
Expand Down Expand Up @@ -166,7 +163,7 @@ public void setUp() {
statementExecutorWithMocks.configure(ksqlConfig);

plannedCommand = new Command(
CREATE_STREAM_FOO_STATMENT,
CREATE_STREAM_FOO_STATEMENT,
emptyMap(),
ksqlConfig.getAllConfigPropsWithSecretsObfuscated(),
Optional.of(plan)
Expand Down Expand Up @@ -307,7 +304,7 @@ public void shouldBuildQueriesWithPersistedConfig() {
public void shouldCompleteFutureOnSuccess() {
// Given:
final Command command = new Command(
CREATE_STREAM_FOO_STATMENT,
CREATE_STREAM_FOO_STATEMENT,
emptyMap(),
ksqlConfig.getAllConfigPropsWithSecretsObfuscated(),
Optional.empty()
Expand Down Expand Up @@ -375,7 +372,7 @@ public void shouldUpdateStatusOnCompletedPlannedCommand() {
new CommandStatus(Status.EXECUTING, "Executing statement"));
inOrder.verify(mockEngine).execute(any(), any(ConfiguredKsqlPlan.class));
inOrder.verify(status).setFinalStatus(
new CommandStatus(Status.SUCCESS, "Created query with ID qid"));
new CommandStatus(Status.SUCCESS, "Created query with ID qid", Optional.of(QUERY_ID)));
}

@Test
Expand Down Expand Up @@ -415,7 +412,7 @@ public void shouldExecutePlannedCommandWithMergedConfig() {
// Given:
final Map<String, String> savedConfigs = ImmutableMap.of("biz", "baz");
plannedCommand = new Command(
CREATE_STREAM_FOO_STATMENT,
CREATE_STREAM_FOO_STATEMENT,
emptyMap(),
savedConfigs,
Optional.of(plan)
Expand All @@ -442,7 +439,7 @@ public void shouldThrowExceptionIfCommandFails() {
shouldCompleteFutureOnSuccess();

final Command command = new Command(
CREATE_STREAM_FOO_STATMENT,
CREATE_STREAM_FOO_STATEMENT,
emptyMap(),
ksqlConfig.getAllConfigPropsWithSecretsObfuscated(),
Optional.empty()
Expand All @@ -455,7 +452,7 @@ public void shouldThrowExceptionIfCommandFails() {
} catch (final KsqlStatementException e) {
// Then:
assertEquals("Cannot add stream 'FOO': A stream with the same name already exists\n" +
"Statement: " + CREATE_STREAM_FOO_STATMENT,
"Statement: " + CREATE_STREAM_FOO_STATEMENT,
e.getMessage());
}
final InOrder inOrder = Mockito.inOrder(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.confluent.ksql.query.QueryId;
import java.util.Objects;
import java.util.Optional;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("commandStatus")
Expand All @@ -30,13 +32,20 @@ public enum Status { QUEUED, PARSING, EXECUTING, RUNNING, TERMINATED, SUCCESS, E

private final Status status;
private final String message;
private final Optional<QueryId> queryId;

public CommandStatus(final Status status, final String message) {
this(status, message, Optional.empty());
}

@JsonCreator
public CommandStatus(
@JsonProperty("status") final Status status,
@JsonProperty("message") final String message) {
@JsonProperty("message") final String message,
@JsonProperty("queryId") final Optional<QueryId> queryId) {
this.status = status;
this.message = message;
this.queryId = queryId;
}

public Status getStatus() {
Expand All @@ -47,6 +56,10 @@ public String getMessage() {
return message;
}

public Optional<QueryId> getQueryId() {
return queryId;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -57,16 +70,18 @@ public boolean equals(final Object o) {
}
final CommandStatus that = (CommandStatus) o;
return getStatus() == that.getStatus()
&& Objects.equals(getMessage(), that.getMessage());
&& Objects.equals(getMessage(), that.getMessage())
&& Objects.equals(getQueryId(), that.getQueryId());
}

@Override
public int hashCode() {
return Objects.hash(getStatus(), getMessage());
return Objects.hash(getStatus(), getMessage(), getQueryId());
}

@Override
public String toString() {
return status.name() + ": " + message;
return status.name() + ": " + message + ". "
+ "Query ID: " + (queryId.isPresent() ? queryId.toString() : "<empty>");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.ApiJsonMapper;
import java.util.Optional;
import org.junit.Test;

@SuppressFBWarnings("NP_NULL_PARAM_DEREF_NONVIRTUAL")
Expand All @@ -35,20 +37,34 @@ public class CommandStatusEntityTest {
+ "\"commandId\":\"topic/1/create\","
+ "\"commandStatus\":{"
+ "\"status\":\"SUCCESS\","
+ "\"message\":\"some success message\""
+ "\"message\":\"some success message\","
+ "\"queryId\":\"CSAS_0\""
+ "},"
+ "\"commandSequenceNumber\":2,"
+ "\"warnings\":[]"
+ "}";
private static final String JSON_ENTITY_EMPTY_QUERY_ID = "{"
+ "\"@type\":\"currentStatus\","
+ "\"statementText\":\"sql\","
+ "\"commandId\":\"topic/1/create\","
+ "\"commandStatus\":{"
+ "\"status\":\"SUCCESS\","
+ "\"message\":\"some success message\","
+ "\"queryId\":null"
+ "},"
+ "\"commandSequenceNumber\":2,"
+ "\"warnings\":[]"
+ "}";
private static final String JSON_ENTITY_NO_WARNINGS = "{"
private static final String JSON_ENTITY_NO_QUERY_ID = "{"
+ "\"@type\":\"currentStatus\","
+ "\"statementText\":\"sql\","
+ "\"commandId\":\"topic/1/create\","
+ "\"commandStatus\":{"
+ "\"status\":\"SUCCESS\","
+ "\"message\":\"some success message\""
+ "},"
+ "\"commandSequenceNumber\":2"
+ "\"commandSequenceNumber\":2,"
+ "\"warnings\":[]"
+ "}";
private static final String JSON_ENTITY_NO_CSN = "{"
+ "\"@type\":\"currentStatus\","
Expand All @@ -62,12 +78,20 @@ public class CommandStatusEntityTest {
private static final String STATEMENT_TEXT = "sql";
private static final CommandId COMMAND_ID = CommandId.fromString("topic/1/create");
private static final CommandStatus COMMAND_STATUS =
new CommandStatus(
CommandStatus.Status.SUCCESS,
"some success message",
Optional.of(new QueryId("CSAS_0"))
);
private static final CommandStatus COMMAND_STATUS_WITHOUT_QUERY_ID =
new CommandStatus(CommandStatus.Status.SUCCESS, "some success message");
private static final long COMMAND_SEQUENCE_NUMBER = 2L;
private static final CommandStatusEntity ENTITY =
new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, COMMAND_STATUS, COMMAND_SEQUENCE_NUMBER);
private static final CommandStatusEntity ENTITY_WITHOUT_QUERY_ID =
new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, COMMAND_STATUS_WITHOUT_QUERY_ID, COMMAND_SEQUENCE_NUMBER);
private static final CommandStatusEntity ENTITY_WITHOUT_SEQUENCE_NUMBER =
new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, COMMAND_STATUS, null);
new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, COMMAND_STATUS_WITHOUT_QUERY_ID, null);

@Test
public void shouldSerializeToJson() throws Exception {
Expand All @@ -78,6 +102,15 @@ public void shouldSerializeToJson() throws Exception {
assertThat(json, is(JSON_ENTITY));
}

@Test
public void shouldSerializeToJsonWithEmptyQueryId() throws Exception {
// When:
final String json = OBJECT_MAPPER.writeValueAsString(ENTITY_WITHOUT_QUERY_ID);

// Then:
assertThat(json, is(JSON_ENTITY_EMPTY_QUERY_ID));
}

@Test
public void shouldDeserializeFromJson() throws Exception {
// When:
Expand All @@ -89,7 +122,27 @@ public void shouldDeserializeFromJson() throws Exception {
}

@Test
public void shouldBeAbleToDeserializeOlderServerMessage() throws Exception {
public void shouldDeserializeFromJsonWithEmptyQueryId() throws Exception {
// When:
final CommandStatusEntity entity =
OBJECT_MAPPER.readValue(JSON_ENTITY_EMPTY_QUERY_ID, CommandStatusEntity.class);

// Then:
assertThat(entity, is(ENTITY_WITHOUT_QUERY_ID));
}

@Test
public void shouldBeAbleToDeserializeOlderServerMessageWithNoQueryId() throws Exception {
// When:
final CommandStatusEntity entity =
OBJECT_MAPPER.readValue(JSON_ENTITY_NO_QUERY_ID, CommandStatusEntity.class);

// Then:
assertThat(entity, is(ENTITY_WITHOUT_QUERY_ID));
}

@Test
public void shouldBeAbleToDeserializeOlderServerMessageWithNoCSN() throws Exception {
// When:
final CommandStatusEntity entity =
OBJECT_MAPPER.readValue(JSON_ENTITY_NO_CSN, CommandStatusEntity.class);
Expand Down

0 comments on commit bb29185

Please sign in to comment.