Skip to content

Commit

Permalink
feat: add connect templates and simplify JDBC source (MINOR) (#3231)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Aug 19, 2019
1 parent bc1a2f8 commit ba0fb99
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.MoreExecutors;
import io.confluent.ksql.connect.supported.Connectors;
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -83,7 +84,7 @@ final class ConnectConfigService extends AbstractExecutionThreadService {
ksqlConfig,
connectClient,
pollingService,
Connectors::fromConnectInfo,
Connectors::from,
KafkaConsumer::new
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class Connector {
private final DataSourceType sourceType;
private final Optional<String> keyField;

Connector(
public Connector(
final String name,
final Predicate<String> isTopicMatch,
final Function<String, String> getSourceName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2019 Confluent Inc.
*
 * Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect.supported;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.connect.Connector;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

/**
 * Enumerates the connector types that KSQL explicitly supports, keyed by their
 * connect {@code connector.class} value. Each constant delegates to the
 * {@link SupportedConnector} implementation that understands that connector
 * type's configuration.
 */
public enum Connectors implements SupportedConnector {

  JDBC(JdbcSource.JDBC_SOURCE_CLASS, new JdbcSource())
  ;

  /** Connect configuration key that holds the connector's implementation class. */
  public static final String CONNECTOR_CLASS = "connector.class";

  // Lookup table from connector class name to its supporting implementation.
  private static final Map<String, SupportedConnector> CONNECTORS = ImmutableMap.copyOf(
      EnumSet.allOf(Connectors.class)
          .stream()
          .collect(Collectors.toMap(Connectors::getConnectorClass, c -> c))
  );

  private final String connectorClass;
  private final SupportedConnector supportedConnector;

  Connectors(final String connectorClass, final SupportedConnector supportedConnector) {
    this.connectorClass = Objects.requireNonNull(connectorClass, "connectorClass");
    this.supportedConnector = Objects.requireNonNull(supportedConnector, "supportedConnector");
  }

  /**
   * Attempts to build a {@link Connector} from connect's description of a
   * running connector.
   *
   * @param info the connector description returned by the connect REST API
   * @return the connector, or empty if the connector class is not supported
   */
  public static Optional<Connector> from(final ConnectorInfo info) {
    return Optional.ofNullable(CONNECTORS.get(info.config().get(CONNECTOR_CLASS)))
        .flatMap(supported -> supported.fromConnectInfo(info));
  }

  /**
   * Expands any template configuration for a supported connector class;
   * unsupported classes get their configs back untouched.
   *
   * @param configs the user-supplied (possibly templated) configs
   * @return the resolved configs to forward to connect
   */
  public static Map<String, String> resolve(final Map<String, String> configs) {
    final SupportedConnector supported = CONNECTORS.get(configs.get(CONNECTOR_CLASS));
    if (supported == null) {
      return configs;
    }
    return supported.resolveConfigs(configs);
  }

  @Override
  public Optional<Connector> fromConnectInfo(final ConnectorInfo info) {
    return supportedConnector.fromConnectInfo(info);
  }

  @Override
  public Map<String, String> resolveConfigs(final Map<String, String> configs) {
    return supportedConnector.resolveConfigs(configs);
  }

  public String getConnectorClass() {
    return connectorClass;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,65 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect;
package io.confluent.ksql.connect.supported;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import io.confluent.ksql.connect.Connector;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.ValueToKey;

public final class Connectors {
public final class JdbcSource implements SupportedConnector {

static final String CONNECTOR_CLASS = "connector.class";
static final String JDBC_SOURCE_CLASS = "io.confluent.connect.jdbc.JdbcSourceConnector";

private Connectors() {
@Override
public Optional<Connector> fromConnectInfo(final ConnectorInfo info) {
  // ConnectorInfo is just a carrier for its config map here.
  return fromConfigs(info.config());
}

public static Optional<Connector> fromConnectInfo(final ConnectorInfo connectorInfo) {
return fromConnectInfo(connectorInfo.config());
}

@SuppressWarnings("SwitchStatementWithTooFewBranches") // will soon expand to more
static Optional<Connector> fromConnectInfo(final Map<String, String> properties) {
final String clazz = properties.get(CONNECTOR_CLASS);
if (clazz == null) {
return Optional.empty();
}

switch (clazz) {
case JDBC_SOURCE_CLASS:
return Optional.of(jdbc(properties));
default:
return Optional.empty();
}
}

private static Connector jdbc(final Map<String, String> properties) {
@VisibleForTesting
Optional<Connector> fromConfigs(final Map<String, String> properties) {
final String name = properties.get("name");
final String prefix = properties.get("topic.prefix");

return new Connector(
return Optional.of(new Connector(
name,
topic -> topic.startsWith(prefix),
topic -> clean(name + "_" + topic.substring(prefix.length())),
DataSourceType.KTABLE,
extractKeyNameFromSMT(properties).orElse(null)
);
));
}

/**
 * Expands the ksql JDBC template: a {@code key} entry is rewritten into the
 * pair of SMTs that copy that field into the record key, and sensible
 * defaults are supplied for the key converter and task count.
 */
@Override
public Map<String, String> resolveConfigs(final Map<String, String> configs) {
  // Work on a mutable copy so the caller's map is never modified.
  final Map<String, String> resolved = new HashMap<>(configs);

  final String keyField = resolved.remove("key");
  if (keyField != null) {
    // Append the ksql-managed SMTs after any user-supplied transforms.
    resolved.merge(
        "transforms",
        "ksqlCreateKey,ksqlExtractString",
        (existing, added) -> existing + "," + added);

    resolved.put("transforms.ksqlCreateKey.type", ValueToKey.class.getName());
    resolved.put("transforms.ksqlCreateKey.fields", keyField);
    resolved.put("transforms.ksqlExtractString.type", ExtractField.Key.class.getName());
    resolved.put("transforms.ksqlExtractString.field", keyField);
  }

  // Defaults only apply when the user has not set them explicitly.
  resolved.putIfAbsent("key.converter", StringConverter.class.getName());
  resolved.putIfAbsent("tasks.max", "1");
  return resolved;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2019 Confluent Inc.
*
 * Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect.supported;

import io.confluent.ksql.connect.Connector;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

/**
* KSQL supports some "Blessed" connectors that we integrate well with. To be "blessed"
* means that we can automatically import topics created by these connectors and that
* we may provide templates that simplify configuration of these connectors.
*/
public interface SupportedConnector {

  /**
   * Constructs a {@link Connector} from the configuration given to us by connect.
   *
   * @param info the connector description returned by the connect REST API
   * @return a {@code Connector} if the configuration could be interpreted,
   *         otherwise {@link java.util.Optional#empty()}
   */
  Optional<Connector> fromConnectInfo(ConnectorInfo info);

  /**
   * Resolves a template configuration into something that connect can understand.
   *
   * @param configs the raw (possibly templated) configuration supplied by the user
   * @return the resolved configuration to forward to connect
   */
  Map<String, String> resolveConfigs(Map<String, String> configs);

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,33 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.connect;
package io.confluent.ksql.connect.supported;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.connect.Connector;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers;
import java.util.Map;
import java.util.Optional;
import org.junit.Test;

public class ConnectorsTest {
public class JdbcSourceTest {

@Test
public void shouldNotCreateConnectorForUnknown() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, "foobar"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);

// Then:
assertThat("expected no connector", !maybeConnector.isPresent());
}
private final JdbcSource jdbcSource = new JdbcSource();

@Test
public void shouldCreateJdbcConnectorWithValidConfigs() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "foo"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
final Connector expected = new Connector(
Expand All @@ -66,13 +55,13 @@ public void shouldCreateJdbcConnectorWithValidConfigs() {
public void shouldCreateJdbcConnectorWithValidPrefixTest() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "foo",
"topic.prefix", "foo-"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
assertThat(
Expand All @@ -84,13 +73,13 @@ public void shouldCreateJdbcConnectorWithValidPrefixTest() {
public void shouldCreateJdbcConnectorWithValidMapToSource() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "name",
"topic.prefix", "foo-"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
assertThat(
Expand All @@ -102,15 +91,15 @@ public void shouldCreateJdbcConnectorWithValidMapToSource() {
public void shouldCreateJdbcConnectorWithValidConfigsAndSMT() {
// Given:
final Map<String, String> config = ImmutableMap.of(
Connectors.CONNECTOR_CLASS, Connectors.JDBC_SOURCE_CLASS,
Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
"name", "foo",
"transforms", "foobar,createKey",
"transforms.createKey.type", "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.createKey.field", "key"
);

// When:
final Optional<Connector> maybeConnector = Connectors.fromConnectInfo(config);
final Optional<Connector> maybeConnector = jdbcSource.fromConfigs(config);

// Then:
final Connector expected = new Connector(
Expand All @@ -122,4 +111,31 @@ public void shouldCreateJdbcConnectorWithValidConfigsAndSMT() {
assertThat(maybeConnector, OptionalMatchers.of(is(expected)));
}

@Test
public void shouldResolveJdbcSourceConfigsTemplate() {
  // Given: a templated config using the shorthand "key" entry
  final Map<String, String> originals = ImmutableMap.of(
      Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS,
      "transforms", "foo",
      "key", "id");

  // When:
  final Map<String, String> resolved = jdbcSource.resolveConfigs(originals);

  // Then: "key" is expanded into the two ksql SMTs, and defaults are filled in
  final Map<String, String> expected = ImmutableMap.<String, String>builder()
      .put(Connectors.CONNECTOR_CLASS, JdbcSource.JDBC_SOURCE_CLASS)
      .put("transforms", "foo,ksqlCreateKey,ksqlExtractString")
      .put("transforms.ksqlCreateKey.type", "org.apache.kafka.connect.transforms.ValueToKey")
      .put("transforms.ksqlCreateKey.fields", "id")
      .put("transforms.ksqlExtractString.type", "org.apache.kafka.connect.transforms.ExtractField$Key")
      .put("transforms.ksqlExtractString.field", "id")
      .put("key.converter", "org.apache.kafka.connect.storage.StringConverter")
      .put("tasks.max", "1")
      .build();
  assertThat(resolved, is(expected));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.Maps;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.connect.supported.Connectors;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
Expand All @@ -42,7 +43,10 @@ public static Optional<KsqlEntity> execute(

final ConnectResponse<ConnectorInfo> response = client.create(
createConnector.getName(),
Maps.transformValues(createConnector.getConfig(), l -> l.getValue().toString()));
Connectors.resolve(
Maps.transformValues(
createConnector.getConfig(),
l -> l != null ? l.getValue().toString() : null)));

if (response.datum().isPresent()) {
return Optional.of(
Expand Down
Loading

0 comments on commit ba0fb99

Please sign in to comment.