From 646eb6e51442b966f28382171e783ee7b65abc52 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 22 Apr 2021 14:00:52 -0500 Subject: [PATCH 1/4] long-running queries shouldn't block the main event loop --- .../integration/ClientIntegrationTest.java | 73 ++++-- .../RestClientIntegrationTest.java | 219 ++++++++++++++++++ .../ksql/api/server/OldApiUtils.java | 46 ++-- 3 files changed, 297 insertions(+), 41 deletions(-) create mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 7bb2505b4783..33704ab8917c 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -21,10 +21,13 @@ import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -69,8 +72,11 @@ import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.client.StreamPublisher; import 
io.confluent.ksql.rest.entity.ConnectorList; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.ConnectExecutable; import io.confluent.ksql.rest.server.TestKsqlRestApp; @@ -203,10 +209,18 @@ public class ClientIntegrationTest { private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + // these properties are set together to allow us to verify that we can handle push queries + // in the worker pool without blocking the event loop. + private static final int EVENT_LOOP_POOL_SIZE = 1; + private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5; + private static final int WORKER_POOL_SIZE = 10; + private static final TestKsqlRestApp REST_APP = TestKsqlRestApp .builder(TEST_HARNESS::kafkaBootstrapServers) .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) .withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON") + .withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE) + .withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE) .build(); @ClassRule @@ -296,6 +310,30 @@ public void tearDown() { REST_APP.getServiceContext().close(); } + @Test + public void shouldStreamMultiplePushQueries() throws Exception { + // When + final StreamedQueryResult[] streamedQueryResults = new StreamedQueryResult[NUM_CONCURRENT_REQUESTS_TO_TEST]; + for (int i = 0; i < streamedQueryResults.length; i++) { + streamedQueryResults[i] = client.streamQuery(PUSH_QUERY).get(); + } + + // Then + for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) { + assertThat(streamedQueryResult.columnNames(), is(TEST_COLUMN_NAMES)); + assertThat(streamedQueryResult.columnTypes(), is(TEST_COLUMN_TYPES)); + assertThat(streamedQueryResult.queryID(), is(notNullValue())); + } + + for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) { + 
shouldReceiveStreamRows(streamedQueryResult, false); + } + + for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) { + assertThat(streamedQueryResult.isComplete(), is(false)); + } + } + @Test public void shouldStreamPushQueryAsync() throws Exception { // When @@ -958,29 +996,24 @@ public void shouldListTopics() throws Exception { } @Test - public void shouldListQueries() { + public void shouldListQueries() throws ExecutionException, InterruptedException { // When - // Try multiple times to allow time for queries started by the other tests to finish terminating - final List queries = assertThatEventually(() -> { - try { - return client.listQueries().get(); - } catch (Exception e) { - return Collections.emptyList(); - } - }, hasSize(1)); + final List queries = client.listQueries().get(); // Then - assertThat(queries.get(0).getQueryType(), is(QueryType.PERSISTENT)); - assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_5")); - assertThat(queries.get(0).getSql(), is( - "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" - + " " + TEST_STREAM + ".K K,\n" - + " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n" - + "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n" - + "GROUP BY " + TEST_STREAM + ".K\n" - + "EMIT CHANGES;")); - assertThat(queries.get(0).getSink(), is(Optional.of(AGG_TABLE))); - assertThat(queries.get(0).getSinkTopic(), is(Optional.of(AGG_TABLE))); + assertThat(queries, hasItem(allOf( + hasProperty("queryType", is(QueryType.PERSISTENT)), + hasProperty("id", is("CTAS_" + AGG_TABLE + "_5")), + hasProperty("sql", is( + "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" + + " " + TEST_STREAM + ".K K,\n" + + " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n" + + "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n" + + "GROUP BY " + TEST_STREAM + ".K\n" + + "EMIT CHANGES;")), + hasProperty("sink", 
is(Optional.of(AGG_TABLE))), + hasProperty("sinkTopic", is(Optional.of(AGG_TABLE))) + ))); } @Test diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java new file mode 100644 index 000000000000..3822e7326868 --- /dev/null +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java @@ -0,0 +1,219 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package io.confluent.ksql.api.client.integration; + +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.client.StreamPublisher; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.StreamedRow; +import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatFactory; +import io.confluent.ksql.serde.SerdeFeature; +import io.confluent.ksql.serde.SerdeFeatures; +import io.confluent.ksql.util.StructuredTypesDataProvider; +import io.confluent.ksql.util.TestDataProvider; +import io.vertx.core.Vertx; +import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; + +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import 
java.util.concurrent.TimeUnit; + +import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +/** + * This integration test is displaced from the rest-client package + * to make use of utilities that are not available there. + */ +@Category(IntegrationTest.class) +public class RestClientIntegrationTest { + + private static final StructuredTypesDataProvider TEST_DATA_PROVIDER = new StructuredTypesDataProvider(); + private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName(); + private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName(); + + private static final Format KEY_FORMAT = FormatFactory.JSON; + private static final Format VALUE_FORMAT = FormatFactory.JSON; + + private static final String AGG_TABLE = "AGG_TABLE"; + private static final PhysicalSchema AGG_SCHEMA = PhysicalSchema.from( + LogicalSchema.builder() + .keyColumn(ColumnName.of("K"), SqlTypes.struct() + .field("F1", SqlTypes.array(SqlTypes.STRING)) + .build()) + .valueColumn(ColumnName.of("LONG"), SqlTypes.BIGINT) + .build(), + SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES), + SerdeFeatures.of() + ); + + private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER = new TestDataProvider( + "EMPTY_STRUCTURED_TYPES", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of()); + private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName(); + + private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider( + "EMPTY_STRUCTURED_TYPES_2", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of()); + private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName(); + + private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;"; + + private static final IntegrationTestHarness TEST_HARNESS = 
IntegrationTestHarness.build(); + + // these properties are set together to allow us to verify that we can handle push queries + // in the worker pool without blocking the event loop. + private static final int EVENT_LOOP_POOL_SIZE = 1; + private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5; + private static final int WORKER_POOL_SIZE = 10; + + private static final TestKsqlRestApp REST_APP = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON") + .withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE) + .withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE) + .build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain + .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) + .around(TEST_HARNESS) + .around(REST_APP); + + @BeforeClass + public static void setUpClass() throws Exception { + TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2); + TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT); + RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); + RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER); + RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2); + + makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " + + "SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;" + ); + + TEST_HARNESS.verifyAvailableUniqueRows( + AGG_TABLE, + 4, // Only unique keys are counted + KEY_FORMAT, + VALUE_FORMAT, + AGG_SCHEMA + ); + + final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString(); + final String connectFilePath = Paths.get(testDir, "connect.properties").toString(); + Files.createDirectories(Paths.get(testDir)); + + writeConnectConfigs(connectFilePath, ImmutableMap.builder() + 
.put("bootstrap.servers", TEST_HARNESS.kafkaBootstrapServers()) + .put("group.id", UUID.randomUUID().toString()) + .put("key.converter", StringConverter.class.getName()) + .put("value.converter", JsonConverter.class.getName()) + .put("offset.storage.topic", "connect-offsets") + .put("status.storage.topic", "connect-status") + .put("config.storage.topic", "connect-config") + .put("offset.storage.replication.factor", "1") + .put("status.storage.replication.factor", "1") + .put("config.storage.replication.factor", "1") + .put("value.converter.schemas.enable", "false") + .build() + ); + + } + + private static void writeConnectConfigs(final String path, final Map configs) throws Exception { + try (PrintWriter out = new PrintWriter(new OutputStreamWriter( + new FileOutputStream(path, true), StandardCharsets.UTF_8))) { + for (Map.Entry entry : configs.entrySet()) { + out.println(entry.getKey() + "=" + entry.getValue()); + } + } + } + + @AfterClass + public static void classTearDown() { + REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); + } + + private Vertx vertx; + + @Before + public void setUp() { + vertx = Vertx.vertx(); + } + + @After + public void tearDown() { + if (vertx != null) { + vertx.close(); + } + REST_APP.getServiceContext().close(); + } + + @Test(timeout = 120000L) + public void shouldStreamMultiplePushQueriesRest() { + final List>> responses = new ArrayList<>(NUM_CONCURRENT_REQUESTS_TO_TEST); + + // We should be able to serve multiple push queries at once, even + // though we only have one event-loop thread, because we have enough + // workers in the worker pool.
+ for(long i = 0; i < NUM_CONCURRENT_REQUESTS_TO_TEST; i++) { + responses.add(REST_APP.buildKsqlClient().makeQueryRequestStreamed(PUSH_QUERY,i)); + } + + assertThat(responses, everyItem(hasProperty("successful", is(true)))); + + for (final RestResponse> response : responses) { + response.getResponse().close(); + } + } + + private static List makeKsqlRequest(final String sql) { + return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); + } +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 9952ef3c8764..85931ac5eaad 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -132,30 +132,34 @@ private static void streamEndpointResponse(final Server server, final long startTimeNanos) { final WorkerExecutor workerExecutor = server.getWorkerExecutor(); final VertxCompletableFuture vcf = new VertxCompletableFuture<>(); - workerExecutor.executeBlocking(promise -> { - final OutputStream ros = new ResponseOutputStream(routingContext.response(), - streamingOutput.getWriteTimeoutMs()); - routingContext.request().connection().closeHandler(v -> { - // Close the OutputStream on close of the HTTP connection + workerExecutor.executeBlocking( + promise -> { + final OutputStream ros = new ResponseOutputStream(routingContext.response(), + streamingOutput.getWriteTimeoutMs()); + routingContext.request().connection().closeHandler(v -> { + // Close the OutputStream on close of the HTTP connection + try { + ros.close(); + } catch (IOException e) { + promise.fail(e); + } + }); try { - ros.close(); - } catch (IOException e) { + streamingOutput.write(new BufferedOutputStream(ros)); + promise.complete(); + } catch (Exception e) { promise.fail(e); + } finally { + try { + ros.close(); + } catch (IOException ignore) { + // 
Ignore - it might already be closed + } } - }); - try { - streamingOutput.write(new BufferedOutputStream(ros)); - promise.complete(); - } catch (Exception e) { - promise.fail(e); - } finally { - try { - ros.close(); - } catch (IOException ignore) { - // Ignore - it might already be closed - } - } - }, vcf); + }, + false /*if this is true, worker execution blocks the main event loop*/, + vcf + ); vcf.handle((v, throwable) -> { reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); return null; From 3ca0294af84083c709328df1496fa258c93a0429 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 22 Apr 2021 15:42:21 -0500 Subject: [PATCH 2/4] trying workaround for owasp problem --- pom.xml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pom.xml b/pom.xml index 19333dbf5fcc..a08aa0ff6bf5 100644 --- a/pom.xml +++ b/pom.xml @@ -666,6 +666,22 @@ + + org.owasp + dependency-check-maven + 5.2.4 + + + + + false + + + check + + + + From 5cec9a5ee30e9530cce0a933a3673951a08399a4 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 22 Apr 2021 21:40:32 -0500 Subject: [PATCH 3/4] Revert "trying workaround for owasp problem" This reverts commit 3ca0294af84083c709328df1496fa258c93a0429. 
--- pom.xml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/pom.xml b/pom.xml index a08aa0ff6bf5..19333dbf5fcc 100644 --- a/pom.xml +++ b/pom.xml @@ -666,22 +666,6 @@ - - org.owasp - dependency-check-maven - 5.2.4 - - - - - false - - - check - - - - From 47e55fa9c0f17b2df9e2a8ef73441be4fd5dfedb Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 23 Apr 2021 00:07:14 -0500 Subject: [PATCH 4/4] fix indentation --- .../ksql/api/server/OldApiUtils.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 85931ac5eaad..6929145f9f10 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -133,32 +133,32 @@ private static void streamEndpointResponse(final Server server, final WorkerExecutor workerExecutor = server.getWorkerExecutor(); final VertxCompletableFuture vcf = new VertxCompletableFuture<>(); workerExecutor.executeBlocking( - promise -> { - final OutputStream ros = new ResponseOutputStream(routingContext.response(), - streamingOutput.getWriteTimeoutMs()); - routingContext.request().connection().closeHandler(v -> { - // Close the OutputStream on close of the HTTP connection + promise -> { + final OutputStream ros = new ResponseOutputStream(routingContext.response(), + streamingOutput.getWriteTimeoutMs()); + routingContext.request().connection().closeHandler(v -> { + // Close the OutputStream on close of the HTTP connection + try { + ros.close(); + } catch (IOException e) { + promise.fail(e); + } + }); try { - ros.close(); - } catch (IOException e) { + streamingOutput.write(new BufferedOutputStream(ros)); + promise.complete(); + } catch (Exception e) { promise.fail(e); + } finally { + try { + ros.close(); + } catch (IOException ignore) { 
+ // Ignore - it might already be closed + } } - }); - try { - streamingOutput.write(new BufferedOutputStream(ros)); - promise.complete(); - } catch (Exception e) { - promise.fail(e); - } finally { - try { - ros.close(); - } catch (IOException ignore) { - // Ignore - it might already be closed - } - } - }, - false /*if this is true, worker execution blocks the main event loop*/, - vcf + }, + false /*if this is true, worker execution blocks the main event loop*/, + vcf ); vcf.handle((v, throwable) -> { reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos);