Skip to content

Commit

Permalink
feat: add 'show connector plugins' syntax (#7284)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Mar 29, 2021
1 parent f6dd212 commit be50d2d
Show file tree
Hide file tree
Showing 21 changed files with 597 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.cli.console.table.builder.CommandStatusTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ConnectorInfoTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ConnectorListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ConnectorPluginsListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.DropConnectorTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ErrorEntityTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ExecutionPlanTableBuilder;
Expand All @@ -53,6 +54,7 @@
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.ConnectorPluginsList;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.DropConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
Expand Down Expand Up @@ -176,6 +178,8 @@ public class Console implements Closeable {
tablePrinter(DropConnectorEntity.class, DropConnectorTableBuilder::new))
.put(ConnectorList.class,
tablePrinter(ConnectorList.class, ConnectorListTableBuilder::new))
.put(ConnectorPluginsList.class,
tablePrinter(ConnectorPluginsList.class, ConnectorPluginsListTableBuilder::new))
.put(ConnectorDescription.class,
Console::printConnectorDescription)
.put(TypeList.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.cli.console.table.builder;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.rest.entity.ConnectorPluginsList;
import java.util.List;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;

public class ConnectorPluginsListTableBuilder implements TableBuilder<ConnectorPluginsList> {
private static final List<String> HEADERS = ImmutableList.of(
"Class", "Type", "Version"
);

@Override
public Table buildTable(final ConnectorPluginsList entity) {
return new Table.Builder()
.withColumnHeaders(HEADERS)
.withRows(entity.getConnectorsPlugins()
.stream()
.map(info -> ImmutableList.of(
info.getClassName(),
ObjectUtils.defaultIfNull(info.getType(), ConnectorType.UNKNOWN).name(),
ObjectUtils.defaultIfNull(info.getVersion(), ""))))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.ConnectorPluginsList;
import io.confluent.ksql.rest.entity.ConsumerPartitionOffsets;
import io.confluent.ksql.rest.entity.DropConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
Expand All @@ -76,6 +77,7 @@
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
import io.confluent.ksql.rest.entity.SimpleConnectorInfo;
import io.confluent.ksql.rest.entity.SimpleConnectorPluginInfo;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceInfo;
Expand Down Expand Up @@ -586,6 +588,50 @@ public void shouldPrintTablesList() {
Approvals.verify(output, approvalOptions);
}

@Test
public void shouldPrintConnectorPluginsList() {
// Given:
final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of(
new ConnectorPluginsList(
"statement",
ImmutableList.of(),
ImmutableList.of(
new SimpleConnectorPluginInfo("clazz1", ConnectorType.SOURCE, "v1"),
new SimpleConnectorPluginInfo("clazz2", ConnectorType.SINK, "v2")
))
));

// When:
console.printKsqlEntityList(entities);

// Then:
final String output = terminal.getOutputString();
if (console.getOutputFormat() == OutputFormat.JSON) {
assertThat(output, is(""
+ "[ {" + NEWLINE
+ " \"@type\" : \"connector_plugins_list\"," + NEWLINE
+ " \"statementText\" : \"statement\"," + NEWLINE
+ " \"warnings\" : [ ]," + NEWLINE
+ " \"connectorsPlugins\" : [ {" + NEWLINE
+ " \"className\" : \"clazz1\"," + NEWLINE
+ " \"type\" : \"source\"," + NEWLINE
+ " \"version\" : \"v1\"" + NEWLINE
+ " }, {" + NEWLINE
+ " \"className\" : \"clazz2\"," + NEWLINE
+ " \"type\" : \"sink\"," + NEWLINE
+ " \"version\" : \"v2\"" + NEWLINE
+ " } ]" + NEWLINE
+ "} ]" + NEWLINE));
} else {
assertThat(output, is("" + NEWLINE
+ " Class | Type | Version " + NEWLINE
+ "---------------------------" + NEWLINE
+ " clazz1 | SOURCE | v1 " + NEWLINE
+ " clazz2 | SINK | v2 " + NEWLINE
+ "---------------------------" + NEWLINE));
}
}

@Test
public void shouldPrintConnectorsList() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.util.Timeout;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,6 +60,7 @@ public class DefaultConnectClient implements ConnectClient {

private static final ObjectMapper MAPPER = ConnectJsonMapper.INSTANCE.get();

private static final String CONNECTOR_PLUGINS = "/connector-plugins";
private static final String CONNECTORS = "/connectors";
private static final String STATUS = "/status";
private static final String TOPICS = "/topics";
Expand Down Expand Up @@ -146,6 +148,30 @@ public ConnectResponse<List<String>> connectors() {
}
}

@Override
public ConnectResponse<List<ConnectorPluginInfo>> connectorPlugins() {
try {
LOG.debug("Issuing request to Kafka Connect at URI {} to list connector plugins", connectUri);

final ConnectResponse<List<ConnectorPluginInfo>> connectResponse = withRetries(() -> Request
.get(connectUri.resolve(CONNECTOR_PLUGINS))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.execute()
.handleResponse(
createHandler(HttpStatus.SC_OK, new TypeReference<List<ConnectorPluginInfo>>() {},
Function.identity())));

connectResponse.error()
.ifPresent(error -> LOG.warn("Could not list connector plugins: {}.", error));

return connectResponse;
} catch (final Exception e) {
throw new KsqlServerException(e);
}
}

@Override
public ConnectResponse<ConnectorStateInfo> status(final String connector) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.http.HttpStatus;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState;
Expand All @@ -62,6 +63,11 @@ public class DefaultConnectClientTest {
),
ConnectorType.SOURCE
);
private static final ConnectorPluginInfo SAMPLE_PLUGIN = new ConnectorPluginInfo(
"io.confluent.connect.replicator.ReplicatorSourceConnector",
ConnectorType.SOURCE,
"1.0"
);
private static final String AUTH_HEADER = "Basic FOOBAR";

private static final Map<String, ActiveTopicsInfo> SAMPLE_TOPICS = ImmutableMap.of(
Expand Down Expand Up @@ -143,6 +149,25 @@ public void testList() throws JsonProcessingException {
assertThat("Expected no error!", !response.error().isPresent());
}

@Test
public void testListPlugins() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.get(WireMock.urlEqualTo("/connector-plugins"))
.withHeader(AUTHORIZATION.toString(), new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_OK)
.withBody(MAPPER.writeValueAsString(ImmutableList.of(SAMPLE_PLUGIN))))
);

// When:
final ConnectResponse<List<ConnectorPluginInfo>> response = client.connectorPlugins();

// Then:
assertThat(response.datum(), OptionalMatchers.of(is(ImmutableList.of(SAMPLE_PLUGIN))));
assertThat("Expected no error!", !response.error().isPresent());
}

@Test
public void testDescribe() throws JsonProcessingException {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;

/**
Expand All @@ -35,6 +36,13 @@ public interface ConnectClient {
*/
ConnectResponse<List<String>> connectors();

/**
* List all of the connector plugins available in this connect cluster.
*
* @return a list of connector plugins
*/
ConnectResponse<List<ConnectorPluginInfo>> connectorPlugins();

/**
* Gets the configuration for a specified connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ statement
| (LIST | SHOW) TABLES EXTENDED? #listTables
| (LIST | SHOW) FUNCTIONS #listFunctions
| (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors
| (LIST | SHOW) CONNECTOR PLUGINS #listConnectorPlugins
| (LIST | SHOW) TYPES #listTypes
| (LIST | SHOW) VARIABLES #listVariables
| DESCRIBE sourceName EXTENDED? #showColumns
Expand Down Expand Up @@ -536,6 +537,7 @@ ASSERT: 'ASSERT';
ADD: 'ADD';
ALTER: 'ALTER';
VARIABLES: 'VARIABLES';
PLUGINS: 'PLUGINS';

IF: 'IF';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import io.confluent.ksql.parser.tree.JoinCriteria;
import io.confluent.ksql.parser.tree.JoinOn;
import io.confluent.ksql.parser.tree.JoinedSource;
import io.confluent.ksql.parser.tree.ListConnectorPlugins;
import io.confluent.ksql.parser.tree.ListConnectors;
import io.confluent.ksql.parser.tree.ListConnectors.Scope;
import io.confluent.ksql.parser.tree.ListFunctions;
Expand Down Expand Up @@ -702,6 +703,11 @@ public Node visitListConnectors(final ListConnectorsContext ctx) {
return new ListConnectors(getLocation(ctx), scope);
}

@Override
public Node visitListConnectorPlugins(final SqlBaseParser.ListConnectorPluginsContext ctx) {
return new ListConnectorPlugins((getLocation(ctx)));
}

@Override
public Node visitDropType(final DropTypeContext ctx) {
return new DropType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.confluent.ksql.parser.tree.JoinCriteria;
import io.confluent.ksql.parser.tree.JoinOn;
import io.confluent.ksql.parser.tree.JoinedSource;
import io.confluent.ksql.parser.tree.ListConnectorPlugins;
import io.confluent.ksql.parser.tree.ListFunctions;
import io.confluent.ksql.parser.tree.ListStreams;
import io.confluent.ksql.parser.tree.ListTables;
Expand Down Expand Up @@ -438,6 +439,13 @@ protected Void visitListTables(final ListTables node, final Integer context) {
return null;
}

@Override
protected Void visitListConnectorPlugins(
final ListConnectorPlugins node, final Integer context) {
builder.append("SHOW CONNECTOR PLUGINS");
return null;
}

@Override
protected Void visitDescribeStreams(final DescribeStreams node, final Integer context) {
builder.append("DESCRIBE STREAMS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ protected R visitListTypes(final ListTypes listTypes, final C context) {
return visitStatement(listTypes, context);
}

protected R visitListConnectorPlugins(final ListConnectorPlugins node, final C context) {
return visitStatement(node, context);
}

protected R visitUnsetProperty(final UnsetProperty node, final C context) {
return visitStatement(node, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import static com.google.common.base.MoreObjects.toStringHelper;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.parser.NodeLocation;
import java.util.Optional;

@Immutable
public class ListConnectorPlugins extends Statement {
public ListConnectorPlugins(final Optional<NodeLocation> location) {
super(location);
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitListConnectorPlugins(this, context);
}

@Override
public int hashCode() {
return 0;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
return (o != null) && (getClass() == o.getClass());
}

@Override
public String toString() {
return toStringHelper(this).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.confluent.ksql.parser.tree.JoinCriteria;
import io.confluent.ksql.parser.tree.JoinOn;
import io.confluent.ksql.parser.tree.JoinedSource;
import io.confluent.ksql.parser.tree.ListConnectorPlugins;
import io.confluent.ksql.parser.tree.ListStreams;
import io.confluent.ksql.parser.tree.ListTables;
import io.confluent.ksql.parser.tree.ListVariables;
Expand Down Expand Up @@ -807,6 +808,18 @@ public void shouldFormatShowTablesExtended() {
assertThat(formatted, is("SHOW TABLES EXTENDED"));
}

@Test
public void shouldFormatShowConnectorPlugins() {
// Given:
final ListConnectorPlugins listConnectorPlugins = new ListConnectorPlugins(Optional.empty());

// When:
final String formatted = SqlFormatter.formatSql(listConnectorPlugins);

// Then:
assertThat(formatted, is("SHOW CONNECTOR PLUGINS"));
}

@Test
public void shouldFormatDescribeTables() {
// Given:
Expand Down
Loading

0 comments on commit be50d2d

Please sign in to comment.