Skip to content

Commit

Permalink
feat: scatter gather query status from all servers in cluster for 'SH…
Browse files Browse the repository at this point in the history
…OW QUERIES [EXTENDED]' statement (#4875)

* feat: scatter gather query status from all servers in cluster for 'SHOW QUERIES [EXTENDED]' statement

* address comments

* addressing more comments

* refactor for backwards-compatibility

* refactor ListQueriesExecutor

* more unit tests

* last refactor
  • Loading branch information
stevenpyzhang authored Apr 8, 2020
1 parent fd65021 commit 7385a31
Show file tree
Hide file tree
Showing 36 changed files with 1,242 additions and 152 deletions.
70 changes: 67 additions & 3 deletions docs-md/developer-guide/ksqldb-reference/show-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,80 @@ Synopsis
--------

```sql
SHOW QUERIES;
SHOW | LIST QUERIES [EXTENDED];
```

Description
-----------

List the running persistent queries.
`SHOW QUERIES` lists queries running in the cluster.

`SHOW QUERIES EXTENDED` lists queries running in the cluster in more detail.

Example
-------

TODO: example
```sql
ksql> show queries;

Query ID | Status | Sink Name | Sink Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------
CSAS_TEST_0 | RUNNING:2 | TEST | TEST | CREATE STREAM TEST WITH (KAFKA_TOPIC='TEST', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM KSQL_PROCESSING_LOG KSQL_PROCESSING_LOG EMIT CHANGES;
------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
```


```sql
ksql> show queries extended;

ID : CSAS_TEST_0
SQL : CREATE STREAM TEST WITH (KAFKA_TOPIC='TEST', PARTITIONS=1, REPLICAS=1) AS SELECT *
FROM KSQL_PROCESSING_LOG KSQL_PROCESSING_LOG
EMIT CHANGES;
Host Query Status : {192.168.1.6:8088=RUNNING, 192.168.1.6:8089=RUNNING}

Field | Type
-------------------------------------------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (key)
LOGGER | VARCHAR(STRING)
LEVEL | VARCHAR(STRING)
TIME | BIGINT
MESSAGE | STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORDB64 VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORD VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING)>>
-------------------------------------------------------------------------------------

Sources that this query reads from:
-----------------------------------
KSQL_PROCESSING_LOG

For source description please run: DESCRIBE [EXTENDED] <SourceId>

Sinks that this query writes to:
-----------------------------------
TEST

For sink description please run: DESCRIBE [EXTENDED] <SinkId>

Execution plan
--------------
> [ SINK ] | Schema: ROWKEY STRING KEY, LOGGER STRING, LEVEL STRING, TIME BIGINT, MESSAGE STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE STRING, RECORDB64 STRING, CAUSE ARRAY<STRING>, `topic` STRING>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE STRING, RECORD STRING, CAUSE ARRAY<STRING>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE STRING>> | Logger: CSAS_TEST_0.TEST
> [ PROJECT ] | Schema: ROWKEY STRING KEY, LOGGER STRING, LEVEL STRING, TIME BIGINT, MESSAGE STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE STRING, RECORDB64 STRING, CAUSE ARRAY<STRING>, `topic` STRING>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE STRING, RECORD STRING, CAUSE ARRAY<STRING>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE STRING>> | Logger: CSAS_TEST_0.Project
> [ SOURCE ] | Schema: ROWKEY STRING KEY, LOGGER STRING, LEVEL STRING, TIME BIGINT, MESSAGE STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE STRING, RECORDB64 STRING, CAUSE ARRAY<STRING>, `topic` STRING>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE STRING, RECORD STRING, CAUSE ARRAY<STRING>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE STRING>>, ROWTIME BIGINT, ROWKEY STRING | Logger: CSAS_TEST_0.KsqlTopic.Source


Processing topology
-------------------
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [default_ksql_processing_log])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: TEST)
<-- Project
```
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,10 @@ private void printQueryDescription(final QueryDescription query) {
if (query.getStatementText().length() > 0) {
writer().println(String.format("%-20s : %s", "SQL", query.getStatementText()));
}
if (query.getState().isPresent()) {
writer().println(String.format("%-20s : %s", "Status", query.getState().get()));
if (!query.getKsqlHostQueryState().isEmpty()) {
writer().println(String.format(
"%-20s : %s", "Host Query Status",
query.getKsqlHostQueryState()));
}
writer().println();
printSchema(query.getWindowType(), query.getFields(), "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ public void shouldExplainQueryId() {
assertThat(terminal.getOutputString(), containsString(queryId));
assertThat(terminal.getOutputString(), containsString("Status"));
assertThat(terminal.getOutputString(),
either(containsString(": REBALANCING"))
either(containsString("REBALANCING"))
.or(containsString("RUNNING")));

dropStream(streamName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.confluent.ksql.rest.entity.FunctionDescriptionList;
import io.confluent.ksql.rest.entity.FunctionInfo;
import io.confluent.ksql.rest.entity.FunctionType;
import io.confluent.ksql.rest.entity.QueryStateCount;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlWarning;
Expand Down Expand Up @@ -80,25 +81,34 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.streams.KafkaStreams;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(Parameterized.class)
public class ConsoleTest {

private static final String CLI_CMD_NAME = "some command";
private static final String WHITE_SPACE = " \t ";
private static final String NEWLINE = System.lineSeparator();
private static final String STATE_COUNT_STRING = "RUNNING:1,ERROR:2";

private static final LogicalSchema SCHEMA = LogicalSchema.builder()
.withRowTime()
Expand Down Expand Up @@ -130,6 +140,9 @@ public class ConsoleTest {
"statement"
);

@Mock
private QueryStateCount queryStateCount;

@Parameterized.Parameters(name = "{0}")
public static Collection<OutputFormat> data() {
return ImmutableList.of(OutputFormat.JSON, OutputFormat.TABULAR);
Expand All @@ -148,6 +161,16 @@ public ConsoleTest(final OutputFormat outputFormat) {
console.registerCliSpecificCommand(cliCommand);
}

// Configures the shared QueryStateCount mock before each test: its string
// form is pinned to STATE_COUNT_STRING ("RUNNING:1,ERROR:2") and its
// per-state counts are stubbed to match, so both the tabular and JSON
// console output assertions below compare against a fixed, known value.
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
// Stub toString() so rendered query status text is deterministic.
when(queryStateCount.toString()).thenReturn(STATE_COUNT_STRING);
final EnumMap<KafkaStreams.State, Integer> mockStateCount = new EnumMap<>(KafkaStreams.State.class);
mockStateCount.put(KafkaStreams.State.RUNNING, 1);
mockStateCount.put(KafkaStreams.State.ERROR, 2);
// getStates() backs the JSON "stateCount" object emitted for each query.
when(queryStateCount.getStates()).thenReturn(mockStateCount);
}

@After
public void after() {
console.close();
Expand Down Expand Up @@ -289,7 +312,7 @@ public void testPrintQueries() {
final List<RunningQuery> queries = new ArrayList<>();
queries.add(
new RunningQuery(
"select * from t1", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), Optional.of("Foobar")));
"select * from t1", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), queryStateCount));

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new Queries("e", queries)
Expand All @@ -309,16 +332,20 @@ public void testPrintQueries() {
+ " \"sinks\" : [ \"Test\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"Test topic\" ]," + NEWLINE
+ " \"id\" : \"0\"," + NEWLINE
+ " \"state\" : \"Foobar\"" + NEWLINE
+ " \"stateCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
} else {
assertThat(output, is("" + NEWLINE
+ " Query ID | Status | Sink Name | Sink Kafka Topic | Query String " + NEWLINE
+ "---------------------------------------------------------------------" + NEWLINE
+ " 0 | Foobar | Test | Test topic | select * from t1 " + NEWLINE
+ "---------------------------------------------------------------------" + NEWLINE
+ " Query ID | Status | Sink Name | Sink Kafka Topic | Query String " + NEWLINE
+ "--------------------------------------------------------------------------------" + NEWLINE
+ " 0 | " + STATE_COUNT_STRING + " | Test | Test topic | select * from t1 " + NEWLINE
+ "--------------------------------------------------------------------------------" + NEWLINE
+ "For detailed information on a Query run: EXPLAIN <Query ID>;" + NEWLINE));
}
}
Expand All @@ -340,10 +367,10 @@ public void testPrintSourceDescription() {
);

final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running"))
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStateCount)
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running"))
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStateCount)
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -389,14 +416,22 @@ public void testPrintSourceDescription() {
+ " \"sinks\" : [ \"sink1\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"sink1 topic\" ]," + NEWLINE
+ " \"id\" : \"readId\"," + NEWLINE
+ " \"state\" : \"Running\"" + NEWLINE
+ " \"stateCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"writeQueries\" : [ {" + NEWLINE
+ " \"queryString\" : \"write query\"," + NEWLINE
+ " \"sinks\" : [ \"sink2\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"sink2 topic\" ]," + NEWLINE
+ " \"id\" : \"writeId\"," + NEWLINE
+ " \"state\" : \"Running\"" + NEWLINE
+ " \"stateCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"fields\" : [ {" + NEWLINE
+ " \"name\" : \"ROWTIME\"," + NEWLINE
Expand Down Expand Up @@ -1000,10 +1035,10 @@ public void testPrintExecuptionPlan() {
public void shouldPrintTopicDescribeExtended() {
// Given:
final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running"))
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStateCount)
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running"))
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStateCount)
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -1048,14 +1083,22 @@ public void shouldPrintTopicDescribeExtended() {
+ " \"sinks\" : [ \"sink1\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"sink1 topic\" ]," + NEWLINE
+ " \"id\" : \"readId\"," + NEWLINE
+ " \"state\" : \"Running\"" + NEWLINE
+ " \"stateCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"writeQueries\" : [ {" + NEWLINE
+ " \"queryString\" : \"write query\"," + NEWLINE
+ " \"sinks\" : [ \"sink2\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"sink2 topic\" ]," + NEWLINE
+ " \"id\" : \"writeId\"," + NEWLINE
+ " \"state\" : \"Running\"" + NEWLINE
+ " \"stateCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"state\" : \"" + STATE_COUNT_STRING +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"fields\" : [ {" + NEWLINE
+ " \"name\" : \"ROWTIME\"," + NEWLINE
Expand Down Expand Up @@ -1116,13 +1159,13 @@ public void shouldPrintTopicDescribeExtended() {
+ "" + NEWLINE
+ "Queries that read from this TABLE" + NEWLINE
+ "-----------------------------------" + NEWLINE
+ "readId (Running) : read query" + NEWLINE
+ "readId (" + STATE_COUNT_STRING +") : read query" + NEWLINE
+ "\n"
+ "For query topology and execution plan please run: EXPLAIN <QueryId>" + NEWLINE
+ "" + NEWLINE
+ "Queries that write from this TABLE" + NEWLINE
+ "-----------------------------------" + NEWLINE
+ "writeId (Running) : write query" + NEWLINE
+ "writeId (" + STATE_COUNT_STRING + ") : write query" + NEWLINE
+ "\n"
+ "For query topology and execution plan please run: EXPLAIN <QueryId>" + NEWLINE
+ "" + NEWLINE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,35 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.QueryStateCount;
import io.confluent.ksql.rest.entity.RunningQuery;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class QueriesTableBuilderTest {

private static final String STATE = "RUNNING:1";

@Mock
private QueryStateCount queryStateCount;

// Stubs the QueryStateCount mock's string form to STATE ("RUNNING:1") so
// the table-builder tests can assert a fixed value in the Status column.
@Before
public void setup() {
when(queryStateCount.toString()).thenReturn(STATE);
}
@Test
public void shouldBuildQueriesTable() {
// Given:
Expand All @@ -23,15 +40,16 @@ public void shouldBuildQueriesTable() {
ImmutableSet.of("SINK"),
ImmutableSet.of("SINK"),
new QueryId("0"),
Optional.of("RUNNING"));
queryStateCount
);

// When:
final Table table = buildTableWithSingleQuery(query);

// Then:
assertThat(table.headers(), contains("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String"));
assertThat(table.rows(), hasSize(1));
assertThat(table.rows().get(0), contains("0", "RUNNING", "SINK", "SINK", "EXAMPLE QUERY;"));
assertThat(table.rows().get(0), contains("0", STATE, "SINK", "SINK", "EXAMPLE QUERY;"));
}

@Test
Expand All @@ -42,7 +60,8 @@ public void shouldBuildQueriesTableWithNewlines() {
ImmutableSet.of("S2"),
ImmutableSet.of("S2"),
new QueryId("CSAS_S2_0"),
Optional.of("RUNNING"));
queryStateCount
);


// When:
Expand All @@ -51,7 +70,7 @@ public void shouldBuildQueriesTableWithNewlines() {
// Then:
assertThat(table.headers(), contains("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String"));
assertThat(table.rows(), hasSize(1));
assertThat(table.rows().get(0), contains("CSAS_S2_0", "RUNNING", "S2", "S2", "CREATE STREAM S2 AS SELECT * FROM S1 EMIT CHANGES;"));
assertThat(table.rows().get(0), contains("CSAS_S2_0", STATE, "S2", "S2", "CREATE STREAM S2 AS SELECT * FROM S1 EMIT CHANGES;"));
}

private Table buildTableWithSingleQuery(RunningQuery query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ private static ConfigDef buildConfigDef() {
}

public KsqlRequestConfig(final Map<?, ?> props) {
super(CURRENT_DEF, props);
super(CURRENT_DEF, props, false);
}
}
Loading

0 comments on commit 7385a31

Please sign in to comment.