From 7385a31dd33293b76e594649213da2abb81b8ddf Mon Sep 17 00:00:00 2001 From: Steven Zhang <35498506+stevenpyzhang@users.noreply.github.com> Date: Tue, 7 Apr 2020 17:09:33 -0700 Subject: [PATCH] feat: scatter gather query status from all servers in cluster for 'SHOW QUERIES [EXTENDED]' statement (#4875) * feat: scatter gather query status from all servers in cluster for 'SHOW QUERIES [EXTENDED]' statement * address comments * addressing more comments * refactor for backwards-compatability * refactor ListQueriesExecutor * more unit tests * last refactor --- .../ksqldb-reference/show-queries.md | 70 +++- .../confluent/ksql/cli/console/Console.java | 6 +- .../java/io/confluent/ksql/cli/CliTest.java | 2 +- .../ksql/cli/console/ConsoleTest.java | 75 ++++- .../builder/QueriesTableBuilderTest.java | 29 +- .../ksql/util/KsqlRequestConfig.java | 2 +- .../ksql/services/DisabledKsqlClient.java | 5 +- .../ksql/services/SimpleKsqlClient.java | 10 +- .../rest/entity/QueryDescriptionFactory.java | 14 +- .../rest/healthcheck/HealthCheckAgent.java | 3 +- .../ksql/rest/server/KsqlRestApplication.java | 8 +- .../ksql/rest/server/ServerUtil.java | 5 +- .../server/execution/ExplainExecutor.java | 27 +- .../server/execution/ListQueriesExecutor.java | 231 ++++++++++++- .../server/execution/ListSourceExecutor.java | 10 +- .../server/services/DefaultKsqlClient.java | 6 +- .../services/ServerInternalKsqlClient.java | 8 +- .../entity/QueryDescriptionFactoryTest.java | 15 +- .../healthcheck/HealthCheckAgentTest.java | 7 +- .../ShowQueriesMultiNodeFunctionalTest.java | 160 +++++++++ .../server/execution/ExplainExecutorTest.java | 36 +- .../execution/ListQueriesExecutorTest.java | 311 +++++++++++++++++- .../execution/ListSourceExecutorTest.java | 9 +- .../server/resources/KsqlResourceTest.java | 21 +- .../services/DefaultKsqlClientTest.java | 13 +- .../ServerInternalKsqlClientTest.java | 3 +- .../ksql/rest/client/KsqlRestClient.java | 5 +- .../ksql/rest/client/KsqlTarget.java | 3 +- .../ksql/rest/client/KsqlClientTest.java | 22 +- .../ksql/rest/entity/KsqlHostInfoEntity.java | 5 + .../confluent/ksql/rest/entity/Queries.java | 7 +- .../ksql/rest/entity/QueryDescription.java | 27 +- .../rest/entity/QueryDescriptionList.java | 7 +- .../ksql/rest/entity/QueryStateCount.java | 95 ++++++ .../ksql/rest/entity/RunningQuery.java | 18 +- .../ksql/rest/entity/QueryStateCountTest.java | 119 +++++++ 36 files changed, 1242 insertions(+), 152 deletions(-) create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ShowQueriesMultiNodeFunctionalTest.java create mode 100644 ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryStateCount.java create mode 100644 ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/QueryStateCountTest.java diff --git a/docs-md/developer-guide/ksqldb-reference/show-queries.md b/docs-md/developer-guide/ksqldb-reference/show-queries.md index ccc201e28b5b..b4d891f72e2b 100644 --- a/docs-md/developer-guide/ksqldb-reference/show-queries.md +++ b/docs-md/developer-guide/ksqldb-reference/show-queries.md @@ -13,16 +13,80 @@ Synopsis -------- ```sql -SHOW QUERIES; +SHOW | LIST QUERIES [EXTENDED]; ``` Description ----------- -List the running persistent queries. +`SHOW QUERIES` lists queries running in the cluster. + +`SHOW QUERIES EXTENDED` lists queries running in the cluster in more detail. Example ------- -TODO: example +```sql +ksql> show queries; + + Query ID | Status | Sink Name | Sink Kafka Topic | Query String +------------------------------------------------------------------------------------------------------------ + CSAS_TEST_0 | RUNNING:2 | TEST | TEST | CREATE STREAM TEST WITH (KAFKA_TOPIC='TEST', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM KSQL_PROCESSING_LOG KSQL_PROCESSING_LOG EMIT CHANGES; +------------------------------------------------------------------------------------------------------------ +For detailed information on a Query run: EXPLAIN ; +``` + + +```sql +ksql> show queries extended; + +ID : CSAS_TEST_0 +SQL : CREATE STREAM TEST WITH (KAFKA_TOPIC='TEST', PARTITIONS=1, REPLICAS=1) AS SELECT * +FROM KSQL_PROCESSING_LOG KSQL_PROCESSING_LOG +EMIT CHANGES; +Host Query Status : {192.168.1.6:8088=RUNNING, 192.168.1.6:8089=RUNNING} + Field | Type +------------------------------------------------------------------------------------- + ROWTIME | BIGINT (system) + ROWKEY | VARCHAR(STRING) (key) + LOGGER | VARCHAR(STRING) + LEVEL | VARCHAR(STRING) + TIME | BIGINT + MESSAGE | STRUCT, topic VARCHAR(STRING)>, RECORDPROCESSINGERROR STRUCT>, PRODUCTIONERROR STRUCT> +------------------------------------------------------------------------------------- + +Sources that this query reads from: +----------------------------------- +KSQL_PROCESSING_LOG + +For source description please run: DESCRIBE [EXTENDED] + +Sinks that this query writes to: +----------------------------------- +TEST + +For sink description please run: DESCRIBE [EXTENDED] + +Execution plan +-------------- + > [ SINK ] | Schema: ROWKEY STRING KEY, LOGGER STRING, LEVEL STRING, TIME BIGINT, MESSAGE STRUCT, `topic` STRING>, RECORDPROCESSINGERROR STRUCT>, PRODUCTIONERROR STRUCT> | Logger: CSAS_TEST_0.TEST + > [ PROJECT ] | Schema: ROWKEY STRING KEY, LOGGER STRING, LEVEL STRING, TIME BIGINT, MESSAGE STRUCT, `topic` STRING>, RECORDPROCESSINGERROR STRUCT>, PRODUCTIONERROR STRUCT> | Logger: CSAS_TEST_0.Project + > [ SOURCE ] | Schema: ROWKEY STRING KEY, LOGGER STRING, LEVEL STRING, TIME BIGINT, MESSAGE STRUCT, `topic` STRING>, RECORDPROCESSINGERROR STRUCT>, PRODUCTIONERROR STRUCT>, ROWTIME BIGINT, ROWKEY STRING | Logger: CSAS_TEST_0.KsqlTopic.Source + + +Processing topology +------------------- +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [default_ksql_processing_log]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST) + <-- Project +``` diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index da762af43403..a44e2c8e835b 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -658,8 +658,10 @@ private void printQueryDescription(final QueryDescription query) { if (query.getStatementText().length() > 0) { writer().println(String.format("%-20s : %s", "SQL", query.getStatementText())); } - if (query.getState().isPresent()) { - writer().println(String.format("%-20s : %s", "Status", query.getState().get())); + if (!query.getKsqlHostQueryState().isEmpty()) { + writer().println(String.format( + "%-20s : %s", "Host Query Status", + query.getKsqlHostQueryState())); } writer().println(); printSchema(query.getWindowType(), query.getFields(), ""); diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 2f9a13bd4055..4992f40e485e 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -938,7 +938,7 @@ public void shouldExplainQueryId() { assertThat(terminal.getOutputString(), containsString(queryId)); assertThat(terminal.getOutputString(), containsString("Status")); assertThat(terminal.getOutputString(), - either(containsString(": REBALANCING")) + either(containsString("REBALANCING")) .or(containsString("RUNNING"))); dropStream(streamName); diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index 5c4f790774f6..4f82925e7fee 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -52,6 +52,7 @@ import io.confluent.ksql.rest.entity.FunctionDescriptionList; import io.confluent.ksql.rest.entity.FunctionInfo; import io.confluent.ksql.rest.entity.FunctionType; +import io.confluent.ksql.rest.entity.QueryStateCount; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlWarning; @@ -80,7 +81,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; @@ -88,10 +92,15 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.apache.kafka.streams.KafkaStreams; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; @RunWith(Parameterized.class) public class ConsoleTest { @@ -99,6 +108,7 @@ public class ConsoleTest { private static final String CLI_CMD_NAME = "some command"; private static final String WHITE_SPACE = " \t "; private static final String NEWLINE = System.lineSeparator(); + private static final String STATE_COUNT_STRING = "RUNNING:1,ERROR:2"; private static final LogicalSchema SCHEMA = LogicalSchema.builder() .withRowTime() @@ -130,6 +140,9 @@ public class ConsoleTest { "statement" ); + @Mock + private QueryStateCount queryStateCount; + @Parameterized.Parameters(name = "{0}") public static Collection data() { return ImmutableList.of(OutputFormat.JSON, OutputFormat.TABULAR); @@ -148,6 +161,16 @@ public ConsoleTest(final OutputFormat outputFormat) { console.registerCliSpecificCommand(cliCommand); } + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(queryStateCount.toString()).thenReturn(STATE_COUNT_STRING); + final EnumMap mockStateCount = new EnumMap<>(KafkaStreams.State.class); + mockStateCount.put(KafkaStreams.State.RUNNING, 1); + mockStateCount.put(KafkaStreams.State.ERROR, 2); + when(queryStateCount.getStates()).thenReturn(mockStateCount); + } + @After public void after() { console.close(); @@ -289,7 +312,7 @@ public void testPrintQueries() { final List queries = new ArrayList<>(); queries.add( new RunningQuery( - "select * from t1", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), Optional.of("Foobar"))); + "select * from t1", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), queryStateCount)); final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( new Queries("e", queries) @@ -309,16 +332,20 @@ public void testPrintQueries() { + " \"sinks\" : [ \"Test\" ]," + NEWLINE + " \"sinkKafkaTopics\" : [ \"Test topic\" ]," + NEWLINE + " \"id\" : \"0\"," + NEWLINE - + " \"state\" : \"Foobar\"" + NEWLINE + + " \"stateCount\" : {" + NEWLINE + + " \"RUNNING\" : 1," + NEWLINE + + " \"ERROR\" : 2" + NEWLINE + + " }," + NEWLINE + + " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE + " } ]," + NEWLINE + " \"warnings\" : [ ]" + NEWLINE + "} ]" + NEWLINE)); } else { assertThat(output, is("" + NEWLINE - + " Query ID | Status | Sink Name | Sink Kafka Topic | Query String " + NEWLINE - + "---------------------------------------------------------------------" + NEWLINE - + " 0 | Foobar | Test | Test topic | select * from t1 " + NEWLINE - + "---------------------------------------------------------------------" + NEWLINE + + " Query ID | Status | Sink Name | Sink Kafka Topic | Query String " + NEWLINE + + "--------------------------------------------------------------------------------" + NEWLINE + + " 0 | " + STATE_COUNT_STRING + " | Test | Test topic | select * from t1 " + NEWLINE + + "--------------------------------------------------------------------------------" + NEWLINE + "For detailed information on a Query run: EXPLAIN ;" + NEWLINE)); } } @@ -340,10 +367,10 @@ public void testPrintSourceDescription() { ); final List readQueries = ImmutableList.of( - new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running")) + new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStateCount) ); final List writeQueries = ImmutableList.of( - new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running")) + new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStateCount) ); final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( @@ -389,14 +416,22 @@ public void testPrintSourceDescription() { + " \"sinks\" : [ \"sink1\" ]," + NEWLINE + " \"sinkKafkaTopics\" : [ \"sink1 topic\" ]," + NEWLINE + " \"id\" : \"readId\"," + NEWLINE - + " \"state\" : \"Running\"" + NEWLINE + + " \"stateCount\" : {" + NEWLINE + + " \"RUNNING\" : 1," + NEWLINE + + " \"ERROR\" : 2" + NEWLINE + + " }," + NEWLINE + + " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE + " } ]," + NEWLINE + " \"writeQueries\" : [ {" + NEWLINE + " \"queryString\" : \"write query\"," + NEWLINE + " \"sinks\" : [ \"sink2\" ]," + NEWLINE + " \"sinkKafkaTopics\" : [ \"sink2 topic\" ]," + NEWLINE + " \"id\" : \"writeId\"," + NEWLINE - + " \"state\" : \"Running\"" + NEWLINE + + " \"stateCount\" : {" + NEWLINE + + " \"RUNNING\" : 1," + NEWLINE + + " \"ERROR\" : 2" + NEWLINE + + " }," + NEWLINE + + " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE + " } ]," + NEWLINE + " \"fields\" : [ {" + NEWLINE + " \"name\" : \"ROWTIME\"," + NEWLINE @@ -1000,10 +1035,10 @@ public void testPrintExecuptionPlan() { public void shouldPrintTopicDescribeExtended() { // Given: final List readQueries = ImmutableList.of( - new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running")) + new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStateCount) ); final List writeQueries = ImmutableList.of( - new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running")) + new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStateCount) ); final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( @@ -1048,14 +1083,22 @@ public void shouldPrintTopicDescribeExtended() { + " \"sinks\" : [ \"sink1\" ]," + NEWLINE + " \"sinkKafkaTopics\" : [ \"sink1 topic\" ]," + NEWLINE + " \"id\" : \"readId\"," + NEWLINE - + " \"state\" : \"Running\"" + NEWLINE + + " \"stateCount\" : {" + NEWLINE + + " \"RUNNING\" : 1," + NEWLINE + + " \"ERROR\" : 2" + NEWLINE + + " }," + NEWLINE + + " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE + " } ]," + NEWLINE + " \"writeQueries\" : [ {" + NEWLINE + " \"queryString\" : \"write query\"," + NEWLINE + " \"sinks\" : [ \"sink2\" ]," + NEWLINE + " \"sinkKafkaTopics\" : [ \"sink2 topic\" ]," + NEWLINE + " \"id\" : \"writeId\"," + NEWLINE - + " \"state\" : \"Running\"" + NEWLINE + + " \"stateCount\" : {" + NEWLINE + + " \"RUNNING\" : 1," + NEWLINE + + " \"ERROR\" : 2" + NEWLINE + + " }," + NEWLINE + + " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE + " } ]," + NEWLINE + " \"fields\" : [ {" + NEWLINE + " \"name\" : \"ROWTIME\"," + NEWLINE @@ -1116,13 +1159,13 @@ public void shouldPrintTopicDescribeExtended() { + "" + NEWLINE + "Queries that read from this TABLE" + NEWLINE + "-----------------------------------" + NEWLINE - + "readId (Running) : read query" + NEWLINE + + "readId (" + STATE_COUNT_STRING +") : read query" + NEWLINE + "\n" + "For query topology and execution plan please run: EXPLAIN " + NEWLINE + "" + NEWLINE + "Queries that write from this TABLE" + NEWLINE + "-----------------------------------" + NEWLINE - + "writeId (Running) : write query" + NEWLINE + + "writeId (" + STATE_COUNT_STRING + ") : write query" + NEWLINE + "\n" + "For query topology and execution plan please run: EXPLAIN " + NEWLINE + "" + NEWLINE diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilderTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilderTest.java index 1a80eb7caa6f..724d0cc7d933 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilderTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilderTest.java @@ -3,18 +3,35 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.cli.console.table.Table; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.Queries; +import io.confluent.ksql.rest.entity.QueryStateCount; import io.confluent.ksql.rest.entity.RunningQuery; import java.util.ArrayList; import java.util.List; -import java.util.Optional; + +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class QueriesTableBuilderTest { + + private static final String STATE = "RUNNING:1"; + + @Mock + private QueryStateCount queryStateCount; + + @Before + public void setup() { + when(queryStateCount.toString()).thenReturn(STATE); + } @Test public void shouldBuildQueriesTable() { // Given: @@ -23,7 +40,8 @@ public void shouldBuildQueriesTable() { ImmutableSet.of("SINK"), ImmutableSet.of("SINK"), new QueryId("0"), - Optional.of("RUNNING")); + queryStateCount + ); // When: final Table table = buildTableWithSingleQuery(query); @@ -31,7 +49,7 @@ public void shouldBuildQueriesTable() { // Then: assertThat(table.headers(), contains("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String")); assertThat(table.rows(), hasSize(1)); - assertThat(table.rows().get(0), contains("0", "RUNNING", "SINK", "SINK", "EXAMPLE QUERY;")); + assertThat(table.rows().get(0), contains("0", STATE, "SINK", "SINK", "EXAMPLE QUERY;")); } @Test @@ -42,7 +60,8 @@ public void shouldBuildQueriesTableWithNewlines() { ImmutableSet.of("S2"), ImmutableSet.of("S2"), new QueryId("CSAS_S2_0"), - Optional.of("RUNNING")); + queryStateCount + ); // When: @@ -51,7 +70,7 @@ public void shouldBuildQueriesTableWithNewlines() { // Then: assertThat(table.headers(), contains("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String")); assertThat(table.rows(), hasSize(1)); - assertThat(table.rows().get(0), contains("CSAS_S2_0", "RUNNING", "S2", "S2", "CREATE STREAM S2 AS SELECT * FROM S1 EMIT CHANGES;")); + assertThat(table.rows().get(0), contains("CSAS_S2_0", STATE, "S2", "S2", "CREATE STREAM S2 AS SELECT * FROM S1 EMIT CHANGES;")); } private Table buildTableWithSingleQuery(RunningQuery query) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java index f2fc682ea557..d80d0c1fefb6 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java @@ -57,6 +57,6 @@ private static ConfigDef buildConfigDef() { } public KsqlRequestConfig(final Map props) { - super(CURRENT_DEF, props); + super(CURRENT_DEF, props, false); } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java b/ksqldb-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java index 750ea96c2522..a14fd3799538 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java @@ -38,7 +38,10 @@ private DisabledKsqlClient() { } @Override - public RestResponse makeKsqlRequest(final URI serverEndPoint, final String sql) { + public RestResponse makeKsqlRequest( + final URI serverEndPoint, + final String sql, + final Map requestProperties) { throw new UnsupportedOperationException("KSQL client is disabled"); } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java b/ksqldb-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java index 1e3236fd9a59..d215f1663fda 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java @@ -29,9 +29,17 @@ @ThreadSafe public interface SimpleKsqlClient { + /** + * Send a request to remote Ksql server. + * @param serverEndPoint the remote destination + * @param sql the sql statement + * @param requestProperties the request metadata provided by the server + * @return the result of sql statement execution + */ RestResponse makeKsqlRequest( URI serverEndPoint, - String sql + String sql, + Map requestProperties ); /** diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java index 9b3bd38ccd43..1dbed07a08c2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java @@ -23,6 +23,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -32,7 +33,10 @@ public final class QueryDescriptionFactory { private QueryDescriptionFactory() { } - public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadata) { + public static QueryDescription forQueryMetadata( + final QueryMetadata queryMetadata, + final Map ksqlHostQueryState + ) { if (queryMetadata instanceof PersistentQueryMetadata) { final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadata; return create( @@ -40,7 +44,7 @@ public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadat persistentQuery, persistentQuery.getResultTopic().getKeyFormat().getWindowType(), ImmutableSet.of(persistentQuery.getSinkName()), - Optional.of(persistentQuery.getState()) + ksqlHostQueryState ); } @@ -49,7 +53,7 @@ public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadat queryMetadata, Optional.empty(), Collections.emptySet(), - Optional.empty() + Collections.emptyMap() ); } @@ -58,7 +62,7 @@ private static QueryDescription create( final QueryMetadata queryMetadata, final Optional windowType, final Set sinks, - final Optional state + final Map ksqlHostQueryState ) { return new QueryDescription( id, @@ -70,7 +74,7 @@ private static QueryDescription create( queryMetadata.getTopologyDescription(), queryMetadata.getExecutionPlan(), queryMetadata.getOverriddenProperties(), - state + ksqlHostQueryState ); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java index 9961a6cdd6ea..ca51ce8ea009 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.healthcheck; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.rest.client.RestResponse; @@ -98,7 +99,7 @@ public String getName() { public HealthCheckResponseDetail check(final HealthCheckAgent healthCheckAgent) { final RestResponse response = healthCheckAgent.ksqlClient - .makeKsqlRequest(healthCheckAgent.serverEndpoint, ksqlStatement); + .makeKsqlRequest(healthCheckAgent.serverEndpoint, ksqlStatement, ImmutableMap.of()); return new HealthCheckResponseDetail(response.isSuccessful()); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 92c4b84aa865..a10afc444366 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -1026,8 +1026,8 @@ private static void maybeCreateProcessingLogStream( ProcessingLogServerUtils.processingLogStreamCreateStatement( processingLogConfig, ksqlConfig - ) - ); + ), + ImmutableMap.of()); if (response.isSuccessful()) { log.info("Successfully created processing log stream."); @@ -1046,8 +1046,8 @@ private static boolean processingLogStreamExists( ) { final RestResponse listStreamsResponse = internalClient.makeKsqlRequest( serverEndpoint, - "list streams;" - ); + "list streams;", + ImmutableMap.of()); final List streams = ((StreamsList) listStreamsResponse.getResponse().get(0)).getStreams(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java index 8ee2002a4eac..8b82c1eef4fd 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java @@ -74,7 +74,10 @@ public static HostInfo parseHostInfo(final String applicationServerId) { * @param remotePort The remote port * @return uri */ - static URI buildRemoteUri(final URL localHost, final String remoteHost, final int remotePort) { + public static URI buildRemoteUri( + final URL localHost, + final String remoteHost, + final int remotePort) { try { return new URL(localHost.getProtocol(), remoteHost, remotePort, "/").toURI(); } catch (final Exception e) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java index aa5681a6a770..3caf3273c3b8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java @@ -24,6 +24,7 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlHostInfoEntity; import io.confluent.ksql.rest.entity.QueryDescription; import io.confluent.ksql.rest.entity.QueryDescriptionEntity; import io.confluent.ksql.rest.entity.QueryDescriptionFactory; @@ -33,6 +34,8 @@ import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; + +import java.util.Collections; import java.util.Optional; /** @@ -53,7 +56,8 @@ public static Optional execute( .of(ExplainExecutor.explain( serviceContext, statement, - executionContext)); + executionContext, + sessionProperties)); } /** @@ -65,14 +69,16 @@ public static Optional execute( private static QueryDescriptionEntity explain( final ServiceContext serviceContext, final ConfiguredStatement statement, - final KsqlExecutionContext executionContext + final KsqlExecutionContext executionContext, + final SessionProperties sessionProperties ) { final Optional queryId = statement.getStatement().getQueryId(); try { final QueryDescription queryDescription = queryId - .map(s -> explainQuery(s, executionContext)) - .orElseGet(() -> explainStatement(statement, executionContext, serviceContext)); + .map(s -> explainQuery(s, executionContext, sessionProperties)) + .orElseGet(() -> explainStatement( + statement, executionContext, serviceContext)); return new QueryDescriptionEntity(statement.getStatementText(), queryDescription); } catch (final KsqlException e) { @@ -118,12 +124,15 @@ private static QueryDescription explainStatement( new IllegalStateException("The provided statement did not run a ksql query")); } - return QueryDescriptionFactory.forQueryMetadata(metadata); + return QueryDescriptionFactory.forQueryMetadata( + metadata, + Collections.emptyMap()); } private static QueryDescription explainQuery( final String queryId, - final KsqlExecutionContext executionContext + final KsqlExecutionContext executionContext, + final SessionProperties sessionProperties ) { final PersistentQueryMetadata metadata = executionContext .getPersistentQuery(new QueryId(queryId)) @@ -131,7 +140,11 @@ private static QueryDescription explainQuery( "Query with id:" + queryId + " does not exist, " + "use SHOW QUERIES to view the full set of queries.")); - return QueryDescriptionFactory.forQueryMetadata(metadata); + return QueryDescriptionFactory.forQueryMetadata( + metadata, + Collections.singletonMap( + new KsqlHostInfoEntity(sessionProperties.getKsqlHostInfo()), + metadata.getState())); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java index 78ec9b969f05..3234890c20bd 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java @@ -15,21 +15,54 @@ package io.confluent.ksql.rest.server.execution; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.parser.tree.ListQueries; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlEntityList; +import io.confluent.ksql.rest.entity.KsqlHostInfoEntity; +import io.confluent.ksql.rest.entity.Queries; +import io.confluent.ksql.rest.entity.QueryDescription; import io.confluent.ksql.rest.entity.QueryDescriptionFactory; import io.confluent.ksql.rest.entity.QueryDescriptionList; +import io.confluent.ksql.rest.entity.QueryStateCount; import io.confluent.ksql.rest.entity.RunningQuery; +import io.confluent.ksql.rest.server.ServerUtil; +import io.confluent.ksql.rest.util.DiscoverRemoteHostsUtil; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.services.SimpleKsqlClient; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlRequestConfig; +import io.confluent.ksql.util.PersistentQueryMetadata; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.state.HostInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressFBWarnings("SE_BAD_FIELD") public final class ListQueriesExecutor { + private static int TIMEOUT_SECONDS = 10; + private static final Logger LOG = LoggerFactory.getLogger(ListQueriesExecutor.class); + private ListQueriesExecutor() { } public static Optional execute( @@ -38,27 +71,193 @@ public static Optional execute( final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { - final ListQueries listQueries = statement.getStatement(); - if (listQueries.getShowExtended()) { - return Optional.of(new QueryDescriptionList( - statement.getStatementText(), - executionContext.getPersistentQueries().stream() - .map(QueryDescriptionFactory::forQueryMetadata) - .collect(Collectors.toList()))); - } + final List remoteResults = + scatterGather(statement, sessionProperties, executionContext, serviceContext); - return Optional.of(new io.confluent.ksql.rest.entity.Queries( + return statement.getStatement().getShowExtended() + ? executeExtended(remoteResults, sessionProperties, statement, executionContext) + : executeSimple(remoteResults, statement, executionContext); + } + + private static Optional executeSimple( + final List remoteResults, + final ConfiguredStatement statement, + final KsqlExecutionContext executionContext + ) { + final Map runningQueries = getLocalSimple(executionContext); + + mergeSimple(remoteResults, runningQueries); + + return Optional.of(new Queries( statement.getStatementText(), - executionContext.getPersistentQueries() - .stream() - .map(q -> new RunningQuery( + runningQueries.values())); + } + + private static Map getLocalSimple( + final KsqlExecutionContext executionContext + ) { + return executionContext + .getPersistentQueries() + .stream() + .collect(Collectors.toMap( + PersistentQueryMetadata::getQueryId, + q -> new RunningQuery( q.getStatementString(), ImmutableSet.of(q.getSinkName().text()), ImmutableSet.of(q.getResultTopic().getKafkaTopicName()), q.getQueryId(), - Optional.of(q.getState()) - )) - .collect(Collectors.toList()))); + new QueryStateCount( + Collections.singletonMap(KafkaStreams.State.valueOf(q.getState()), 1))) + )); } -} + private static void mergeSimple( + final List remoteResults, + final Map allResults + ) { + final List remoteRunningQueries = remoteResults.stream() + .map(Queries.class::cast) + .map(Queries::getQueries) + .flatMap(List::stream) + .collect(Collectors.toList()); + + for (RunningQuery q : remoteRunningQueries) { + final QueryId queryId = q.getId(); + + // If the query has already been discovered, update the QueryStateCount object + if (allResults.containsKey(queryId)) { + for (Map.Entry entry : + q.getStateCount().getStates().entrySet()) { + allResults + .get(queryId) + .getStateCount() + .updateStateCount(entry.getKey(), entry.getValue()); + } + } else { + allResults.put(queryId, q); + } + } + } + + private static Optional executeExtended( + final List remoteResults, + final SessionProperties sessionProperties, + final ConfiguredStatement statement, + final KsqlExecutionContext executionContext + ) { + final Map queryDescriptions = + getLocalExtended(sessionProperties, executionContext); + + mergeExtended(remoteResults, queryDescriptions); + + return Optional.of(new QueryDescriptionList( + statement.getStatementText(), + queryDescriptions.values())); + } + + private static Map getLocalExtended( + final SessionProperties sessionProperties, + final KsqlExecutionContext executionContext + ) { + return executionContext + .getPersistentQueries() + .stream() + .collect(Collectors.toMap( + PersistentQueryMetadata::getQueryId, + query -> QueryDescriptionFactory.forQueryMetadata( + query, + Collections.singletonMap( + new KsqlHostInfoEntity(sessionProperties.getKsqlHostInfo()), + query.getState())))); + } + + private static void mergeExtended( + final List remoteResults, + final Map allResults + ) { + final List remoteQueryDescriptions = remoteResults.stream() + .map(QueryDescriptionList.class::cast) + .map(QueryDescriptionList::getQueryDescriptions) + .flatMap(List::stream) + .collect(Collectors.toList()); + for (QueryDescription q : remoteQueryDescriptions) { + final QueryId queryId = q.getId(); + + // If the query has already been discovered, add to the ksqlQueryHostState mapping + if (allResults.containsKey(queryId)) { + for (Map.Entry entry : + q.getKsqlHostQueryState().entrySet()) { + allResults + .get(queryId) + .updateKsqlHostQueryState(entry.getKey(), entry.getValue()); + } + } else { + allResults.put(queryId, q); + } + } + } + + private static List scatterGather( + final ConfiguredStatement statement, + final SessionProperties sessionProperties, + final KsqlExecutionContext executionContext, + final ServiceContext serviceContext + ) { + if (sessionProperties.getInternalRequest()) { + return ImmutableList.of(); + } + + final Set remoteHosts = DiscoverRemoteHostsUtil.getRemoteHosts( + executionContext.getPersistentQueries(), + sessionProperties.getKsqlHostInfo() + ); + + if (remoteHosts.isEmpty()) { + return ImmutableList.of(); + } + + final ExecutorService executorService = Executors.newFixedThreadPool(remoteHosts.size()); + + try { + final SimpleKsqlClient ksqlClient = serviceContext.getKsqlClient(); + + final Map>> futureResponses = new HashMap<>(); + for (HostInfo host : remoteHosts) { + final Future> future = executorService.submit(() -> ksqlClient + .makeKsqlRequest( + ServerUtil.buildRemoteUri( + sessionProperties.getLocalUrl(), + host.host(), + host.port() + ), + statement.getStatementText(), + Collections.singletonMap(KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true)) + ); + + futureResponses.put(host, future); + } + + final List results = new ArrayList<>(); + for (final Map.Entry>> e + : futureResponses.entrySet()) { + try { + final RestResponse response = + e.getValue().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (response.isErroneous()) { + LOG.warn("Error response from host. host: {}, cause: {}", + e.getKey(), response.getErrorMessage().getMessage()); + } else { + results.add(response.getResponse().get(0)); + } + } catch (final Exception cause) { + LOG.warn("Failed to retrieve query info from host. host: {}, cause: {}", + e.getKey(), cause.getMessage()); + } + } + + return results; + } finally { + executorService.shutdown(); + } + } +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java index 57af877e8b4c..631171c8a848 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java @@ -28,6 +28,7 @@ import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.QueryStateCount; import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.entity.SourceDescription; import io.confluent.ksql.rest.entity.SourceDescriptionEntity; @@ -42,12 +43,15 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; + +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.KafkaStreams; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public final class ListSourceExecutor { @@ -224,9 +228,9 @@ private static List getQueries( ImmutableSet.of(q.getSinkName().text()), ImmutableSet.of(q.getResultTopic().getKafkaTopicName()), q.getQueryId(), - Optional.of(q.getState()) - )) - .collect(Collectors.toList()); + new QueryStateCount( + Collections.singletonMap( + KafkaStreams.State.valueOf(q.getState()), 1)))).collect(Collectors.toList()); } private static Stream sourceSteam(final KsqlStream dataSource) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java index 1d3e863a24d6..7bcb4c52582e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java @@ -69,12 +69,12 @@ final class DefaultKsqlClient implements SimpleKsqlClient { @Override public RestResponse makeKsqlRequest( final URI serverEndPoint, - final String sql - ) { + final String sql, + final Map requestProperties) { final KsqlTarget target = sharedClient .target(serverEndPoint); - return getTarget(target, authHeader).postKsqlRequest(sql, Optional.empty()); + return getTarget(target, authHeader).postKsqlRequest(sql, requestProperties, Optional.empty()); } @Override diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java index 2a471e6898d1..0e1aa42cf48c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java @@ -55,13 +55,15 @@ public ServerInternalKsqlClient( this.securityContext = requireNonNull(securityContext, "securityContext"); } + @Override public RestResponse makeKsqlRequest( final URI serverEndpoint, - final String sql - ) { + final String sql, + final Map requestProperties) { final KsqlRequest request = new KsqlRequest( - sql, Collections.emptyMap(), Collections.emptyMap(), null); + sql, Collections.emptyMap(), requestProperties, null); + final Response response = ksqlResource.handleKsqlStatements(securityContext, request); final Code statusCode = HttpStatus.getCode(response.getStatus()); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java index 67dade5a3851..f2aae25389f0 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java @@ -61,6 +61,8 @@ @RunWith(MockitoJUnitRunner.class) public class QueryDescriptionFactoryTest { + private static final Map STATE_MAP = + Collections.singletonMap(new KsqlHostInfoEntity("host", 8080), "RUNNING"); private static final LogicalSchema TRANSIENT_SCHEMA = LogicalSchema.builder() .valueColumn(ColumnName.of("field1"), SqlTypes.INTEGER) .valueColumn(ColumnName.of("field2"), SqlTypes.STRING) @@ -100,7 +102,6 @@ public class QueryDescriptionFactoryTest { @Before public void setUp() { when(topology.describe()).thenReturn(topologyDescription); - when(queryStreams.state()).thenReturn(State.RUNNING); when(sinkTopic.getKeyFormat()).thenReturn(KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()))); @@ -118,7 +119,7 @@ public void setUp() { queryCloseCallback, closeTimeout); - transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery); + transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery, Collections.emptyMap()); final PersistentQueryMetadata persistentQuery = new PersistentQueryMetadata( SQL_TEXT, @@ -139,7 +140,7 @@ public void setUp() { queryCloseCallback, closeTimeout); - persistentQueryDescription = QueryDescriptionFactory.forQueryMetadata(persistentQuery); + persistentQueryDescription = QueryDescriptionFactory.forQueryMetadata(persistentQuery, STATE_MAP); } @Test @@ -200,12 +201,12 @@ public void shouldExposeAllFieldsForPersistentQueries() { @Test public void shouldReportPersistentQueriesStatus() { - assertThat(persistentQueryDescription.getState(), is(Optional.of("RUNNING"))); + assertThat(persistentQueryDescription.getState(), is(Optional.of("{host:8080=RUNNING}"))); } @Test public void shouldNotReportTransientQueriesStatus() { - assertThat(transientQueryDescription.getState(), is(Optional.empty())); + assertThat(transientQueryDescription.getState(), is(Optional.of("{}"))); } @Test @@ -232,7 +233,7 @@ public void shouldHandleRowTimeInValueSchemaForTransientQuery() { closeTimeout); // When: - transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery); + transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery, Collections.emptyMap()); // Then: assertThat(transientQueryDescription.getFields(), contains( @@ -265,7 +266,7 @@ public void shouldHandleRowKeyInValueSchemaForTransientQuery() { closeTimeout); // When: - transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery); + transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery, Collections.emptyMap()); // Then: assertThat(transientQueryDescription.getFields(), contains( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java index 3fb0f07102c6..9fa9c7224c83 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; @@ -81,7 +82,7 @@ public class HealthCheckAgentTest { @Before public void setUp() { - when(ksqlClient.makeKsqlRequest(eq(SERVER_URI), any())).thenReturn(successfulResponse); + when(ksqlClient.makeKsqlRequest(eq(SERVER_URI), any(), eq(ImmutableMap.of()))).thenReturn(successfulResponse); when(restConfig.getList(RestConfig.LISTENERS_CONFIG)) .thenReturn(ImmutableList.of(SERVER_ADDRESS)); when(successfulResponse.isSuccessful()).thenReturn(true); @@ -106,7 +107,7 @@ public void shouldCheckHealth() { final HealthCheckResponse response = healthCheckAgent.checkHealth(); // Then: - verify(ksqlClient, atLeastOnce()).makeKsqlRequest(eq(SERVER_URI), any()); + verify(ksqlClient, atLeastOnce()).makeKsqlRequest(eq(SERVER_URI), any(), eq(ImmutableMap.of())); assertThat(response.getDetails().get(METASTORE_CHECK_NAME).getIsHealthy(), is(true)); assertThat(response.getDetails().get(KAFKA_CHECK_NAME).getIsHealthy(), is(true)); assertThat(response.getIsHealthy(), is(true)); @@ -115,7 +116,7 @@ public void shouldCheckHealth() { @Test public void shouldReturnUnhealthyIfMetastoreCheckFails() { // Given: - when(ksqlClient.makeKsqlRequest(SERVER_URI, "list streams; list tables; list queries;")) + when(ksqlClient.makeKsqlRequest(SERVER_URI, "list streams; list tables; list queries;", ImmutableMap.of())) .thenReturn(unSuccessfulResponse); // When: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ShowQueriesMultiNodeFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ShowQueriesMultiNodeFunctionalTest.java new file mode 100644 index 000000000000..e55fee1b7a09 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ShowQueriesMultiNodeFunctionalTest.java @@ -0,0 +1,160 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.integration; + +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; + +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlHostInfoEntity; +import io.confluent.ksql.rest.entity.Queries; +import io.confluent.ksql.rest.entity.QueryDescription; +import io.confluent.ksql.rest.entity.QueryDescriptionList; +import io.confluent.ksql.rest.entity.RunningQuery; +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.serde.FormatFactory; +import io.confluent.ksql.util.PageViewDataProvider; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import kafka.zookeeper.ZooKeeperClientException; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; + +@Category({IntegrationTest.class}) +public class ShowQueriesMultiNodeFunctionalTest { + + private static final PageViewDataProvider PAGE_VIEWS_PROVIDER = new PageViewDataProvider(); + private static final String PAGE_VIEW_TOPIC = PAGE_VIEWS_PROVIDER.topicName(); + private static final String PAGE_VIEW_STREAM = PAGE_VIEWS_PROVIDER.kstreamName(); + private static final KsqlHostInfoEntity host0 = new KsqlHostInfoEntity("localhost", 8088); + private static final KsqlHostInfoEntity host1 = new KsqlHostInfoEntity("localhost", 8089); + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8088") + .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8088") + .build(); + private static final TestKsqlRestApp REST_APP_1 = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8089") + .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8089") + .build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain + .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) + .around(TEST_HARNESS) + .around(REST_APP_0) + .around(REST_APP_1); + + @BeforeClass + public static void setUpClass() { + TEST_HARNESS.ensureTopics(2, PAGE_VIEW_TOPIC); + TEST_HARNESS.produceRows(PAGE_VIEW_TOPIC, PAGE_VIEWS_PROVIDER, FormatFactory.JSON); + RestIntegrationTestUtil.createStream(REST_APP_0, PAGE_VIEWS_PROVIDER); + RestIntegrationTestUtil.makeKsqlRequest( + REST_APP_0, + "CREATE STREAM S AS SELECT * FROM " + PAGE_VIEW_STREAM + ";" + ); + } + + @Test + public void shouldShowAllQueries() { + // When: + final Supplier app0Response = () -> getShowQueriesResult(REST_APP_0); + final Supplier app1Response = () -> getShowQueriesResult(REST_APP_1); + + // Then: + assertThatEventually("App0", app0Response, is("RUNNING:2")); + assertThatEventually("App1", app1Response, is("RUNNING:2")); + } + + private static String getShowQueriesResult(final TestKsqlRestApp restApp) { + final List results = RestIntegrationTestUtil.makeKsqlRequest( + restApp, + "Show Queries;" + ); + + if (results.size() != 1) { + return "Expected 1 response, got " + results.size(); + } + + final KsqlEntity result = results.get(0); + + if (!(result instanceof Queries)) { + return "Expected Queries, got " + result; + } + + final List runningQueries = ((Queries) result) + .getQueries(); + + if (runningQueries.size() != 1) { + return "Expected 1 running query, got " + runningQueries.size(); + } + + return runningQueries.get(0).getState().orElse("N/A"); + } + + @Test + public void shouldShowAllQueriesExtended() { + // When: + final Supplier> app0Response = () -> getShowQueriesExtendedResult(REST_APP_0); + final Supplier> app1Response = () -> getShowQueriesExtendedResult(REST_APP_1); + + // Then: + assertThatEventually("App0", app0Response, containsInAnyOrder(host0, host1)); + assertThatEventually("App1", app1Response, containsInAnyOrder(host0, host1)); + } + + private static Set getShowQueriesExtendedResult(final TestKsqlRestApp restApp) { + final List results = RestIntegrationTestUtil.makeKsqlRequest( + restApp, + "Show Queries Extended;" + ); + + if (results.size() != 1) { + return Collections.emptySet(); + } + + final KsqlEntity result = results.get(0); + + if (!(result instanceof QueryDescriptionList)) { + return Collections.emptySet(); + } + + final List queryDescriptions = ((QueryDescriptionList) result) + .getQueryDescriptions(); + + if (queryDescriptions.size() != 1) { + return Collections.emptySet(); + } + + return queryDescriptions.get(0).getKsqlHostQueryState().keySet(); + } +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java index 55a85881ae82..4168594da36a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java @@ -29,6 +29,7 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.entity.KsqlHostInfoEntity; import io.confluent.ksql.rest.entity.QueryDescriptionEntity; import io.confluent.ksql.rest.entity.QueryDescriptionFactory; import io.confluent.ksql.rest.server.TemporaryEngine; @@ -37,21 +38,37 @@ import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlHostInfo; import io.confluent.ksql.util.PersistentQueryMetadata; + +import java.util.Collections; import java.util.Optional; + +import org.apache.kafka.streams.KafkaStreams; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class ExplainExecutorTest { + private static final KafkaStreams.State STATE = KafkaStreams.State.RUNNING; + private static final KsqlHostInfo LOCAL_HOST = new KsqlHostInfo("host", 8080); @Rule public final TemporaryEngine engine = new TemporaryEngine(); @Rule public ExpectedException expectedException = ExpectedException.none(); + @Mock + private SessionProperties sessionProperties; + + @Before + public void setup() { + when(sessionProperties.getKsqlHostInfo()).thenReturn(LOCAL_HOST); + } @Test public void shouldExplainQueryId() { @@ -65,13 +82,16 @@ public void shouldExplainQueryId() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, - mock(SessionProperties.class), + sessionProperties, engine, this.engine.getServiceContext() ).orElseThrow(IllegalStateException::new); // Then: - assertThat(query.getQueryDescription(), equalTo(QueryDescriptionFactory.forQueryMetadata(metadata))); + assertThat( + query.getQueryDescription(), + equalTo(QueryDescriptionFactory.forQueryMetadata( + metadata, Collections.singletonMap(new KsqlHostInfoEntity(LOCAL_HOST), STATE.toString())))); } @Test @@ -84,7 +104,7 @@ public void shouldExplainPersistentStatement() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, - mock(SessionProperties.class), + sessionProperties, engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -93,6 +113,7 @@ public void shouldExplainPersistentStatement() { assertThat(query.getQueryDescription().getStatementText(), equalTo(statementText)); assertThat(query.getQueryDescription().getSources(), containsInAnyOrder("Y")); assertThat("No side effects should happen", engine.getEngine().getPersistentQueries(), is(empty())); + assertThat(query.getQueryDescription().getKsqlHostQueryState(), equalTo(Collections.emptyMap())); } @Test @@ -105,7 +126,7 @@ public void shouldExplainStatement() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, - mock(SessionProperties.class), + sessionProperties, engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -113,6 +134,7 @@ public void shouldExplainStatement() { // Then: assertThat(query.getQueryDescription().getStatementText(), equalTo(statementText)); assertThat(query.getQueryDescription().getSources(), containsInAnyOrder("Y")); + assertThat(query.getQueryDescription().getKsqlHostQueryState(), equalTo(Collections.emptyMap())); } @Test @@ -125,7 +147,7 @@ public void shouldExplainStatementWithStructFieldDereference() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, - mock(SessionProperties.class), + sessionProperties, engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -144,7 +166,7 @@ public void shouldFailOnNonQueryExplain() { // When: CustomExecutors.EXPLAIN.execute( engine.configure("Explain SHOW TOPICS;"), - mock(SessionProperties.class), + sessionProperties, engine.getEngine(), engine.getServiceContext() ); @@ -156,7 +178,7 @@ public static PersistentQueryMetadata givenPersistentQuery(final String id) { when(metadata.getQueryId()).thenReturn(new QueryId(id)); when(metadata.getSinkName()).thenReturn(SourceName.of(id)); when(metadata.getLogicalSchema()).thenReturn(TemporaryEngine.SCHEMA); - when(metadata.getState()).thenReturn("Running"); + when(metadata.getState()).thenReturn(STATE.toString()); when(metadata.getTopologyDescription()).thenReturn("topology"); when(metadata.getExecutionPlan()).thenReturn("plan"); when(metadata.getStatementString()).thenReturn("sql"); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java index 907d0f9b1ace..07e2eb54da9f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java @@ -19,6 +19,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -29,33 +31,102 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.client.KsqlRestClientException; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.KsqlEntityList; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.entity.KsqlHostInfoEntity; import io.confluent.ksql.rest.entity.Queries; +import io.confluent.ksql.rest.entity.QueryDescription; import io.confluent.ksql.rest.entity.QueryDescriptionFactory; import io.confluent.ksql.rest.entity.QueryDescriptionList; +import io.confluent.ksql.rest.entity.QueryStateCount; import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.server.TemporaryEngine; import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.services.SimpleKsqlClient; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.TreeMap; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.StreamsMetadata; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class ListQueriesExecutorTest { + private static final HostInfo REMOTE_HOST = new HostInfo("otherhost", 1234); + private static final HostInfo LOCAL_HOST = new HostInfo("HOST", 444); + private static final KsqlHostInfoEntity LOCAL_KSQL_HOST_INFO_ENTITY = + new KsqlHostInfoEntity(LOCAL_HOST.host(), LOCAL_HOST.port()); + private static final KsqlHostInfoEntity REMOTE_KSQL_HOST_INFO_ENTITY = + new KsqlHostInfoEntity(REMOTE_HOST.host(), REMOTE_HOST.port()); + + private static final KafkaStreams.State RUNNING_QUERY_STATE = KafkaStreams.State.RUNNING; + private static final KafkaStreams.State ERROR_QUERY_STATE = KafkaStreams.State.ERROR; + + private static final Map LOCAL_KSQL_HOST_INFO_MAP = + Collections.singletonMap(LOCAL_KSQL_HOST_INFO_ENTITY, RUNNING_QUERY_STATE.toString()); + @Rule public final TemporaryEngine engine = new TemporaryEngine(); + @Mock + private SessionProperties sessionProperties; + @Mock + private RestResponse response; + @Mock + private ServiceContext serviceContext; + @Mock + private SimpleKsqlClient ksqlClient; + @Mock + private KsqlEntityList ksqlEntityList; + @Mock + private Queries remoteQueries; + @Mock + private QueryDescriptionList remoteQueryDescriptionList; + + private QueryStateCount queryStateCount; + + @Before + public void setup() throws MalformedURLException { + // set to true so the tests don't perform the scatter gather by default + when(sessionProperties.getInternalRequest()).thenReturn(true); + when(sessionProperties.getKsqlHostInfo()).thenReturn(LOCAL_KSQL_HOST_INFO_ENTITY.toKsqlHost()); + when(sessionProperties.getLocalUrl()).thenReturn(new URL("https://address")); + + queryStateCount = new QueryStateCount(); + + when(ksqlClient.makeKsqlRequest(any(), any(), any())).thenReturn(response); + when(response.isErroneous()).thenReturn(false); + when(serviceContext.getKsqlClient()).thenReturn(ksqlClient); + } + @Test public void shouldListQueriesEmpty() { // When final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( engine.configure("SHOW QUERIES;"), - mock(SessionProperties.class), + mock(SessionProperties.class), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -67,35 +138,146 @@ public void shouldListQueriesEmpty() { public void shouldListQueriesBasic() { // Given final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES;"); - final PersistentQueryMetadata metadata = givenPersistentQuery("id"); + final PersistentQueryMetadata metadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); final KsqlEngine engine = mock(KsqlEngine.class); when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(metadata)); + queryStateCount.updateStateCount(RUNNING_QUERY_STATE, 1); // When final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( showQueries, - mock(SessionProperties.class), + sessionProperties, engine, this.engine.getServiceContext() ).orElseThrow(IllegalStateException::new); - assertThat(queries.getQueries(), containsInAnyOrder( - new RunningQuery( - metadata.getStatementString(), - ImmutableSet.of(metadata.getSinkName().text()), - ImmutableSet.of(metadata.getResultTopic().getKafkaTopicName()), - metadata.getQueryId(), - Optional.of(metadata.getState()) - ))); + assertThat(queries.getQueries(), containsInAnyOrder(queryMetaDataToRunningQuery(metadata, queryStateCount))); + } + + @Test + public void shouldNotIncludeRemoteResponseIfFutureThrowsException() { + // Given + when(sessionProperties.getInternalRequest()).thenReturn(false); + final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES;"); + final PersistentQueryMetadata metadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); + + final KsqlEngine engine = mock(KsqlEngine.class); + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(metadata)); + + when(ksqlClient.makeKsqlRequest(any(), any(), any())).thenThrow(new KsqlRestClientException("error")); + when(serviceContext.getKsqlClient()).thenReturn(ksqlClient); + + queryStateCount.updateStateCount(RUNNING_QUERY_STATE, 1); + + // When + final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( + showQueries, + sessionProperties, + engine, + serviceContext + ).orElseThrow(IllegalStateException::new); + + assertThat(queries.getQueries(), containsInAnyOrder(queryMetaDataToRunningQuery(metadata, queryStateCount))); + } + + @Test + public void shouldNotIncludeRemoteResponseIfErrorResponse() { + // Given + when(sessionProperties.getInternalRequest()).thenReturn(false); + final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES;"); + final PersistentQueryMetadata metadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); + + final KsqlEngine engine = mock(KsqlEngine.class); + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(metadata)); + + when(response.isErroneous()).thenReturn(true); + when(response.getErrorMessage()).thenReturn(new KsqlErrorMessage(10000, "error")); + queryStateCount.updateStateCount(RUNNING_QUERY_STATE, 1); + + // When + final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( + showQueries, + sessionProperties, + engine, + serviceContext + ).orElseThrow(IllegalStateException::new); + + assertThat(queries.getQueries(), containsInAnyOrder(queryMetaDataToRunningQuery(metadata, queryStateCount))); + } + + @Test + public void shouldScatterGatherAndMergeShowQueries() { + // Given + when(sessionProperties.getInternalRequest()).thenReturn(false); + final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES;"); + final PersistentQueryMetadata localMetadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); + final PersistentQueryMetadata remoteMetadata = givenPersistentQuery("id", ERROR_QUERY_STATE); + + final KsqlEngine engine = mock(KsqlEngine.class); + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(localMetadata)); + + final List remoteRunningQueries = Collections.singletonList(queryMetaDataToRunningQuery( + remoteMetadata, + new QueryStateCount(Collections.singletonMap(ERROR_QUERY_STATE, 1)))); + when(remoteQueries.getQueries()).thenReturn(remoteRunningQueries); + when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueries); + when(response.getResponse()).thenReturn(ksqlEntityList); + + queryStateCount.updateStateCount(RUNNING_QUERY_STATE, 1); + queryStateCount.updateStateCount(ERROR_QUERY_STATE, 1); + + // When + final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( + showQueries, + sessionProperties, + engine, + serviceContext + ).orElseThrow(IllegalStateException::new); + + System.out.println(queries.getQueries()); + assertThat(queries.getQueries(), containsInAnyOrder(queryMetaDataToRunningQuery(localMetadata, queryStateCount))); + } + + @Test + public void shouldNotMergeDifferentRunningQueries() { + // Given + when(sessionProperties.getInternalRequest()).thenReturn(false); + final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES;"); + final PersistentQueryMetadata localMetadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); + final PersistentQueryMetadata remoteMetadata = givenPersistentQuery("different Id", RUNNING_QUERY_STATE); + + final KsqlEngine engine = mock(KsqlEngine.class); + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(localMetadata)); + + final List remoteRunningQueries = Collections.singletonList(queryMetaDataToRunningQuery( + remoteMetadata, + new QueryStateCount(Collections.singletonMap(RUNNING_QUERY_STATE, 1)))); + when(remoteQueries.getQueries()).thenReturn(remoteRunningQueries); + when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueries); + when(response.getResponse()).thenReturn(ksqlEntityList); + + queryStateCount.updateStateCount(RUNNING_QUERY_STATE, 1); + + // When + final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( + showQueries, + sessionProperties, + engine, + serviceContext + ).orElseThrow(IllegalStateException::new); + + assertThat(queries.getQueries(), + containsInAnyOrder( + queryMetaDataToRunningQuery(localMetadata, queryStateCount), + queryMetaDataToRunningQuery(remoteMetadata, queryStateCount))); } @Test public void shouldListQueriesExtended() { // Given final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES EXTENDED;"); - final PersistentQueryMetadata metadata = givenPersistentQuery("id"); - when(metadata.getState()).thenReturn("Running"); + final PersistentQueryMetadata metadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); final KsqlEngine engine = mock(KsqlEngine.class); when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(metadata)); @@ -103,23 +285,97 @@ public void shouldListQueriesExtended() { // When final QueryDescriptionList queries = (QueryDescriptionList) CustomExecutors.LIST_QUERIES.execute( showQueries, - mock(SessionProperties.class), + sessionProperties, engine, this.engine.getServiceContext() ).orElseThrow(IllegalStateException::new); assertThat(queries.getQueryDescriptions(), containsInAnyOrder( - QueryDescriptionFactory.forQueryMetadata(metadata))); + QueryDescriptionFactory.forQueryMetadata(metadata, LOCAL_KSQL_HOST_INFO_MAP))); + } + + @Test + public void shouldScatterGatherAndMergeShowQueriesExtended() { + // Given + when(sessionProperties.getInternalRequest()).thenReturn(false); + final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES EXTENDED;"); + final PersistentQueryMetadata localMetadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); + final PersistentQueryMetadata remoteMetadata = givenPersistentQuery("id", ERROR_QUERY_STATE); + + final KsqlEngine engine = mock(KsqlEngine.class); + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(localMetadata)); + + final List remoteQueryDescriptions = Collections.singletonList( + QueryDescriptionFactory.forQueryMetadata( + remoteMetadata, + Collections.singletonMap(REMOTE_KSQL_HOST_INFO_ENTITY, ERROR_QUERY_STATE.toString())) + ); + when(remoteQueryDescriptionList.getQueryDescriptions()).thenReturn(remoteQueryDescriptions); + when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueryDescriptionList); + when(response.getResponse()).thenReturn(ksqlEntityList); + + final TreeMap mergedMap = new TreeMap<>(Comparator.comparing(KsqlHostInfoEntity::toString)); + mergedMap.put(REMOTE_KSQL_HOST_INFO_ENTITY, ERROR_QUERY_STATE.toString()); + mergedMap.put(LOCAL_KSQL_HOST_INFO_ENTITY, RUNNING_QUERY_STATE.toString()); + + // When + final QueryDescriptionList queries = (QueryDescriptionList) CustomExecutors.LIST_QUERIES.execute( + showQueries, + sessionProperties, + engine, + serviceContext + ).orElseThrow(IllegalStateException::new); + + assertThat(queries.getQueryDescriptions(), + containsInAnyOrder(QueryDescriptionFactory.forQueryMetadata(localMetadata, mergedMap))); + } + + @Test + public void shouldNotMergeDifferentQueryDescriptions() { + // Given + when(sessionProperties.getInternalRequest()).thenReturn(false); + final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES EXTENDED;"); + final PersistentQueryMetadata localMetadata = givenPersistentQuery("id", RUNNING_QUERY_STATE); + final PersistentQueryMetadata remoteMetadata = givenPersistentQuery("different id", ERROR_QUERY_STATE); + + final KsqlEngine engine = mock(KsqlEngine.class); + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(localMetadata)); + + final Map remoteMap = Collections.singletonMap(LOCAL_KSQL_HOST_INFO_ENTITY, RUNNING_QUERY_STATE.toString()); + final List remoteQueryDescriptions = Collections.singletonList( + QueryDescriptionFactory.forQueryMetadata( + remoteMetadata, + remoteMap) + ); + when(remoteQueryDescriptionList.getQueryDescriptions()).thenReturn(remoteQueryDescriptions); + when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueryDescriptionList); + when(response.getResponse()).thenReturn(ksqlEntityList); + + // When + final QueryDescriptionList queries = (QueryDescriptionList) CustomExecutors.LIST_QUERIES.execute( + showQueries, + sessionProperties, + engine, + serviceContext + ).orElseThrow(IllegalStateException::new); + + assertThat(queries.getQueryDescriptions(), + containsInAnyOrder( + QueryDescriptionFactory.forQueryMetadata(localMetadata, LOCAL_KSQL_HOST_INFO_MAP), + QueryDescriptionFactory.forQueryMetadata(remoteMetadata, remoteMap))); } @SuppressWarnings("SameParameterValue") - public static PersistentQueryMetadata givenPersistentQuery(final String id) { + public static PersistentQueryMetadata givenPersistentQuery( + final String id, + final KafkaStreams.State state + ) { final PersistentQueryMetadata metadata = mock(PersistentQueryMetadata.class); when(metadata.getStatementString()).thenReturn("sql"); when(metadata.getQueryId()).thenReturn(new QueryId(id)); when(metadata.getSinkName()).thenReturn(SourceName.of(id)); when(metadata.getLogicalSchema()).thenReturn(TemporaryEngine.SCHEMA); - when(metadata.getState()).thenReturn("Running"); + when(metadata.getState()).thenReturn(state.toString()); when(metadata.getTopologyDescription()).thenReturn("topology"); when(metadata.getExecutionPlan()).thenReturn("plan"); @@ -128,6 +384,27 @@ public static PersistentQueryMetadata givenPersistentQuery(final String id) { when(sinkTopic.getKafkaTopicName()).thenReturn(id); when(metadata.getResultTopic()).thenReturn(sinkTopic); + final StreamsMetadata localStreamsMetadata = mock(StreamsMetadata.class); + when(localStreamsMetadata.hostInfo()).thenReturn(LOCAL_HOST); + final StreamsMetadata remoteStreamsMetadata = mock(StreamsMetadata.class); + when(remoteStreamsMetadata.hostInfo()).thenReturn(REMOTE_HOST); + final List streamsData = new ArrayList<>(); + streamsData.add(localStreamsMetadata); + streamsData.add(remoteStreamsMetadata); + when(metadata.getAllMetadata()).thenReturn(streamsData); + return metadata; } + + public static RunningQuery queryMetaDataToRunningQuery( + final PersistentQueryMetadata md, + final QueryStateCount queryStateCount + ) { + return new RunningQuery( + md.getStatementString(), + ImmutableSet.of(md.getSinkName().text()), + ImmutableSet.of(md.getResultTopic().getKafkaTopicName()), + md.getQueryId(), + queryStateCount); + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java index 911381523312..3219a7850ece 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java @@ -41,6 +41,7 @@ import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.QueryStateCount; import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.entity.SourceDescriptionEntity; import io.confluent.ksql.rest.entity.SourceDescriptionFactory; @@ -57,11 +58,13 @@ import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.Arrays; +import java.util.Collections; import java.util.Optional; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.streams.KafkaStreams; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -250,6 +253,9 @@ public void shouldShowColumnsSource() { ).orElseThrow(IllegalStateException::new); // Then: + final QueryStateCount queryStateCount = new QueryStateCount( + Collections.singletonMap(KafkaStreams.State.valueOf(metadata.getState()), 1)); + assertThat(sourceDescription.getSourceDescription(), equalTo(SourceDescriptionFactory.create( stream, @@ -260,8 +266,7 @@ public void shouldShowColumnsSource() { ImmutableSet.of(metadata.getSinkName().toString(FormatOptions.noEscape())), ImmutableSet.of(metadata.getResultTopic().getKafkaTopicName()), metadata.getQueryId(), - Optional.of(metadata.getState()) - )), + queryStateCount)), Optional.empty()))); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index afc622e9a3a5..c7f4ebbaa54b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -97,9 +97,11 @@ import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.FunctionNameList; import io.confluent.ksql.rest.entity.FunctionType; +import io.confluent.ksql.rest.entity.QueryStateCount; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.entity.KsqlHostInfoEntity; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; import io.confluent.ksql.rest.entity.PropertiesList; @@ -172,6 +174,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.eclipse.jetty.http.HttpStatus.Code; import org.hamcrest.CoreMatchers; @@ -253,6 +256,10 @@ public class KsqlResourceTest { .valueColumn(ColumnName.of("f1"), SqlTypes.STRING) .build(); + private static final String APPLICATION_HOST = "localhost"; + private static final int APPLICATION_PORT = 9099; + private static final String APPLICATION_SERVER = "http://" + APPLICATION_HOST + ":" + APPLICATION_PORT; + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -535,10 +542,12 @@ public void shouldShowQueriesExtended() { final QueryDescriptionList descriptionList = makeSingleRequest( "SHOW QUERIES EXTENDED;", QueryDescriptionList.class); + final Map queryHostState = + ImmutableMap.of(new KsqlHostInfoEntity(APPLICATION_HOST, APPLICATION_PORT), "CREATED"); // Then: assertThat(descriptionList.getQueryDescriptions(), containsInAnyOrder( - QueryDescriptionFactory.forQueryMetadata(queryMetadata.get(0)), - QueryDescriptionFactory.forQueryMetadata(queryMetadata.get(1)))); + QueryDescriptionFactory.forQueryMetadata(queryMetadata.get(0), queryHostState), + QueryDescriptionFactory.forQueryMetadata(queryMetadata.get(1), queryHostState))); } @Test @@ -1981,9 +1990,9 @@ private List createRunningQueries( ImmutableSet.of(md.getSinkName().toString(FormatOptions.noEscape())), ImmutableSet.of(md.getResultTopic().getKafkaTopicName()), md.getQueryId(), - Optional.of(md.getState()) - )) - .collect(Collectors.toList()); + new QueryStateCount( + Collections.singletonMap(KafkaStreams.State.valueOf(md.getState()), 1))) + ).collect(Collectors.toList()); } private KsqlErrorMessage makeFailingRequest(final String ksql, final Code errorCode) { @@ -2207,7 +2216,7 @@ private static Properties getDefaultKsqlConfig() { configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put("ksql.command.topic.suffix", "commands"); configMap.put(RestConfig.LISTENERS_CONFIG, "http://localhost:8088"); - configMap.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "http://localhost:9099"); + configMap.put(StreamsConfig.APPLICATION_SERVER_CONFIG, APPLICATION_SERVER); final Properties properties = new Properties(); properties.putAll(configMap); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/DefaultKsqlClientTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/DefaultKsqlClientTest.java index 4e7b6280783c..4e038c8cde2e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/DefaultKsqlClientTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/DefaultKsqlClientTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.rest.client.KsqlClient; import io.confluent.ksql.rest.client.KsqlTarget; @@ -57,13 +58,13 @@ public void setUp() { when(sharedClient.target(any())).thenReturn(target); when(target.authorizationHeader(any())).thenReturn(target); - when(target.postKsqlRequest(any(), any())).thenReturn(response); + when(target.postKsqlRequest(any(), any(), any())).thenReturn(response); } @Test public void shouldGetRightTraget() { // When: - client.makeKsqlRequest(SERVER_ENDPOINT, "Sql"); + client.makeKsqlRequest(SERVER_ENDPOINT, "Sql", ImmutableMap.of()); // Then: verify(sharedClient).target(SERVER_ENDPOINT); @@ -72,7 +73,7 @@ public void shouldGetRightTraget() { @Test public void shouldSetAuthHeaderOnTarget() { // When: - client.makeKsqlRequest(SERVER_ENDPOINT, "Sql"); + client.makeKsqlRequest(SERVER_ENDPOINT, "Sql", ImmutableMap.of()); // Then: verify(target).authorizationHeader(AUTH_HEADER); @@ -84,7 +85,7 @@ public void shouldHandleNoAuthHeader() { client = new DefaultKsqlClient(Optional.empty(), sharedClient); // When: - final RestResponse result = client.makeKsqlRequest(SERVER_ENDPOINT, "Sql"); + final RestResponse result = client.makeKsqlRequest(SERVER_ENDPOINT, "Sql", ImmutableMap.of()); // Then: verify(target, never()).authorizationHeader(any()); @@ -94,10 +95,10 @@ public void shouldHandleNoAuthHeader() { @Test public void shouldPostRequest() { // When: - final RestResponse result = client.makeKsqlRequest(SERVER_ENDPOINT, "Sql"); + final RestResponse result = client.makeKsqlRequest(SERVER_ENDPOINT, "Sql", ImmutableMap.of()); // Then: - verify(target).postKsqlRequest("Sql", Optional.empty()); + verify(target).postKsqlRequest("Sql", ImmutableMap.of(), Optional.empty()); assertThat(result, is(response)); } } \ No newline at end of file diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java index c36041d55e3d..f822780f9268 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlRequest; @@ -69,7 +70,7 @@ public void shouldMakeKsqlRequest() { // When: final RestResponse restResponse = - ksqlClient.makeKsqlRequest(unused, KSQL_STATEMENT); + ksqlClient.makeKsqlRequest(unused, KSQL_STATEMENT, ImmutableMap.of()); // Then: assertThat("is successful", restResponse.isSuccessful()); diff --git a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java index 1443d5ffd782..807213255b35 100644 --- a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java +++ b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java @@ -139,11 +139,12 @@ public CompletableFuture> makeAsyncLagReporti } public RestResponse makeKsqlRequest(final String ksql) { - return target().postKsqlRequest(ksql, Optional.empty()); + return target().postKsqlRequest(ksql, Collections.emptyMap(), Optional.empty()); } public RestResponse makeKsqlRequest(final String ksql, final Long commandSeqNum) { - return target().postKsqlRequest(ksql, Optional.ofNullable(commandSeqNum)); + return target() + .postKsqlRequest(ksql, Collections.emptyMap(), Optional.ofNullable(commandSeqNum)); } public RestResponse makeStatusRequest() { diff --git a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java index c9afef995037..25966775faf3 100644 --- a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java +++ b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java @@ -138,11 +138,12 @@ public RestResponse getStatus(final String commandId) { public RestResponse postKsqlRequest( final String ksql, + final Map requestProperties, final Optional previousCommandSeqNum ) { return post( KSQL_PATH, - createKsqlRequest(ksql, Collections.emptyMap(), previousCommandSeqNum), + createKsqlRequest(ksql, requestProperties, previousCommandSeqNum), r -> deserialize(r.getBody(), KsqlEntityList.class) ); } diff --git a/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java b/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java index ed5db3eee979..487178a2bdb7 100644 --- a/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java +++ b/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java @@ -106,7 +106,7 @@ public void shouldSendKsqlRequest() { // When: KsqlTarget target = ksqlClient.target(serverUri); - RestResponse resp = target.postKsqlRequest(ksql, Optional.of(123L)); + RestResponse resp = target.postKsqlRequest(ksql, Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(resp.get(), is(expectedResponse)); @@ -129,7 +129,7 @@ public void shouldSendBasicAuthHeader() { // When: KsqlTarget target = ksqlClient.target(serverUri); - target.postKsqlRequest("some ksql", Optional.of(123L)); + target.postKsqlRequest("some ksql", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(server.getHeaders().get("Authorization"), is(toAuthHeader(credentials))); @@ -144,7 +144,7 @@ public void shouldOverrideAuthHeader() { // When: KsqlTarget target = ksqlClient.target(serverUri).authorizationHeader("other auth"); - target.postKsqlRequest("some ksql", Optional.of(123L)); + target.postKsqlRequest("some ksql", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(server.getHeaders().get("Authorization"), is("other auth")); @@ -160,7 +160,7 @@ public void shouldOverrideProperties() { // When: KsqlTarget target = ksqlClient.target(serverUri).properties(props); - target.postKsqlRequest("some ksql", Optional.of(123L)); + target.postKsqlRequest("some ksql", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(getKsqlRequest().getConfigOverrides(), is(props)); @@ -465,7 +465,7 @@ public void shouldPerformRequestWithTls() throws Exception { // When: KsqlTarget target = ksqlClient.target(serverUri); - RestResponse resp = target.postKsqlRequest("ssl test", Optional.of(123L)); + RestResponse resp = target.postKsqlRequest("ssl test", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(getKsqlRequest().getKsql(), is("ssl test")); @@ -520,7 +520,7 @@ public void shouldFailtoMakeHttpRequestWhenServerIsHttps() throws Exception { // When: URI uri = URI.create("http://localhost:" + server.getPort()); KsqlTarget target = ksqlClient.target(uri); - target.postKsqlRequest("ssl test", Optional.of(123L)); + target.postKsqlRequest("ssl test", Collections.emptyMap(), Optional.of(123L)); } @Test @@ -534,7 +534,7 @@ public void shouldFailtoMakeHttpsRequestWhenServerIsHttp() { // When: URI uri = URI.create("https://localhost:" + server.getPort()); KsqlTarget target = ksqlClient.target(uri); - target.postKsqlRequest("ssl test", Optional.of(123L)); + target.postKsqlRequest("ssl test", Collections.emptyMap(), Optional.of(123L)); } @Test @@ -545,7 +545,7 @@ public void shouldHandleUnauthorizedOnPostRequests() { // When: KsqlTarget target = ksqlClient.target(serverUri); - RestResponse response = target.postKsqlRequest("sql", Optional.of(123L)); + RestResponse response = target.postKsqlRequest("sql", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(server.getHttpMethod(), is(HttpMethod.POST)); @@ -581,7 +581,7 @@ public void shouldHandleErrorMessageOnPostRequests() { // When: KsqlTarget target = ksqlClient.target(serverUri); - RestResponse response = target.postKsqlRequest("sql", Optional.of(123L)); + RestResponse response = target.postKsqlRequest("sql", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(response.getStatusCode().getCode(), is(400)); @@ -615,7 +615,7 @@ public void shouldHandleForbiddenOnPostRequests() { // When: KsqlTarget target = ksqlClient.target(serverUri); - RestResponse response = target.postKsqlRequest("sql", Optional.of(123L)); + RestResponse response = target.postKsqlRequest("sql", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(server.getHttpMethod(), is(HttpMethod.POST)); @@ -649,7 +649,7 @@ public void shouldHandleArbitraryErrorsOnPostRequests() { // When: KsqlTarget target = ksqlClient.target(serverUri); - RestResponse response = target.postKsqlRequest("sql", Optional.of(123L)); + RestResponse response = target.postKsqlRequest("sql", Collections.emptyMap(), Optional.of(123L)); // Then: assertThat(server.getHttpMethod(), is(HttpMethod.POST)); diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlHostInfoEntity.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlHostInfoEntity.java index d691a3efac0e..9698ed1c4b77 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlHostInfoEntity.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlHostInfoEntity.java @@ -56,6 +56,11 @@ public KsqlHostInfoEntity(final String serializedPair) { } } + public KsqlHostInfoEntity(final KsqlHostInfo ksqlHostInfo) { + this.host = ksqlHostInfo.host(); + this.port = ksqlHostInfo.port(); + } + public String getHost() { return host; } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/Queries.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/Queries.java index 009e65bf54ef..93a46f84ffa8 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/Queries.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/Queries.java @@ -18,7 +18,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Objects; @@ -29,10 +32,10 @@ public class Queries extends KsqlEntity { @JsonCreator public Queries( @JsonProperty("statementText") final String statementText, - @JsonProperty("queries") final List queries + @JsonProperty("queries") final Collection queries ) { super(statementText); - this.queries = queries; + this.queries = ImmutableList.copyOf(queries); } public List getQueries() { diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java index cd6bbe599a50..7b5496e3ae58 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java @@ -23,6 +23,9 @@ import com.google.common.collect.ImmutableSet; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.query.QueryId; + +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,8 +44,9 @@ public class QueryDescription { private final String topology; private final String executionPlan; private final Map overriddenProperties; - private final Optional state; + private final Map ksqlHostQueryState; + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck @SuppressWarnings("WeakerAccess") // Invoked via reflection @JsonCreator public QueryDescription( @@ -55,7 +59,7 @@ public QueryDescription( @JsonProperty("topology") final String topology, @JsonProperty("executionPlan") final String executionPlan, @JsonProperty("overriddenProperties") final Map overriddenProperties, - @JsonProperty("state") final Optional state + @JsonProperty("ksqlHostQueryState") final Map ksqlHostQueryState ) { this.id = Objects.requireNonNull(id, "id"); this.statementText = Objects.requireNonNull(statementText, "statementText"); @@ -67,7 +71,8 @@ public QueryDescription( this.executionPlan = Objects.requireNonNull(executionPlan, "executionPlan"); this.overriddenProperties = ImmutableMap.copyOf(Objects .requireNonNull(overriddenProperties, "overriddenProperties")); - this.state = Objects.requireNonNull(state, "state"); + this.ksqlHostQueryState = + new HashMap<>(Objects.requireNonNull(ksqlHostQueryState, "ksqlHostQueryState")); } public QueryId getId() { @@ -106,8 +111,18 @@ public Map getOverriddenProperties() { return overriddenProperties; } + // kept for backwards compatibility + @JsonProperty("state") public Optional getState() { - return state; + return Optional.of(ksqlHostQueryState.toString()); + } + + public void updateKsqlHostQueryState(final KsqlHostInfoEntity host, final String state) { + ksqlHostQueryState.put(host, state); + } + + public Map getKsqlHostQueryState() { + return Collections.unmodifiableMap(ksqlHostQueryState); } // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @@ -130,7 +145,7 @@ public boolean equals(final Object o) { && Objects.equals(sources, that.sources) && Objects.equals(sinks, that.sinks) && Objects.equals(overriddenProperties, that.overriddenProperties) - && Objects.equals(state, that.state); + && Objects.equals(ksqlHostQueryState, that.ksqlHostQueryState); } @Override @@ -145,7 +160,7 @@ public int hashCode() { sources, sinks, overriddenProperties, - state + ksqlHostQueryState ); } } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionList.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionList.java index 4ba0c01f0b64..94f0e8b78131 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionList.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionList.java @@ -18,6 +18,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.Collection; import java.util.List; import java.util.Objects; @@ -28,10 +31,10 @@ public class QueryDescriptionList extends KsqlEntity { @JsonCreator public QueryDescriptionList( @JsonProperty("statementText") final String statementText, - @JsonProperty("queryDescriptions") final List queryDescriptions + @JsonProperty("queryDescriptions") final Collection queryDescriptions ) { super(statementText); - this.queryDescriptions = queryDescriptions; + this.queryDescriptions = ImmutableList.copyOf(queryDescriptions); } public List getQueryDescriptions() { diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryStateCount.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryStateCount.java new file mode 100644 index 000000000000..5c25de52c537 --- /dev/null +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryStateCount.java @@ -0,0 +1,95 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.base.Joiner; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.kafka.streams.KafkaStreams; + +/** + * Used to keep track of a the state of KafkaStreams application + * across multiple servers. Used in {@link RunningQuery}. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class QueryStateCount { + + // Use a EnumMap so toString() will always return the same string + private final EnumMap states; + + public QueryStateCount() { + this.states = returnEnumMap(); + } + + @SuppressWarnings("unused") // Invoked by reflection + @JsonCreator + public QueryStateCount(final Map states) { + this.states = states.isEmpty() ? returnEnumMap() : new EnumMap<>(states); + } + + + public void updateStateCount(final String state, final int change) { + updateStateCount(KafkaStreams.State.valueOf(state), change); + } + + public void updateStateCount(final KafkaStreams.State state, final int change) { + this.states.compute(state, (key, existing) -> + existing == null + ? change + : existing + change); + + } + + @JsonValue + public Map getStates() { + return Collections.unmodifiableMap(states); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final QueryStateCount that = (QueryStateCount) o; + return Objects.equals(states, that.states); + } + + @Override + public int hashCode() { + return Objects.hash(states); + } + + @Override + public String toString() { + return Joiner.on(",").withKeyValueSeparator(":").join(this.states); + } + + private static EnumMap returnEnumMap() { + return new EnumMap<>(KafkaStreams.State.class); + } +} \ No newline at end of file diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java index 02a627f85727..baec888dd250 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java @@ -31,7 +31,7 @@ public class RunningQuery { private final Set sinks; private final Set sinkKafkaTopics; private final QueryId id; - private final Optional state; + private final QueryStateCount stateCount; @JsonCreator public RunningQuery( @@ -39,13 +39,13 @@ public RunningQuery( @JsonProperty("sinks") final Set sinks, @JsonProperty("sinkKafkaTopics") final Set sinkKafkaTopics, @JsonProperty("id") final QueryId id, - @JsonProperty("state") final Optional state + @JsonProperty("stateCount") final QueryStateCount stateCount ) { this.queryString = Objects.requireNonNull(queryString, "queryString"); this.sinkKafkaTopics = Objects.requireNonNull(sinkKafkaTopics, "sinkKafkaTopics"); this.sinks = Objects.requireNonNull(sinks, "sinks"); this.id = Objects.requireNonNull(id, "id"); - this.state = Objects.requireNonNull(state, "state"); + this.stateCount = Objects.requireNonNull(stateCount, "stateCount"); } public String getQueryString() { @@ -69,8 +69,14 @@ public QueryId getId() { return id; } + // kept for backwards compatibility + @JsonProperty("state") public Optional getState() { - return state; + return Optional.of(stateCount.toString()); + } + + public QueryStateCount getStateCount() { + return stateCount; } @Override @@ -86,11 +92,11 @@ public boolean equals(final Object o) { && Objects.equals(queryString, that.queryString) && Objects.equals(sinks, that.sinks) && Objects.equals(sinkKafkaTopics, that.sinkKafkaTopics) - && Objects.equals(state, that.state); + && Objects.equals(stateCount, that.stateCount); } @Override public int hashCode() { - return Objects.hash(id, queryString, sinks, sinkKafkaTopics, state); + return Objects.hash(id, queryString, sinks, sinkKafkaTopics, stateCount); } } diff --git a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/QueryStateCountTest.java b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/QueryStateCountTest.java new file mode 100644 index 000000000000..34f6a80de32f --- /dev/null +++ b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/QueryStateCountTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.testing.EqualsTester; +import org.apache.kafka.streams.KafkaStreams; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +@SuppressWarnings("SameParameterValue") +public class QueryStateCountTest { + + QueryStateCount queryStateCount; + + @Before + public void setup() { + queryStateCount = new QueryStateCount(); + } + + @Test + public void shouldUpdateExistingStateCount() { + queryStateCount.updateStateCount(KafkaStreams.State.RUNNING, 2); + assertThat( + queryStateCount.getStates().get(KafkaStreams.State.RUNNING), + is(2)); + queryStateCount.updateStateCount(KafkaStreams.State.RUNNING, 4); + assertThat( + queryStateCount.getStates().get(KafkaStreams.State.RUNNING), + is(6)); + } + + @Test + public void shouldToString() { + queryStateCount.updateStateCount(KafkaStreams.State.RUNNING, 2); + assertThat( + queryStateCount.toString(), + is("RUNNING:2")); + queryStateCount.updateStateCount(KafkaStreams.State.NOT_RUNNING, 1); + assertThat( + queryStateCount.toString(), + is("RUNNING:2,NOT_RUNNING:1")); + } + + @Test + public void shouldImplementHashCodeAndEqualsCorrectly() { + final QueryStateCount queryStateCount1 = new QueryStateCount(Collections.singletonMap(KafkaStreams.State.ERROR, 2)); + final QueryStateCount queryStateCount2 = new QueryStateCount(Collections.singletonMap(KafkaStreams.State.RUNNING, 1)); + queryStateCount2.updateStateCount(KafkaStreams.State.ERROR, 2); + final QueryStateCount queryStateCount3 = new QueryStateCount(); + queryStateCount3.updateStateCount(KafkaStreams.State.ERROR, 2); + + new EqualsTester() + .addEqualityGroup(queryStateCount, queryStateCount) + .addEqualityGroup(queryStateCount1, queryStateCount3) + .addEqualityGroup(queryStateCount2) + .testEquals(); + } + + @Test + public void shouldRoundTripWhenEmpty() { + // When: + final String json = assertDeserializedToSame(queryStateCount); + + // Then: + assertThat(json, is("{}")); + } + + @Test + public void shouldRoundTripWhenNotEmpty() { + // Given: + queryStateCount.updateStateCount(KafkaStreams.State.RUNNING, 2); + queryStateCount.updateStateCount(KafkaStreams.State.CREATED, 10); + + // When: + final String json = assertDeserializedToSame(queryStateCount); + + // Then: + assertThat(json, is("{" + + "\"CREATED\":10," + + "\"RUNNING\":2" + + "}")); + } + + private static String assertDeserializedToSame(final QueryStateCount original) { + try { + final ObjectMapper mapper = new ObjectMapper(); + final String json = mapper.writeValueAsString(original); + + final QueryStateCount deserialized = mapper + .readValue(json, QueryStateCount.class); + + assertThat(deserialized, is(original)); + + return json; + } catch (Exception e) { + throw new AssertionError("Failed to round trip", e); + } + } +} \ No newline at end of file