Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add connect templates and simplify JDBC source (MINOR) #3231

Merged
merged 2 commits into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.MoreExecutors;
import io.confluent.ksql.connect.supported.Connectors;
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -83,7 +84,7 @@ final class ConnectConfigService extends AbstractExecutionThreadService {
ksqlConfig,
connectClient,
pollingService,
Connectors::fromConnectInfo,
Connectors::from,
KafkaConsumer::new
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class Connector {
private final DataSourceType sourceType;
private final Optional<String> keyField;

Connector(
public Connector(
final String name,
final Predicate<String> isTopicMatch,
final Function<String, String> getSourceName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect.supported;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.connect.Connector;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

public enum Connectors implements SupportedConnector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this implement SupportConnector?


JDBC(JdbcSource.JDBC_SOURCE_CLASS, new JdbcSource())
;

public static final String CONNECTOR_CLASS = "connector.class";

private static final Map<String, SupportedConnector> CONNECTORS = ImmutableMap.copyOf(
EnumSet.allOf(Connectors.class)
.stream()
.collect(Collectors.toMap(
Connectors::getConnectorClass,
Function.identity()))
);

private final String connectorClass;
private final SupportedConnector supportedConnector;

Connectors(final String connectorClass, final SupportedConnector supportedConnector) {
this.connectorClass = Objects.requireNonNull(connectorClass, "connectorClass");
this.supportedConnector = Objects.requireNonNull(supportedConnector, "supportedConnector");
}

public static Optional<Connector> from(final ConnectorInfo info) {
final SupportedConnector connector =
CONNECTORS.get(info.config().get(CONNECTOR_CLASS));
return connector == null ? Optional.empty() : connector.fromConnectInfo(info);
}

public static Map<String, String> resolve(final Map<String, String> configs) {
final SupportedConnector connector =
CONNECTORS.get(configs.get(CONNECTOR_CLASS));
return connector == null ? configs : connector.resolveConfigs(configs);

}

@Override
public Optional<Connector> fromConnectInfo(final ConnectorInfo info) {
return supportedConnector.fromConnectInfo(info);
}

@Override
public Map<String, String> resolveConfigs(final Map<String, String> configs) {
return supportedConnector.resolveConfigs(configs);
}

public String getConnectorClass() {
return connectorClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,65 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect;
package io.confluent.ksql.connect.supported;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import io.confluent.ksql.connect.Connector;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.ValueToKey;

public final class Connectors {
public final class JdbcSource implements SupportedConnector {

static final String CONNECTOR_CLASS = "connector.class";
static final String JDBC_SOURCE_CLASS = "io.confluent.connect.jdbc.JdbcSourceConnector";

private Connectors() {
@Override
public Optional<Connector> fromConnectInfo(final ConnectorInfo info) {
final Map<String, String> properties = info.config();
return fromConfigs(properties);
}

public static Optional<Connector> fromConnectInfo(final ConnectorInfo connectorInfo) {
return fromConnectInfo(connectorInfo.config());
}

@SuppressWarnings("SwitchStatementWithTooFewBranches") // will soon expand to more
static Optional<Connector> fromConnectInfo(final Map<String, String> properties) {
final String clazz = properties.get(CONNECTOR_CLASS);
if (clazz == null) {
return Optional.empty();
}

switch (clazz) {
case JDBC_SOURCE_CLASS:
return Optional.of(jdbc(properties));
default:
return Optional.empty();
}
}

private static Connector jdbc(final Map<String, String> properties) {
@VisibleForTesting
Optional<Connector> fromConfigs(final Map<String, String> properties) {
final String name = properties.get("name");
final String prefix = properties.get("topic.prefix");

return new Connector(
return Optional.of(new Connector(
name,
topic -> topic.startsWith(prefix),
topic -> clean(name + "_" + topic.substring(prefix.length())),
DataSourceType.KTABLE,
extractKeyNameFromSMT(properties).orElse(null)
);
));
}

@Override
public Map<String, String> resolveConfigs(final Map<String, String> configs) {
final Map<String, String> resolved = new HashMap<>(configs);
final String key = resolved.remove("key");
if (key != null) {
resolved.merge(
"transforms",
"ksqlCreateKey,ksqlExtractString",
(a, b) -> String.join(",", a, b));

resolved.put("transforms.ksqlCreateKey.type", ValueToKey.class.getName());
resolved.put("transforms.ksqlCreateKey.fields", key);

resolved.put("transforms.ksqlExtractString.type", ExtractField.Key.class.getName());
resolved.put("transforms.ksqlExtractString.field", key);
}

resolved.putIfAbsent("key.converter", StringConverter.class.getName());
resolved.putIfAbsent("tasks.max", "1");
return resolved;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect.supported;

import io.confluent.ksql.connect.Connector;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

/**
* KSQL supports some "Blessed" connectors that we integrate well with. To be "blessed"
* means that we can automatically import topics created by these connectors and that
* we may provide templates that simplify configuration of these connectors.
*/
public interface SupportedConnector {

/**
* Constructs a {@link Connector} from the configuration given to us by connect.
*/
Optional<Connector> fromConnectInfo(ConnectorInfo info);

/**
* Resolves a template configuration into something that connect can understand.
*/
Map<String, String> resolveConfigs(Map<String, String> configs);

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,33 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect;
package io.confluent.ksql.connect.supported;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.connect.Connector;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers;
import java.util.Map;
import java.util.Optional;
import org.junit.Test;

public class ConnectorsTest {
public class JdbcSourceTest {

@Test
public void shouldNotCreateConnectorForUnknown() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, "foobar"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);

// Then:
assertThat("expected no connector", !maybeConnector.isPresent());
}
private final JdbcSource jdbcSource = new JdbcSource();

@Test
public void shouldCreateJdbcConnectorWithValidConfigs() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "foo"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
final Connector expected = new Connector(
Expand All @@ -66,13 +55,13 @@ public void shouldCreateJdbcConnectorWithValidConfigs() {
public void shouldCreateJdbcConnectorWithValidPrefixTest() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "foo",
"topic.prefix", "foo-"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
assertThat(
Expand All @@ -84,13 +73,13 @@ public void shouldCreateJdbcConnectorWithValidPrefixTest() {
public void shouldCreateJdbcConnectorWithValidMapToSource() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "name",
"topic.prefix", "foo-"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
assertThat(
Expand All @@ -102,15 +91,15 @@ public void shouldCreateJdbcConnectorWithValidMapToSource() {
public void shouldCreateJdbcConnectorWithValidConfigsAndSMT() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "foo",
"transforms", "foobar,createKey",
"transforms.createKey.type", "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.createKey.field", "key"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
final Connector expected = new Connector(
Expand All @@ -122,4 +111,31 @@ public void shouldCreateJdbcConnectorWithValidConfigsAndSMT() {
assertThat(maybeConnector, OptionalMatchers.of(is(expected)));
}

@Test
public void shouldResolveJdbcSourceConfigsTemplate() {
// Given:
final Map<String, String> originals = ImmutableMap.<String, String>builder()
.put(Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS)
.put("transforms", "foo")
.put("key", "id")
.build();

// When:
final Map<String, String> resolved = jdbcSource.resolveConfigs(originals);

// Then:
assertThat(
resolved,
is(ImmutableMap.<String, String>builder()
.put(Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS)
.put("transforms", "foo,ksqlCreateKey,ksqlExtractString")
.put("transforms.ksqlCreateKey.type", "org.apache.kafka.connect.transforms.ValueToKey")
.put("transforms.ksqlCreateKey.fields", "id")
.put("transforms.ksqlExtractString.type", "org.apache.kafka.connect.transforms.ExtractField$Key")
.put("transforms.ksqlExtractString.field", "id")
.put("key.converter", "org.apache.kafka.connect.storage.StringConverter")
.put("tasks.max", "1")
.build()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.Maps;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.connect.supported.Connectors;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
Expand All @@ -42,7 +43,10 @@ public static Optional<KsqlEntity> execute(

final ConnectResponse<ConnectorInfo> response = client.create(
createConnector.getName(),
Maps.transformValues(createConnector.getConfig(), l -> l.getValue().toString()));
Connectors.resolve(
Maps.transformValues(
createConnector.getConfig(),
l -> l != null ? l.getValue().toString() : null)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this ever be null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never... it's just that intelliJ complains when I don't check for it because transformValues has an annotation (@NullableDecl)


if (response.datum().isPresent()) {
return Optional.of(
Expand Down
Loading