From dbf3e0314e99bb2837c04d5cc20eb0c7ed9f905a Mon Sep 17 00:00:00 2001
From: Almog Gavra
Date: Thu, 1 Aug 2019 16:10:57 -0700
Subject: [PATCH 1/7] feat(ksql-connect): poll connect-configs and auto
register sources
---
.../ksql/properties/with/CreateConfigs.java | 9 +
.../io/confluent/ksql/util/KsqlConfig.java | 20 ++
.../ksql/connect/ConnectConfigService.java | 185 ++++++++++++++++
.../ksql/connect/ConnectPollingService.java | 158 ++++++++++++++
.../io/confluent/ksql/connect/Connector.java | 97 +++++++++
.../io/confluent/ksql/connect/Connectors.java | 89 ++++++++
.../confluent/ksql/connect/KsqlConnect.java | 76 +++++++
.../ksql/inference/TopicSchemaSupplier.java | 8 +
.../connect/ConnectConfigServiceTest.java | 128 +++++++++++
.../connect/ConnectPollingServiceTest.java | 199 ++++++++++++++++++
.../confluent/ksql/connect/ConnectorTest.java | 41 ++++
.../ksql/connect/ConnectorsTest.java | 125 +++++++++++
.../ksql/connect/KsqlConnectTest.java | 73 +++++++
.../ksql/rest/server/KsqlRestApplication.java | 28 ++-
.../rest/server/KsqlRestApplicationTest.java | 16 +-
15 files changed, 1248 insertions(+), 4 deletions(-)
create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java
create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java
create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/connect/Connector.java
create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/connect/Connectors.java
create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/connect/KsqlConnect.java
create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java
create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorTest.java
create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorsTest.java
create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/connect/KsqlConnectTest.java
diff --git a/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java b/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java
index dd68786fa7af..53f8b575feb9 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java
@@ -31,6 +31,7 @@ public final class CreateConfigs {
public static final String WINDOW_TYPE_PROPERTY = "WINDOW_TYPE";
public static final String WINDOW_SIZE_PROPERTY = "WINDOW_SIZE";
public static final String AVRO_SCHEMA_ID = "AVRO_SCHEMA_ID";
+ public static final String SOURCE_CONNECTOR = "SOURCE_CONNECTOR";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(
@@ -64,6 +65,14 @@ public final class CreateConfigs {
null,
Importance.LOW,
"Undocumented feature"
+ ).define(
+ SOURCE_CONNECTOR,
+ Type.STRING,
+ null,
+ Importance.LOW,
+ "Indicates that this source was created by a connector with the given name. This "
+ + "is useful for understanding which sources map to which connectors and will "
+ + "be automatically populated for connectors."
);
static {
diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
index 299646930304..e69a73ec48b7 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -65,6 +65,10 @@ public class KsqlConfig extends AbstractConfig {
public static final String CONNECT_URL_PROPERTY = "ksql.connect.registry.url";
+ public static final String CONNECT_POLLING_DISABLE_PROPERTY = "ksql.connect.polling.disable";
+
+ public static final String CONNECT_CONFIGS_TOPIC_PROPERTY = "ksql.connect.configs.topic";
+
public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled";
public static final String KSQL_EXT_DIR = "ksql.extension.dir";
@@ -158,6 +162,7 @@ public class KsqlConfig extends AbstractConfig {
public static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081";
public static final String DEFAULT_CONNECT_URL = "http://localhost:8083";
+ public static final String DEFAULT_CONNECT_CONFIGS_TOPIC = "connect-configs";
public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";
@@ -390,6 +395,7 @@ private static ConfigDef configDef(final ConfigGeneration generation) {
return generation == ConfigGeneration.CURRENT ? CURRENT_DEF : LEGACY_DEF;
}
+ // CHECKSTYLE_RULES.OFF: MethodLength
private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
final ConfigDef configDef = new ConfigDef()
.define(
@@ -439,6 +445,19 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
DEFAULT_CONNECT_URL,
Importance.MEDIUM,
"The URL for the connect deployment, defaults to http://localhost:8083"
+ ).define(
+ CONNECT_POLLING_DISABLE_PROPERTY,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ "A value of true for this configuration will disable automatically importing sources "
+ + "from connectors into KSQL."
+ ).define(
+ CONNECT_CONFIGS_TOPIC_PROPERTY ,
+ ConfigDef.Type.STRING,
+ DEFAULT_CONNECT_CONFIGS_TOPIC,
+ Importance.LOW,
+ "The name for the connect configuration topic, defaults to 'connect-configs'"
).define(
KSQL_ENABLE_UDFS,
ConfigDef.Type.BOOLEAN,
@@ -539,6 +558,7 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
}
return configDef;
}
+ // CHECKSTYLE_RULES.ON: MethodLength
private static final class ConfigValue {
final ConfigItem configItem;
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java
new file mode 100644
index 000000000000..f2c8786f82d3
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"; you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.confluent.ksql.util.KsqlConfig;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code ConnectConfigService} listens to the connect configuration topic,
+ * which outputs messages whenever a new connector (or connector task) is started
+ * in Connect. These messages contain information that is then passed to a
+ * {@link ConnectPollingService} to digest and register with KSQL.
+ *
+ * On startup, this service reads the connect configuration topic from the
+ * beginning to make sure that it reconstructs the necessary state.
+ */
+class ConnectConfigService extends AbstractExecutionThreadService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectConfigService.class);
+ private static final long POLL_TIMEOUT_S = 60;
+
+ private final KsqlConfig ksqlConfig;
+ private final String configsTopic;
+ private final ConnectPollingService pollingService;
+ private final JsonConverter converter;
+ private final Function
*/
@ThreadSafe
-class ConnectPollingService extends AbstractScheduledService {
+final class ConnectPollingService extends AbstractScheduledService {
private static final Logger LOG = LoggerFactory.getLogger(ConnectPollingService.class);
private static final int INTERVAL_S = 30;
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
index 8bb641672a22..cd6337d8fcd4 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
@@ -16,6 +16,7 @@
package io.confluent.ksql.connect;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyFloat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
@@ -26,10 +27,11 @@
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.util.KsqlConfig;
-import java.util.Map;
+import io.confluent.ksql.util.KsqlServerException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -37,15 +39,24 @@
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
+import org.mockito.internal.verification.NoMoreInteractions;
+import org.mockito.internal.verification.Times;
import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.OngoingStubbing;
@RunWith(MockitoJUnitRunner.class)
public class ConnectConfigServiceTest {
+ @Rule
+ public final Timeout timeout = Timeout.seconds(30);
+
@Mock
private KafkaConsumer consumer;
@Mock
@@ -54,14 +65,24 @@ public class ConnectConfigServiceTest {
private ConnectClient connectClient;
@Mock
private Connector connector;
+ @Mock
+ private Function> connectorFactory;
private ConnectConfigService configService;
- @Test(timeout = 30_000L)
+ @Before
+ public void setUp() {
+ when(connectorFactory.apply(any())).thenReturn(Optional.of(connector));
+ }
+
+ @Test
public void shouldCreateConnectorFromConfig() throws InterruptedException {
// Given:
- final Map config = ImmutableMap.of();
- givenConnector("connector", config);
+ givenConnectors("connector");
+ givenNoMoreRecords(
+ givenConnectorRecord(
+ when(consumer.poll(any())))
+ );
final CountDownLatch awaitIteration = new CountDownLatch(1);
doAnswer(invocationOnMock -> {
@@ -79,11 +100,11 @@ public void shouldCreateConnectorFromConfig() throws InterruptedException {
configService.stopAsync().awaitTerminated();
}
- @Test(timeout = 30_000L)
+ @Test
public void shouldWakeupConsumerBeforeShuttingDown() {
// Given:
setupConfigService();
- givenConnector("ignored", ImmutableMap.of());
+ givenNoMoreRecords(when(consumer.poll(any())));
configService.startAsync().awaitRunning();
// When:
@@ -97,14 +118,141 @@ public void shouldWakeupConsumerBeforeShuttingDown() {
inOrder.verifyNoMoreInteractions();
}
- private void givenConnector(final String name, final Map properties) {
- final CountDownLatch awaitWakeup = new CountDownLatch(1);
- when(consumer.poll(any()))
- .thenReturn(new ConsumerRecords<>(
+ @Test
+ public void shouldNotDescribeSameConnectorTwice() throws InterruptedException {
+ // Given:
+ givenConnectors("connector");
+
+ final CountDownLatch noMoreLatch = new CountDownLatch(1);
+ givenNoMoreRecords(
+ givenConnectorRecord(
+ givenConnectorRecord(
+ when(consumer.poll(any())))
+ ),
+ noMoreLatch
+ );
+ setupConfigService();
+
+ // When:
+ configService.startAsync().awaitRunning();
+ noMoreLatch.await();
+
+ // Then:
+ final InOrder inOrder = inOrder(consumer, connectClient);
+ inOrder.verify(consumer).poll(any());
+ inOrder.verify(connectClient).connectors();
+ inOrder.verify(connectClient).describe("connector");
+ inOrder.verify(consumer).poll(any());
+ inOrder.verify(connectClient).connectors();
+ inOrder.verify(consumer).poll(any());
+ // note no more calls to describe
+ inOrder.verifyNoMoreInteractions();
+
+ configService.stopAsync().awaitTerminated();
+ }
+
+ @Test
+ public void shouldIgnoreUnsupportedConnectors() throws InterruptedException {
+ // Given:
+ final CountDownLatch noMoreLatch = new CountDownLatch(1);
+ givenNoMoreRecords(
+ givenConnectorRecord(
+ when(consumer.poll(any()))),
+ noMoreLatch
+ );
+ givenConnectors("foo");
+ when(connectorFactory.apply(any())).thenReturn(Optional.empty());
+ setupConfigService();
+
+ // When:
+ configService.startAsync().awaitRunning();
+ noMoreLatch.await();
+
+ // Then:
+ verify(pollingService, new NoMoreInteractions()).addConnector(any());
+ configService.stopAsync().awaitTerminated();
+ }
+
+ @Test
+ public void shouldIgnoreConnectClientFailureToList() throws InterruptedException {
+ // Given:
+ final CountDownLatch noMoreLatch = new CountDownLatch(1);
+ givenNoMoreRecords(
+ givenConnectorRecord(
+ when(consumer.poll(any()))),
+ noMoreLatch
+ );
+ when(connectClient.connectors()).thenThrow(new KsqlServerException("fail!"));
+ setupConfigService();
+
+ // When:
+ configService.startAsync().awaitRunning();
+ noMoreLatch.await();
+
+ // Then:
+ verify(connectClient, new Times(1)).connectors();
+ verify(connectClient, new NoMoreInteractions()).describe(any());
+ // poll again even though error was thrown
+ verify(consumer, new Times(2)).poll(any());
+ configService.stopAsync().awaitTerminated();
+ }
+
+ @Test
+ public void shouldIgnoreConnectClientFailureToDescribe() throws InterruptedException {
+ // Given:
+ givenConnectors("connector", "connector2");
+ when(connectClient.describe("connector")).thenThrow(new KsqlServerException("fail!"));
+
+ givenNoMoreRecords(
+ givenConnectorRecord(
+ when(consumer.poll(any())))
+ );
+
+ final CountDownLatch awaitIteration = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ awaitIteration.countDown();
+ return null;
+ }).when(pollingService).addConnector(connector);
+ setupConfigService();
+
+ // When:
+ configService.startAsync().awaitRunning();
+
+ // Then:
+ awaitIteration.await();
+ verify(connectorFactory, new Times(1)).apply(any());
+ verify(pollingService).addConnector(connector);
+ configService.stopAsync().awaitTerminated();
+ }
+
+ private void givenConnectors(final String... names){
+ for (final String name : names) {
+ when(connectClient.describe(name)).thenReturn(ConnectResponse.of(new ConnectorInfo(
+ name,
+ ImmutableMap.of(),
+ ImmutableList.of(),
+ ConnectorType.SOURCE
+ )));
+ }
+ when(connectClient.connectors()).thenReturn(ConnectResponse.of(ImmutableList.copyOf(names)));
+ }
+
+ private OngoingStubbing> givenConnectorRecord(OngoingStubbing> stubbing) {
+ return stubbing
+ .thenAnswer(inv -> new ConsumerRecords<>(
ImmutableMap.of(
new TopicPartition("topic", 0),
- ImmutableList.of(new ConsumerRecord<>("topic", 0, 0L, "connector", new byte[]{})))))
- .thenAnswer(invocationOnMock -> {
+ ImmutableList.of(new ConsumerRecord<>("topic", 0, 0L, "connector", new byte[]{})))));
+ }
+
+ private void givenNoMoreRecords(final OngoingStubbing> stubbing) {
+ givenNoMoreRecords(stubbing, new CountDownLatch(1));
+ }
+
+ private void givenNoMoreRecords(final OngoingStubbing> stubbing, final CountDownLatch noMoreLatch) {
+ final CountDownLatch awaitWakeup = new CountDownLatch(1);
+ stubbing.thenAnswer(invocationOnMock -> {
+ noMoreLatch.countDown();
awaitWakeup.await(30, TimeUnit.SECONDS);
throw new WakeupException();
});
@@ -113,14 +261,6 @@ private void givenConnector(final String name, final Map propert
awaitWakeup.countDown();
return null;
}).when(consumer).wakeup();
-
- when(connectClient.describe(name)).thenReturn(ConnectResponse.of(new ConnectorInfo(
- name,
- properties,
- ImmutableList.of(),
- ConnectorType.SOURCE
- )));
- when(connectClient.connectors()).thenReturn(ConnectResponse.of(ImmutableList.of(name)));
}
private void setupConfigService() {
@@ -128,7 +268,7 @@ private void setupConfigService() {
new KsqlConfig(ImmutableMap.of()),
connectClient,
pollingService,
- info -> Optional.of(connector),
+ connectorFactory,
props -> consumer);
}
From 421911c1f90972f05f8a68f96d139bd53132da89 Mon Sep 17 00:00:00 2001
From: Almog Gavra
Date: Mon, 12 Aug 2019 11:56:11 -0700
Subject: [PATCH 7/7] fix: fix findbug issue with await()
---
.../confluent/ksql/connect/ConnectConfigServiceTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
index cd6337d8fcd4..ec6b32e716c6 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
@@ -252,10 +252,10 @@ private void givenNoMoreRecords(final OngoingStubbing> stubbing) {
private void givenNoMoreRecords(final OngoingStubbing> stubbing, final CountDownLatch noMoreLatch) {
final CountDownLatch awaitWakeup = new CountDownLatch(1);
stubbing.thenAnswer(invocationOnMock -> {
- noMoreLatch.countDown();
- awaitWakeup.await(30, TimeUnit.SECONDS);
- throw new WakeupException();
- });
+ noMoreLatch.countDown();
+ awaitWakeup.await();
+ throw new WakeupException();
+ });
doAnswer(invocation -> {
awaitWakeup.countDown();