diff --git a/config/ksql-server.properties b/config/ksql-server.properties index f03ea402223b..1437b05945de 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -47,6 +47,7 @@ ksql.logging.processing.stream.auto.create=true # The set of Kafka brokers to bootstrap Kafka cluster information from: bootstrap.servers=localhost:9092 +ksql.connect.polling.enable=true # Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry: #ksql.schema.registry.url=? diff --git a/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java b/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java index dd68786fa7af..53f8b575feb9 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java +++ b/ksql-common/src/main/java/io/confluent/ksql/properties/with/CreateConfigs.java @@ -31,6 +31,7 @@ public final class CreateConfigs { public static final String WINDOW_TYPE_PROPERTY = "WINDOW_TYPE"; public static final String WINDOW_SIZE_PROPERTY = "WINDOW_SIZE"; public static final String AVRO_SCHEMA_ID = "AVRO_SCHEMA_ID"; + public static final String SOURCE_CONNECTOR = "SOURCE_CONNECTOR"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define( @@ -64,6 +65,14 @@ public final class CreateConfigs { null, Importance.LOW, "Undocumented feature" + ).define( + SOURCE_CONNECTOR, + Type.STRING, + null, + Importance.LOW, + "Indicates that this source was created by a connector with the given name. This " + + "is useful for understanding which sources map to which connectors and will " + + "be automatically populated for connectors." ); static { diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 299646930304..7f463e70a673 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -65,6 +65,10 @@ public class KsqlConfig extends AbstractConfig { public static final String CONNECT_URL_PROPERTY = "ksql.connect.registry.url"; + public static final String CONNECT_POLLING_ENABLE_PROPERTY = "ksql.connect.polling.enable"; + + public static final String CONNECT_CONFIGS_TOPIC_PROPERTY = "ksql.connect.configs.topic"; + public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled"; public static final String KSQL_EXT_DIR = "ksql.extension.dir"; @@ -158,6 +162,7 @@ public class KsqlConfig extends AbstractConfig { public static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081"; public static final String DEFAULT_CONNECT_URL = "http://localhost:8083"; + public static final String DEFAULT_CONNECT_CONFIGS_TOPIC = "connect-configs"; public static final String KSQL_STREAMS_PREFIX = "ksql.streams."; @@ -390,6 +395,7 @@ private static ConfigDef configDef(final ConfigGeneration generation) { return generation == ConfigGeneration.CURRENT ? 
CURRENT_DEF : LEGACY_DEF; } + // CHECKSTYLE_RULES.OFF: MethodLength private static ConfigDef buildConfigDef(final ConfigGeneration generation) { final ConfigDef configDef = new ConfigDef() .define( @@ -439,6 +445,19 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { DEFAULT_CONNECT_URL, Importance.MEDIUM, "The URL for the connect deployment, defaults to http://localhost:8083" + ).define( + CONNECT_POLLING_ENABLE_PROPERTY, + Type.BOOLEAN, + false, + Importance.LOW, + "A value of false for this configuration will disable automatically importing sources " + + "from connectors into KSQL." + ).define( + CONNECT_CONFIGS_TOPIC_PROPERTY , + ConfigDef.Type.STRING, + DEFAULT_CONNECT_CONFIGS_TOPIC, + Importance.LOW, + "The name for the connect configuration topic, defaults to 'connect-configs'" ).define( KSQL_ENABLE_UDFS, ConfigDef.Type.BOOLEAN, @@ -539,6 +558,7 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { } return configDef; } + // CHECKSTYLE_RULES.ON: MethodLength private static final class ConfigValue { final ConfigItem configItem; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java new file mode 100644 index 000000000000..65539f1fa326 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java @@ -0,0 +1,218 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */
+
+package io.confluent.ksql.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.confluent.ksql.services.ConnectClient;
+import io.confluent.ksql.services.ConnectClient.ConnectResponse;
+import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.KsqlServerException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code ConnectConfigService} listens to the connect configuration topic,
+ * which outputs messages whenever a new connector (or connector task) is started
+ * in Connect. These messages contain information that is then passed to a
+ * {@link ConnectPollingService} to digest and register with KSQL.
+ *
+ * <p>On startup, this service reads the connect configuration topic from the
+ * beginning to make sure that it reconstructs the necessary state.

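A review note on the coordination used below: every KSQL server subscribes to the (typically single-partition) connect configs topic under the same consumer group id, so Kafka hands the partition, and with it the connector-handling duty, to exactly one server at a time. A minimal standalone sketch of that pattern, not part of the patch; the broker address, group id, and topic name are placeholders:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class SingleOwnerSketch {

  public static void main(final String[] args) {
    final Map<String, Object> props = ImmutableMap.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // placeholder
        ConsumerConfig.GROUP_ID_CONFIG, "default_",                // the KSQL service id
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(ImmutableList.of("connect-configs"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
          // another server in the group took the partition; stop handling connectors
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
          // this server now owns the partition and should (re)build connector state
        }
      });
      consumer.poll(Duration.ofSeconds(10)); // joining the group triggers the callbacks
    }
  }
}
```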
+ */
+final class ConnectConfigService extends AbstractExecutionThreadService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConnectConfigService.class);
+  private static final long POLL_TIMEOUT_S = 60;
+
+  private final KsqlConfig ksqlConfig;
+  private final String configsTopic;
+  private final ConnectClient connectClient;
+  private final ConnectPollingService pollingService;
+  private final Function<Map<String, Object>, KafkaConsumer<String, byte[]>> consumerFactory;
+  private final Function<ConnectorInfo, Optional<Connector>> connectorFactory;
+
+  private Set<String> handledConnectors = new HashSet<>();
+
+  // not final because constructing a consumer is expensive and should be
+  // done in startUp()
+  private KafkaConsumer<String, byte[]> consumer;
+
+  ConnectConfigService(
+      final KsqlConfig ksqlConfig,
+      final ConnectClient connectClient,
+      final ConnectPollingService pollingService
+  ) {
+    this(
+        ksqlConfig,
+        connectClient,
+        pollingService,
+        Connectors::fromConnectInfo,
+        KafkaConsumer::new
+    );
+  }
+
+  @VisibleForTesting
+  ConnectConfigService(
+      final KsqlConfig ksqlConfig,
+      final ConnectClient connectClient,
+      final ConnectPollingService pollingService,
+      final Function<ConnectorInfo, Optional<Connector>> connectorFactory,
+      final Function<Map<String, Object>, KafkaConsumer<String, byte[]>> consumerFactory
+  ) {
+    this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
+    this.pollingService = Objects.requireNonNull(pollingService, "pollingService");
+    this.connectorFactory = Objects.requireNonNull(connectorFactory, "connectorFactory");
+    this.consumerFactory = Objects.requireNonNull(consumerFactory, "consumerFactory");
+    this.connectClient = Objects.requireNonNull(connectClient, "connectClient");
+    this.configsTopic = ksqlConfig.getString(KsqlConfig.CONNECT_CONFIGS_TOPIC_PROPERTY);
+
+    addListener(new Listener() {
+      @Override
+      public void failed(final State from, final Throwable failure) {
+        LOG.error("ConnectConfigService failed due to: ", failure);
+      }
+    }, MoreExecutors.directExecutor());
+  }
+
+  @Override
+  protected void startUp() {
+    final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
+    final Map<String, Object> consumerConfigs = ImmutableMap.<String, Object>builder()
+        .putAll(ksqlConfig.getProducerClientConfigProps())
+        // don't create the config topic if it doesn't exist - this is also necessary
+        // for some integration tests to pass
+        .put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false)
+        // set the group id to be the same as the service id to make sure that only one
+        // KSQL server will subscribe to the topic and handle connectors (use this as
+        // a form of poor man's leader election)
+        .put(ConsumerConfig.GROUP_ID_CONFIG, serviceId)
+        .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
+        .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+        .build();
+
+    consumer = consumerFactory.apply(consumerConfigs);
+    consumer.subscribe(ImmutableList.of(configsTopic), new ConsumerRebalanceListener() {
+      @Override
+      public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { }
+
+      @Override
+      public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+        // if we were assigned the (only) connect-configs partition, we should rebuild
+        // our entire state - this ensures that only a single server in the KSQL cluster
+        // handles new connectors
+        checkConnectors();
+      }
+    });
+  }
+
+  @Override
+  protected void run() {
+    while (isRunning()) {
+      try {
+        final ConsumerRecords<String, byte[]> records;
+        records = consumer.poll(Duration.ofSeconds(POLL_TIMEOUT_S));
+        LOG.debug("Polled {} records from connect config topic", records.count());
+
+        if (!records.isEmpty()) {
+          checkConnectors();
+        }
+      } catch (final WakeupException e) {
+        if (isRunning()) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void checkConnectors() {
+    try {
+      // something changed in the connect configuration topic - poll connect
+      // and see if we need to update our connectors
+      final ConnectResponse<List<String>> allConnectors = connectClient.connectors();
+      if (allConnectors.datum().isPresent()) {
+        final List<String> toProcess = allConnectors.datum()
+            .get()
+            .stream()
+            .filter(connectorName -> !handledConnectors.contains(connectorName))
+            .collect(Collectors.toList());
+        LOG.info("Was made aware of the following connectors: {}", toProcess);
+
+        toProcess.forEach(this::handleConnector);
+      }
+    } catch (final KsqlServerException e) {
+      LOG.warn("Failed to check the connectors due to some server error.", e);
+    }
+  }
+
+  private void handleConnector(final String name) {
+    try {
+      final ConnectResponse<ConnectorInfo> describe = connectClient.describe(name);
+      if (!describe.datum().isPresent()) {
+        return;
+      }
+
+      handledConnectors.add(name);
+      final Optional<Connector> connector = connectorFactory.apply(describe.datum().get());
+      if (connector.isPresent()) {
+        LOG.info("Registering connector {}", connector);
+        pollingService.addConnector(connector.get());
+      } else {
+        LOG.warn("Ignoring unsupported connector {} ({})",
+            name, describe.datum().get().config().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+      }
+    } catch (final KsqlServerException e) {
+      LOG.warn("Failed to describe connector {} due to some server error.", name, e);
+    }
+  }
+
+  @Override
+  protected void shutDown() {
+    // this is called in the same thread as run() as it is not thread-safe
+    consumer.close();
+    LOG.info("ConnectConfigService is down.");
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this is called on a different thread from run() since it is thread-safe
+    LOG.info("Shutting down ConnectConfigService.");
+    consumer.wakeup();
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java
new file mode 100644
index 000000000000..099d2fb2dced
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.connect;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
+import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
+import io.confluent.ksql.parser.tree.CreateSource;
+import io.confluent.ksql.parser.tree.CreateStream;
+import io.confluent.ksql.parser.tree.CreateTable;
+import io.confluent.ksql.parser.tree.Literal;
+import io.confluent.ksql.parser.tree.QualifiedName;
+import io.confluent.ksql.parser.tree.StringLiteral;
+import io.confluent.ksql.parser.tree.TableElements;
+import io.confluent.ksql.properties.with.CommonCreateConfigs;
+import io.confluent.ksql.properties.with.CreateConfigs;
+import io.confluent.ksql.util.KsqlConstants;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code ConnectPollingService} will maintain a list of {@link Connector}s
+ * and for each one it will poll kafka and schema registry on a regular basis
+ * to see if any topics created by that connector are eligible for automatic
+ * registration with KSQL.
+ *
+ * <p>Ideally, Connect would implement a metadata topic where it publishes all
+ * this information on a push basis, so we don't need to poll kafka or schema
+ * registry.

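To make the registration concrete: per the expectation in ConnectPollingServiceTest further down, an eligible topic ends up as a statement like `CREATE TABLE IF NOT EXISTS FOO WITH (KAFKA_TOPIC='foo', KEY='key', SOURCE_CONNECTOR='connector', VALUE_FORMAT='AVRO');`. A sketch of how those properties are assembled, using the same builder pattern as `handleTopic()` below; the topic, source, connector, and key names here are illustrative only:

```java
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.Literal;
import io.confluent.ksql.parser.tree.QualifiedName;
import io.confluent.ksql.parser.tree.StringLiteral;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CreateConfigs;

final class CreateSourceSketch {

  static CreateSource forJdbcUsersTopic() {
    final ImmutableMap<String, Literal> props = ImmutableMap.<String, Literal>builder()
        .put(CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("jdbc_users"))
        .put(CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("AVRO"))
        .put(CreateConfigs.SOURCE_CONNECTOR, new StringLiteral("my-jdbc-connector"))
        .put(CreateConfigs.KEY_NAME_PROPERTY, new StringLiteral("userid"))
        .build();

    // the boolean is the IF NOT EXISTS flag, which keeps re-registering the
    // same topic idempotent on the command topic
    return new CreateTable(
        QualifiedName.of("JDBC_USERS"), TableElements.of(), true,
        CreateSourceProperties.from(props));
  }

  private CreateSourceSketch() {
  }
}
```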
+ */
+@ThreadSafe
+final class ConnectPollingService extends AbstractScheduledService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConnectPollingService.class);
+  private static final int INTERVAL_S = 30;
+
+  private final KsqlExecutionContext executionContext;
+  private final Consumer<CreateSource> sourceCallback;
+
+  private Set<Connector> connectors;
+
+  ConnectPollingService(
+      final KsqlExecutionContext executionContext,
+      final Consumer<CreateSource> sourceCallback
+  ) {
+    this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
+    this.sourceCallback = Objects.requireNonNull(sourceCallback, "sourceCallback");
+    this.connectors = ConcurrentHashMap.newKeySet();
+  }
+
+  /**
+   * Add this connector to the set of connectors that are polled by this
+   * {@code ConnectPollingService}. Next time an iteration is scheduled in
+   * {@value #INTERVAL_S} seconds, the connector will be included in the topic
+   * scan.
+   *
+   * @param connector a connector to register
+   */
+  void addConnector(final Connector connector) {
+    connectors.add(connector);
+  }
+
+  @Override
+  protected void runOneIteration() {
+    if (connectors.isEmpty()) {
+      // avoid making external calls if unnecessary
+      return;
+    }
+
+    try {
+      final Set<String> topics = executionContext.getServiceContext()
+          .getAdminClient()
+          .listTopics()
+          .names()
+          .get(10, TimeUnit.SECONDS);
+
+      final Set<String> subjects = ImmutableSet.copyOf(
+          executionContext.getServiceContext()
+              .getSchemaRegistryClient()
+              .getAllSubjects()
+      );
+
+      for (final String topic : topics) {
+        final Optional<Connector> maybeConnector = connectors
+            .stream()
+            .filter(candidate -> candidate.matches(topic))
+            .findFirst();
+        maybeConnector.ifPresent(connector -> handleTopic(topic, subjects, connector));
+      }
+    } catch (final Exception e) {
+      LOG.error("Could not resolve connect sources. Trying again in {} seconds.", INTERVAL_S, e);
+    }
+  }
+
+  private void handleTopic(
+      final String topic,
+      final Set<String> subjects,
+      final Connector connector
+  ) {
+    final String valueSubject = topic + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX;
+    if (subjects.contains(valueSubject)) {
+      final String name = connector.getName();
+      final String source = connector.mapToSource(topic).toUpperCase();
+
+      // if the meta store already contains the source, don't send the extra command
+      // onto the command topic
+      if (executionContext.getMetaStore().getSource(source) == null) {
+        LOG.info("Auto-Registering topic {} from connector {} as source {}",
+            topic, connector.getName(), source);
+        final Builder<String, Literal> builder = ImmutableMap.<String, Literal>builder()
+            .put(CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral(topic))
+            .put(CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("AVRO"))
+            .put(CreateConfigs.SOURCE_CONNECTOR, new StringLiteral(name));
+
+        connector.getKeyField().ifPresent(
+            key -> builder.put(CreateConfigs.KEY_NAME_PROPERTY, new StringLiteral(key))
+        );
+
+        final CreateSourceProperties properties = CreateSourceProperties.from(builder.build());
+        final CreateSource createSource =
+            connector.getSourceType() == (DataSourceType.KSTREAM)
+                ? new CreateStream(QualifiedName.of(source), TableElements.of(), true, properties)
+                : new CreateTable(QualifiedName.of(source), TableElements.of(), true, properties);
+
+        sourceCallback.accept(createSource);
+      }
+    }
+  }
+
+  @Override
+  protected Scheduler scheduler() {
+    return Scheduler.newFixedRateSchedule(0, INTERVAL_S, TimeUnit.SECONDS);
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/Connector.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/Connector.java
new file mode 100644
index 000000000000..0ae86ace51b1
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/Connector.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.connect;
+
+import com.google.errorprone.annotations.Immutable;
+import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * A model for a connector, which contains various information that
+ * helps map topics to KSQL sources.
+ */
+@Immutable
+class Connector {
+
+  private final String name;
+  private final Predicate<String> isTopicMatch;
+  private final Function<String, String> getSourceName;
+  private final DataSourceType sourceType;
+  private final Optional<String> keyField;
+
+  Connector(
+      final String name,
+      final Predicate<String> isTopicMatch,
+      final Function<String, String> getSourceName,
+      final DataSourceType sourceType,
+      final String keyField) {
+    this.name = Objects.requireNonNull(name, "name");
+    this.isTopicMatch = Objects.requireNonNull(isTopicMatch, "isTopicMatch");
+    this.getSourceName = Objects.requireNonNull(getSourceName, "getSourceName");
+    this.sourceType = Objects.requireNonNull(sourceType, "sourceType");
+    this.keyField = Optional.ofNullable(keyField);
+  }
+
+  String getName() {
+    return name;
+  }
+
+  boolean matches(final String topic) {
+    return isTopicMatch.test(topic);
+  }
+
+  String mapToSource(final String topic) {
+    return getSourceName.apply(topic);
+  }
+
+  DataSourceType getSourceType() {
+    return sourceType;
+  }
+
+  public Optional<String> getKeyField() {
+    return keyField;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final Connector that = (Connector) o;
+    return Objects.equals(name, that.name)
+        && sourceType == that.sourceType
+        && Objects.equals(keyField, that.keyField);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, sourceType, keyField);
+  }
+
+  @Override
+  public String toString() {
+    return "Connector{" + "name='" + name + '\''
+        + ", sourceType=" + sourceType
+        + ", keyField=" + keyField
+        + '}';
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/Connectors.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/Connectors.java
new file mode 100644
index 000000000000..55df866a1ad4
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/Connectors.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.connect;
+
+import com.google.common.base.Splitter;
+import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+
+final class Connectors {
+
+  static final String CONNECTOR_CLASS = "connector.class";
+  static final String JDBC_SOURCE_CLASS = "io.confluent.connect.jdbc.JdbcSourceConnector";
+
+  private Connectors() {
+  }
+
+  static Optional<Connector> fromConnectInfo(final ConnectorInfo connectorInfo) {
+    return fromConnectInfo(connectorInfo.config());
+  }
+
+  @SuppressWarnings("SwitchStatementWithTooFewBranches") // will soon expand to more
+  static Optional<Connector> fromConnectInfo(final Map<String, String> properties) {
+    final String clazz = properties.get(CONNECTOR_CLASS);
+    if (clazz == null) {
+      return Optional.empty();
+    }
+
+    switch (clazz) {
+      case JDBC_SOURCE_CLASS:
+        return Optional.of(jdbc(properties));
+      default:
+        return Optional.empty();
+    }
+  }
+
+  private static Connector jdbc(final Map<String, String> properties) {
+    final String name = properties.get("name");
+    final String prefix = properties.get("topic.prefix");
+
+    return new Connector(
+        name,
+        topic -> topic.startsWith(prefix),
+        topic -> clean(name + "_" + topic.substring(prefix.length())),
+        DataSourceType.KTABLE,
+        extractKeyNameFromSMT(properties).orElse(null)
+    );
+  }
+
+  /**
+   * JDBC connector does not necessarily have an easy way to define the key field (or the primary
+   * column in the database). Most configurations of JDBC, therefore, will specify an extract field
+   * transform to determine the key of the table - the built in one in connect is ExtractField$Key,
+   * though Connect is not necessarily configured with this Single Message Transform. If it is
+   * not, we will not be able to determine the key and the user will need to import the connector
+   * manually.
+   *

+   * <p>This is pretty hacky, and we need to figure out a better long-term way to determine the
+   * key column from a connector.

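For a concrete picture of what the heuristic above accepts, this is roughly the shape of config it is aimed at, adapted from shouldCreateJdbcConnectorWithValidConfigsAndSMT in ConnectorsTest below (the field values are examples):

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

final class SmtConfigExample {

  // Given this config, extractKeyNameFromSMT scans the "transforms" chain, finds
  // the ExtractField$Key entry, and returns its "field" value ("key"). Without
  // that transform the key is absent and the source is created without a KEY.
  static final Map<String, String> JDBC_WITH_KEY = ImmutableMap.<String, String>builder()
      .put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector")
      .put("name", "foo")
      .put("topic.prefix", "foo-")
      .put("transforms", "foobar,createKey")
      .put("transforms.createKey.type",
          "org.apache.kafka.connect.transforms.ExtractField$Key")
      .put("transforms.createKey.field", "key")
      .build();

  private SmtConfigExample() {
  }
}
```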
+   */
+  private static Optional<String> extractKeyNameFromSMT(final Map<String, String> properties) {
+    final String transformsString = properties.get("transforms");
+    if (transformsString == null) {
+      return Optional.empty();
+    }
+
+    final List<String> transforms = Splitter.on(',').splitToList(transformsString);
+    for (String transform : transforms) {
+      final String transformType = properties.get("transforms." + transform + ".type");
+      if (transformType != null && transformType.contains("ExtractField$Key")) {
+        return Optional.ofNullable(properties.get("transforms." + transform + ".field"));
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  private static String clean(final String name) {
+    return name.replace('-', '_').replace('.', '_');
+  }
+
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/KsqlConnect.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/KsqlConnect.java
new file mode 100644
index 000000000000..d03b01514f73
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/KsqlConnect.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.parser.tree.CreateSource;
+import io.confluent.ksql.util.KsqlConfig;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple wrapper around {@link ConnectPollingService} and {@link ConnectConfigService}
+ * to make lifecycle management a little easier.
+ */
+public class KsqlConnect implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KsqlConnect.class);
+
+  private final ConnectPollingService connectPollingService;
+  private final ConnectConfigService configService;
+  private final boolean enabled;
+
+  public KsqlConnect(
+      final KsqlExecutionContext executionContext,
+      final KsqlConfig ksqlConfig,
+      final Consumer<CreateSource> sourceCallback
+  ) {
+    connectPollingService = new ConnectPollingService(executionContext, sourceCallback);
+    configService = new ConnectConfigService(
+        ksqlConfig, executionContext.getServiceContext().getConnectClient(), connectPollingService);
+    enabled = ksqlConfig.getBoolean(KsqlConfig.CONNECT_POLLING_ENABLE_PROPERTY);
+  }
+
+  @VisibleForTesting
+  KsqlConnect(
+      final ConnectPollingService connectPollingService,
+      final ConnectConfigService connectConfigService
+  ) {
+    this.connectPollingService = Objects
+        .requireNonNull(connectPollingService, "connectPollingService");
+    this.configService = Objects
+        .requireNonNull(connectConfigService, "connectConfigService");
+    enabled = true;
+  }
+
+  /**
+   * Asynchronously starts the KSQL-Connect integration components - does not
+   * wait for them to start up before returning.
+   */
+  public void startAsync() {
+    if (enabled) {
+      connectPollingService.startAsync();
+      configService.startAsync();
+    } else {
+      LOG.info("Connect integration is disabled, turn on by setting "
+          + KsqlConfig.CONNECT_POLLING_ENABLE_PROPERTY);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (enabled) {
+      configService.stopAsync().awaitTerminated();
+      connectPollingService.stopAsync().awaitTerminated();
+    }
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/TopicSchemaSupplier.java b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/TopicSchemaSupplier.java
index e7d509c83ba4..84551a6276b0 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/TopicSchemaSupplier.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/TopicSchemaSupplier.java
@@ -69,5 +69,13 @@ static SchemaResult success(final SchemaAndId schemaAndId) {
     static SchemaResult failure(final Exception cause) {
       return new SchemaResult(Optional.empty(), Optional.of(cause));
     }
+
+    public Optional<SchemaAndId> getSchemaAndId() {
+      return schemaAndId;
+    }
+
+    public Optional<Exception> getFailureReason() {
+      return failureReason;
+    }
   }
 }
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java
index 5e99970b5f55..5eaddbe77a6c 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java
@@ -16,6 +16,7 @@
 package io.confluent.ksql.services;
 
 import io.confluent.ksql.util.KsqlPreconditions;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -26,6 +27,20 @@
  */
 public interface ConnectClient {
 
+  /**
+   * Lists all of the connectors available in this connect cluster.
+   *
+   * @return a list of connector names
+   */
+  ConnectResponse<List<String>> connectors();
+
+  /**
+   * Gets the configuration for the specified connector.
+   *
+   * @param connector the name of the connector
+   */
+  ConnectResponse<ConnectorInfo> describe(String connector);
+
   /**
    * Creates a connector with {@code connector} as the name under the
    * specified configuration.
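Taken together, the two new interface methods give the config service the poll loop it needs. A hedged usage sketch, not part of the patch (error handling elided, names illustrative):

```java
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

final class ConnectClientSketch {

  // List every connector and print its implementation class, mirroring how
  // ConnectConfigService pairs connectors() with describe().
  static void printConnectorClasses(final ConnectClient client) {
    final ConnectResponse<List<String>> names = client.connectors();
    for (final String name : names.datum().orElse(Collections.emptyList())) {
      final ConnectResponse<ConnectorInfo> info = client.describe(name);
      info.datum().ifPresent(i ->
          System.out.println(name + ": " + i.config().get("connector.class")));
    }
  }

  private ConnectClientSketch() {
  }
}
```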
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java
index 6375da05f52b..62cac95b186d 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java
@@ -22,8 +22,10 @@
 import io.confluent.ksql.util.KsqlServerException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.fluent.Request;
@@ -82,7 +84,8 @@ public ConnectResponse<ConnectorInfo> create(
               ContentType.APPLICATION_JSON
           )
           .execute()
-          .handleResponse(createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class));
+          .handleResponse(
+              createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class, Function.identity()));
 
       connectResponse.error()
           .ifPresent(error -> LOG.warn("Did not CREATE connector {}: {}", connector, error));
@@ -93,9 +96,56 @@ public ConnectResponse<ConnectorInfo> create(
     }
   }
 
-  private static <T> ResponseHandler<ConnectResponse<T>> createHandler(
+  @SuppressWarnings("unchecked")
+  @Override
+  public ConnectResponse<List<String>> connectors() {
+    try {
+      LOG.debug("Issuing request to Kafka Connect at URI {} to list connectors", connectUri);
+
+      final ConnectResponse<List<String>> connectResponse = Request
+          .Get(connectUri.resolve(CONNECTORS))
+          .socketTimeout(DEFAULT_TIMEOUT_MS)
+          .connectTimeout(DEFAULT_TIMEOUT_MS)
+          .execute()
+          .handleResponse(
+              createHandler(HttpStatus.SC_OK, List.class, foo -> (List<String>) foo));
+
+      connectResponse.error()
+          .ifPresent(error -> LOG.warn("Could not list connectors: {}.", error));
+
+      return connectResponse;
+    } catch (final Exception e) {
+      throw new KsqlServerException(e);
+    }
+  }
+
+  @Override
+  public ConnectResponse<ConnectorInfo> describe(final String connector) {
+    try {
+      LOG.debug("Issuing request to Kafka Connect at URI {} to get config for {}",
+          connectUri, connector);
+
+      final ConnectResponse<ConnectorInfo> connectResponse = Request
+          .Get(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector)))
+          .socketTimeout(DEFAULT_TIMEOUT_MS)
+          .connectTimeout(DEFAULT_TIMEOUT_MS)
+          .execute()
+          .handleResponse(
+              createHandler(HttpStatus.SC_OK, ConnectorInfo.class, Function.identity()));
+
+      connectResponse.error()
+          .ifPresent(error -> LOG.warn("Could not describe connector {}: {}.", connector, error));
+
+      return connectResponse;
+    } catch (final Exception e) {
+      throw new KsqlServerException(e);
+    }
+  }
+
+  private static <T, C> ResponseHandler<ConnectResponse<T>> createHandler(
       final int expectedStatus,
-      final Class<T> entityClass
+      final Class<C> entityClass,
+      final Function<C, T> cast
   ) {
     return httpResponse -> {
       if (httpResponse.getStatusLine().getStatusCode() != expectedStatus) {
@@ -103,9 +153,9 @@ private static <T, C> ResponseHandler<ConnectResponse<T>> createHandler(
         return ConnectResponse.of(entity);
       }
 
-      final T info = MAPPER.readValue(
+      final T info = cast.apply(MAPPER.readValue(
           httpResponse.getEntity().getContent(),
-          entityClass);
+          entityClass));
 
       return ConnectResponse.of(info);
     };
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java
index 4e6d99763640..9fb9c92c6082 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java
@@ -17,6 +17,7 @@ import static
io.confluent.ksql.util.LimitedProxyBuilder.methodParams; +import com.google.common.collect.ImmutableList; import io.confluent.ksql.services.ConnectClient.ConnectResponse; import io.confluent.ksql.util.LimitedProxyBuilder; import java.util.Map; @@ -32,6 +33,8 @@ private SandboxConnectClient() { } public static ConnectClient createProxy() { return LimitedProxyBuilder.forClass(ConnectClient.class) .swallow("create", methodParams(String.class, Map.class), ConnectResponse.of("sandbox")) + .swallow("describe", methodParams(String.class), ConnectResponse.of(ImmutableList.of())) + .swallow("connectors", methodParams(), ConnectResponse.of(ImmutableList.of())) .build(); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java new file mode 100644 index 000000000000..ec6b32e716c6 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java @@ -0,0 +1,275 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.connect; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyFloat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.services.ConnectClient; +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlServerException; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.internal.verification.NoMoreInteractions; +import org.mockito.internal.verification.Times; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.OngoingStubbing; + +@RunWith(MockitoJUnitRunner.class) +public class ConnectConfigServiceTest { + + @Rule + public final Timeout timeout = Timeout.seconds(30); + + @Mock + private KafkaConsumer consumer; + @Mock + private ConnectPollingService pollingService; + @Mock + private ConnectClient connectClient; + @Mock + private Connector connector; + @Mock 
+ private Function> connectorFactory; + + private ConnectConfigService configService; + + @Before + public void setUp() { + when(connectorFactory.apply(any())).thenReturn(Optional.of(connector)); + } + + @Test + public void shouldCreateConnectorFromConfig() throws InterruptedException { + // Given: + givenConnectors("connector"); + givenNoMoreRecords( + givenConnectorRecord( + when(consumer.poll(any()))) + ); + + final CountDownLatch awaitIteration = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + awaitIteration.countDown(); + return null; + }).when(pollingService).addConnector(connector); + setupConfigService(); + + // When: + configService.startAsync().awaitRunning(); + + // Then: + awaitIteration.await(); + verify(pollingService).addConnector(connector); + configService.stopAsync().awaitTerminated(); + } + + @Test + public void shouldWakeupConsumerBeforeShuttingDown() { + // Given: + setupConfigService(); + givenNoMoreRecords(when(consumer.poll(any()))); + configService.startAsync().awaitRunning(); + + // When: + configService.stopAsync().awaitTerminated(); + + // Then: + final InOrder inOrder = inOrder(consumer); + inOrder.verify(consumer).poll(any()); + inOrder.verify(consumer).wakeup(); + inOrder.verify(consumer).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void shouldNotDescribeSameConnectorTwice() throws InterruptedException { + // Given: + givenConnectors("connector"); + + final CountDownLatch noMoreLatch = new CountDownLatch(1); + givenNoMoreRecords( + givenConnectorRecord( + givenConnectorRecord( + when(consumer.poll(any()))) + ), + noMoreLatch + ); + setupConfigService(); + + // When: + configService.startAsync().awaitRunning(); + noMoreLatch.await(); + + // Then: + final InOrder inOrder = inOrder(consumer, connectClient); + inOrder.verify(consumer).poll(any()); + inOrder.verify(connectClient).connectors(); + inOrder.verify(connectClient).describe("connector"); + inOrder.verify(consumer).poll(any()); + inOrder.verify(connectClient).connectors(); + inOrder.verify(consumer).poll(any()); + // note no more calls to describe + inOrder.verifyNoMoreInteractions(); + + configService.stopAsync().awaitTerminated(); + } + + @Test + public void shouldIgnoreUnsupportedConnectors() throws InterruptedException { + // Given: + final CountDownLatch noMoreLatch = new CountDownLatch(1); + givenNoMoreRecords( + givenConnectorRecord( + when(consumer.poll(any()))), + noMoreLatch + ); + givenConnectors("foo"); + when(connectorFactory.apply(any())).thenReturn(Optional.empty()); + setupConfigService(); + + // When: + configService.startAsync().awaitRunning(); + noMoreLatch.await(); + + // Then: + verify(pollingService, new NoMoreInteractions()).addConnector(any()); + configService.stopAsync().awaitTerminated(); + } + + @Test + public void shouldIgnoreConnectClientFailureToList() throws InterruptedException { + // Given: + final CountDownLatch noMoreLatch = new CountDownLatch(1); + givenNoMoreRecords( + givenConnectorRecord( + when(consumer.poll(any()))), + noMoreLatch + ); + when(connectClient.connectors()).thenThrow(new KsqlServerException("fail!")); + setupConfigService(); + + // When: + configService.startAsync().awaitRunning(); + noMoreLatch.await(); + + // Then: + verify(connectClient, new Times(1)).connectors(); + verify(connectClient, new NoMoreInteractions()).describe(any()); + // poll again even though error was thrown + verify(consumer, new Times(2)).poll(any()); + configService.stopAsync().awaitTerminated(); + } + + @Test + public void 
shouldIgnoreConnectClientFailureToDescribe() throws InterruptedException { + // Given: + givenConnectors("connector", "connector2"); + when(connectClient.describe("connector")).thenThrow(new KsqlServerException("fail!")); + + givenNoMoreRecords( + givenConnectorRecord( + when(consumer.poll(any()))) + ); + + final CountDownLatch awaitIteration = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + awaitIteration.countDown(); + return null; + }).when(pollingService).addConnector(connector); + setupConfigService(); + + // When: + configService.startAsync().awaitRunning(); + + // Then: + awaitIteration.await(); + verify(connectorFactory, new Times(1)).apply(any()); + verify(pollingService).addConnector(connector); + configService.stopAsync().awaitTerminated(); + } + + private void givenConnectors(final String... names){ + for (final String name : names) { + when(connectClient.describe(name)).thenReturn(ConnectResponse.of(new ConnectorInfo( + name, + ImmutableMap.of(), + ImmutableList.of(), + ConnectorType.SOURCE + ))); + } + when(connectClient.connectors()).thenReturn(ConnectResponse.of(ImmutableList.copyOf(names))); + } + + private OngoingStubbing givenConnectorRecord(OngoingStubbing stubbing) { + return stubbing + .thenAnswer(inv -> new ConsumerRecords<>( + ImmutableMap.of( + new TopicPartition("topic", 0), + ImmutableList.of(new ConsumerRecord<>("topic", 0, 0L, "connector", new byte[]{}))))); + } + + private void givenNoMoreRecords(final OngoingStubbing stubbing) { + givenNoMoreRecords(stubbing, new CountDownLatch(1)); + } + + private void givenNoMoreRecords(final OngoingStubbing stubbing, final CountDownLatch noMoreLatch) { + final CountDownLatch awaitWakeup = new CountDownLatch(1); + stubbing.thenAnswer(invocationOnMock -> { + noMoreLatch.countDown(); + awaitWakeup.await(); + throw new WakeupException(); + }); + + doAnswer(invocation -> { + awaitWakeup.countDown(); + return null; + }).when(consumer).wakeup(); + } + + private void setupConfigService() { + configService = new ConnectConfigService( + new KsqlConfig(ImmutableMap.of()), + connectClient, + pollingService, + connectorFactory, + props -> consumer); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java new file mode 100644 index 000000000000..5e93bacb2821 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java @@ -0,0 +1,199 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.connect; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.metastore.MutableMetaStore; +import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.metastore.model.DataSource.DataSourceType; +import io.confluent.ksql.parser.SqlFormatter; +import io.confluent.ksql.parser.tree.CreateSource; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.MetaStoreFixture; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ConnectPollingServiceTest { + + private final Node node = new Node(1, "localhost", 1234); + + @Mock + private FunctionRegistry functionRegistry; + @Mock + private KsqlExecutionContext executionContext; + @Mock + private ServiceContext serviceContext; + @Mock + private SchemaRegistryClient schemaRegistryClient; + + private MockAdminClient adminClient; + private ConnectPollingService pollingService; + private Set subjects; + private MutableMetaStore metaStore; + + @Before + public void setUp() throws Exception { + adminClient = new MockAdminClient(Collections.singletonList(node), node); + subjects = new HashSet<>(); + metaStore = MetaStoreFixture.getNewMetaStore(functionRegistry); + + when(executionContext.getMetaStore()).thenReturn(metaStore); + when(executionContext.getServiceContext()).thenReturn(serviceContext); + + when(serviceContext.getAdminClient()).thenReturn(adminClient); + when(serviceContext.getSchemaRegistryClient()).thenReturn(schemaRegistryClient); + + when(schemaRegistryClient.getAllSubjects()).thenReturn(subjects); + } + + @Test + public void shouldCreateSourceFromConnector() { + // Given: + final CreateSource[] ref = new CreateSource[]{null}; + givenTopic("foo"); + givenSubject("foo"); + givenPollingService(cs -> ref[0] = cs); + givenConnector("foo"); + + // When: + pollingService.runOneIteration(); + + // Then: + assertThat( + SqlFormatter.formatSql(ref[0]), + is("CREATE TABLE IF NOT EXISTS FOO " + + "WITH (" + + "KAFKA_TOPIC='foo', " + + "KEY='key', " + + "SOURCE_CONNECTOR='connector', " + + "VALUE_FORMAT='AVRO');")); + } + + @Test + public void shouldNotCreateSourceFromConnectorWithoutTopicMatch() { + // Given: + final CreateSource[] ref = new CreateSource[]{null}; + givenTopic("foo"); + givenSubject("foo"); + givenPollingService(cs -> ref[0] = cs); + givenConnector("bar"); + + // When: + pollingService.runOneIteration(); + + // Then: + assertThat(ref[0], nullValue()); + } + + @Test + public void shouldNotCreateSourceFromConnectorWithoutSubjectMatch() { + // Given: + final 
CreateSource[] ref = new CreateSource[]{null}; + givenTopic("foo"); + givenPollingService(cs -> ref[0] = cs); + givenConnector("foo"); + + // When: + pollingService.runOneIteration(); + + // Then: + assertThat(ref[0], nullValue()); + } + + @Test + public void shouldNotCreateSourceForAlreadyRegisteredSource() { + // Given: + final CreateSource[] ref = new CreateSource[]{null}; + givenTopic("foo"); + givenSubject("foo"); + givenPollingService(cs -> ref[0] = cs); + givenConnector("foo"); + + final DataSource source = mock(DataSource.class); + when(source.getName()).thenReturn("FOO"); + metaStore.putSource(source); + + // When: + pollingService.runOneIteration(); + + // Then: + assertThat(ref[0], nullValue()); + } + + @Test + public void shouldNotPollIfNoRegisteredConnectors() { + givenPollingService(foo -> {}); + + // When: + pollingService.runOneIteration(); + + // Then: + verifyZeroInteractions(serviceContext); + } + + private void givenTopic(final String topicName) { + adminClient.addTopic( + false, + topicName, + ImmutableList.of( + new TopicPartitionInfo(0, node, ImmutableList.of(), ImmutableList.of())), + ImmutableMap.of()); + } + + private void givenPollingService(final Consumer callback) { + pollingService = new ConnectPollingService(executionContext, callback); + } + + private void givenConnector(final String topic) { + pollingService.addConnector( + new Connector( + "connector", + foo -> Objects.equals(foo, topic), + foo -> topic, + DataSourceType.KTABLE, + "key" + )); + } + + private void givenSubject(final String topicName) { + subjects.add(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorTest.java new file mode 100644 index 000000000000..8d1095833c7b --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.connect; + +import com.google.common.testing.EqualsTester; +import io.confluent.ksql.metastore.model.DataSource.DataSourceType; +import org.junit.Test; + +public class ConnectorTest { + + @Test + public void shouldImplementHashAndEquals() { + new EqualsTester() + .addEqualityGroup( + new Connector("foo", foo -> true, foo -> foo, DataSourceType.KTABLE, "key"), + new Connector("foo", foo -> false, foo -> foo, DataSourceType.KTABLE, "key"), + new Connector("foo", foo -> false, foo -> foo, DataSourceType.KTABLE, "key") + ).addEqualityGroup( + new Connector("bar", foo -> true, foo -> foo, DataSourceType.KTABLE, "key") + ).addEqualityGroup( + new Connector("foo", foo -> true, foo -> foo, DataSourceType.KTABLE, "key2") + ).addEqualityGroup( + new Connector("foo", foo -> true, foo -> foo, DataSourceType.KSTREAM, "key") + ) + .testEquals(); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorsTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorsTest.java new file mode 100644 index 000000000000..35ddcf05485a --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectorsTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.connect; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.metastore.model.DataSource.DataSourceType; +import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers; +import java.util.Map; +import java.util.Optional; +import org.junit.Test; + +public class ConnectorsTest { + + @Test + public void shouldNotCreateConnectorForUnknown() { + // Given: + final Map config = ImmutableMap.of( + Connectors.CONNECTOR_CLASS, "foobar" + ); + + // When: + final Optional maybeConnector = Connectors.fromConnectInfo(config); + + // Then: + assertThat("expected no connector", !maybeConnector.isPresent()); + } + + @Test + public void shouldCreateJdbcConnectorWithValidConfigs() { + // Given: + final Map config = ImmutableMap.of( + Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS, + "name", "foo" + ); + + // When: + final Optional maybeConnector = Connectors.fromConnectInfo(config); + + // Then: + final Connector expected = new Connector( + "foo", + foo -> true, + foo -> foo, + DataSourceType.KTABLE, + null); + assertThat(maybeConnector, OptionalMatchers.of(is(expected))); + } + + @Test + public void shouldCreateJdbcConnectorWithValidPrefixTest() { + // Given: + final Map config = ImmutableMap.of( + Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS, + "name", "foo", + "topic.prefix", "foo-" + ); + + // When: + final Optional maybeConnector = Connectors.fromConnectInfo(config); + + // Then: + assertThat( + "expected match", + maybeConnector.map(connector -> connector.matches("foo-bar")).orElse(false)); + } + + @Test + public void shouldCreateJdbcConnectorWithValidMapToSource() { + // Given: + final Map config = ImmutableMap.of( + Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS, + "name", "name", + "topic.prefix", "foo-" + ); + + // When: + final Optional maybeConnector = Connectors.fromConnectInfo(config); + + // Then: + assertThat( + maybeConnector.map(connector -> connector.mapToSource("foo-bar")).orElse(null), + is("name_bar")); + } + + @Test + public void shouldCreateJdbcConnectorWithValidConfigsAndSMT() { + // Given: + final Map config = ImmutableMap.of( + Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS, + "name", "foo", + "transforms", "foobar,createKey", + "transforms.createKey.type", "org.apache.kafka.connect.transforms.ExtractField$Key", + "transforms.createKey.field", "key" + ); + + // When: + final Optional maybeConnector = Connectors.fromConnectInfo(config); + + // Then: + final Connector expected = new Connector( + "foo", + foo -> true, + foo -> foo, + DataSourceType.KTABLE, + "key"); + assertThat(maybeConnector, OptionalMatchers.of(is(expected))); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/KsqlConnectTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/KsqlConnectTest.java new file mode 100644 index 000000000000..e278a1aec913 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/KsqlConnectTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.connect; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KsqlConnectTest { + + @Mock + private ConnectPollingService pollingService; + @Mock + private ConnectConfigService configService; + + private KsqlConnect ksqlConnect; + + @Before + public void setUp() { + ksqlConnect = new KsqlConnect(pollingService, configService); + + when(pollingService.startAsync()).thenReturn(pollingService); + when(pollingService.stopAsync()).thenReturn(pollingService); + + when(configService.startAsync()).thenReturn(configService); + when(configService.stopAsync()).thenReturn(configService); + } + + @Test + public void shouldStartBothSubServicesAsynchronously() { + // When: + ksqlConnect.startAsync(); + + // Then: + verify(pollingService).startAsync(); + verify(configService).startAsync(); + + verifyNoMoreInteractions(pollingService, configService); + } + + @Test + public void shouldStopBothSubServicesSynchronously() { + // When: + ksqlConnect.close(); + + // Then: + verify(configService).stopAsync(); + verify(pollingService).stopAsync(); + verify(configService).awaitTerminated(); + verify(pollingService).awaitTerminated(); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java index 48f46321cb10..d95135608a2b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java @@ -28,6 +28,7 @@ import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers; import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import java.util.List; import org.apache.http.HttpStatus; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; @@ -95,4 +96,40 @@ public void testCreateWithError() throws JsonProcessingException { assertThat(response.error(), OptionalMatchers.of(is("Oh no!"))); } + @Test + public void testList() throws JsonProcessingException { + // Given: + WireMock.stubFor( + WireMock.get(WireMock.urlEqualTo("/connectors")) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withBody(MAPPER.writeValueAsString(ImmutableList.of("one", "two")))) + ); + + // When: + final ConnectResponse> response = client.connectors(); + + // Then: + assertThat(response.datum(), OptionalMatchers.of(is(ImmutableList.of("one", "two")))); + assertThat("Expected no error!", !response.error().isPresent()); + } + + @Test + public void testDescribe() throws JsonProcessingException { + // Given: + WireMock.stubFor( + 
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
index 48f46321cb10..d95135608a2b 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
@@ -28,6 +28,7 @@
 import io.confluent.ksql.json.JsonMapper;
 import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers;
 import io.confluent.ksql.services.ConnectClient.ConnectResponse;
+import java.util.List;
 import org.apache.http.HttpStatus;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
@@ -95,4 +96,40 @@ public void testCreateWithError() throws JsonProcessingException {
     assertThat(response.error(), OptionalMatchers.of(is("Oh no!")));
   }
 
+  @Test
+  public void testList() throws JsonProcessingException {
+    // Given:
+    WireMock.stubFor(
+        WireMock.get(WireMock.urlEqualTo("/connectors"))
+            .willReturn(WireMock.aResponse()
+                .withStatus(HttpStatus.SC_OK)
+                .withBody(MAPPER.writeValueAsString(ImmutableList.of("one", "two"))))
+    );
+
+    // When:
+    final ConnectResponse<List<String>> response = client.connectors();
+
+    // Then:
+    assertThat(response.datum(), OptionalMatchers.of(is(ImmutableList.of("one", "two"))));
+    assertThat("Expected no error!", !response.error().isPresent());
+  }
+
+  @Test
+  public void testDescribe() throws JsonProcessingException {
+    // Given:
+    WireMock.stubFor(
+        WireMock.get(WireMock.urlEqualTo("/connectors/foo"))
+            .willReturn(WireMock.aResponse()
+                .withStatus(HttpStatus.SC_OK)
+                .withBody(MAPPER.writeValueAsString(SAMPLE_INFO)))
+    );
+
+    // When:
+    final ConnectResponse<ConnectorInfo> response = client.describe("foo");
+
+    // Then:
+    assertThat(response.datum(), OptionalMatchers.of(is(SAMPLE_INFO)));
+    assertThat("Expected no error!", !response.error().isPresent());
+  }
+
 }
\ No newline at end of file
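The two stubs above mirror standard Kafka Connect REST endpoints: GET /connectors returns a JSON array of connector names, and GET /connectors/{name} returns that connector's info. A bare-bones illustration of the HTTP calls the client wraps, assuming a Connect worker at http://localhost:8083 (a hypothetical helper, not the DefaultConnectClient implementation):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

public final class ConnectRestSketch {

  // Issues a GET against the assumed worker URL and returns the raw JSON body.
  static String get(final String path) throws Exception {
    final HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8083" + path).openConnection();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      return in.lines().collect(Collectors.joining("\n"));
    }
  }

  public static void main(final String[] args) throws Exception {
    System.out.println(get("/connectors"));     // e.g. ["one","two"]
    System.out.println(get("/connectors/foo")); // the ConnectorInfo JSON for "foo"
  }
}

The WireMock stubs in the tests stand in for exactly these responses, so the suite runs without a live Connect worker.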
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index f971e2c294e8..d6e176ca45aa 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -25,6 +25,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.confluent.ksql.ServiceInfo;
+import io.confluent.ksql.connect.KsqlConnect;
 import io.confluent.ksql.engine.KsqlEngine;
 import io.confluent.ksql.engine.TopicAccessValidator;
 import io.confluent.ksql.engine.TopicAccessValidatorFactory;
@@ -36,7 +37,9 @@
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
 import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
 import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
+import io.confluent.ksql.parser.SqlFormatter;
 import io.confluent.ksql.rest.entity.KsqlErrorMessage;
+import io.confluent.ksql.rest.entity.KsqlRequest;
 import io.confluent.ksql.rest.server.computation.CommandQueue;
 import io.confluent.ksql.rest.server.computation.CommandRunner;
 import io.confluent.ksql.rest.server.computation.CommandStore;
@@ -127,6 +130,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple
   private final ServerState serverState;
   private final ProcessingLogContext processingLogContext;
   private final List<KsqlServerPrecondition> preconditions;
+  private final KsqlConnect ksqlConnect;
 
   public static String getCommandsStreamName() {
     return COMMANDS_STREAM_NAME;
@@ -150,7 +154,8 @@ public static String getCommandsStreamName() {
       final KsqlSecurityExtension securityExtension,
       final ServerState serverState,
       final ProcessingLogContext processingLogContext,
-      final List<KsqlServerPrecondition> preconditions
+      final List<KsqlServerPrecondition> preconditions,
+      final KsqlConnect ksqlConnect
   ) {
     super(config);
     this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
@@ -175,6 +180,8 @@ public static String getCommandsStreamName() {
     this.securityExtension = Objects.requireNonNull(
         securityExtension, "securityExtension"
     );
+    this.ksqlConnect = Objects
+        .requireNonNull(ksqlConnect, "ksqlConnect");
   }
 
   @Override
@@ -193,6 +200,7 @@ public void start() throws Exception {
     super.start();
     startKsql();
     commandRunner.start();
+    ksqlConnect.startAsync();
     final Properties metricsProperties = new Properties();
     metricsProperties.putAll(getConfiguration().getOriginals());
     if (versionCheckerAgent != null) {
@@ -264,6 +272,12 @@ private void initialize() {
 
   @Override
   public void stop() {
+    try {
+      ksqlConnect.close();
+    } catch (final Exception e) {
+      log.error("Exception while waiting for ksqlConnect to close", e);
+    }
+
     try {
       ksqlEngine.close();
     } catch (final Exception e) {
@@ -500,6 +514,15 @@ static KsqlRestApplication buildApplication(
         KsqlServerPrecondition.class
     );
 
+    final KsqlConnect ksqlConnect = new KsqlConnect(
+        ksqlEngine,
+        ksqlConfig,
+        cs -> ksqlResource.handleKsqlStatements(
+            ksqlEngine.getServiceContext(),
+            new KsqlRequest(SqlFormatter.formatSql(cs), null, null)
+        )
+    );
+
     return new KsqlRestApplication(
         serviceContext,
         ksqlEngine,
@@ -516,7 +539,8 @@ static KsqlRestApplication buildApplication(
         securityExtension,
         serverState,
         processingLogContext,
-        preconditions
+        preconditions,
+        ksqlConnect
     );
   }
 
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java
index 8d9a505a292e..f34ceeeaa762 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java
@@ -32,6 +32,7 @@
 
 import com.google.common.collect.ImmutableList;
 import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.connect.KsqlConnect;
 import io.confluent.ksql.engine.KsqlEngine;
 import io.confluent.ksql.logging.processing.ProcessingLogConfig;
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
@@ -120,6 +121,8 @@ public class KsqlRestApplicationTest {
   @Mock
   private KsqlServerPrecondition precondition2;
   @Mock
+  private KsqlConnect ksqlConnect;
+  @Mock
   private ParsedStatement parsedStatement;
   @Mock
   private PreparedStatement preparedStatement;
@@ -173,8 +176,8 @@ public void setUp() {
         securityExtension,
         serverState,
         processingLogContext,
-        ImmutableList.of(precondition1, precondition2)
-    );
+        ImmutableList.of(precondition1, precondition2),
+        ksqlConnect);
   }
 
   @Test
@@ -195,6 +198,15 @@ public void shouldCloseSecurityExtensionOnClose() {
     verify(securityExtension).close();
   }
 
+  @Test
+  public void shouldCloseKsqlConnect() {
+    // When:
+    app.stop();
+
+    // Then:
+    verify(ksqlConnect).close();
+  }
+
   @Test
   public void shouldNotRegisterAuthorizationFilterWithoutAuthorizationProvider() {
     // Given: