diff --git a/docs/developer-guide/ksqldb-reference/drop-connector.md b/docs/developer-guide/ksqldb-reference/drop-connector.md index de01bff34908..6e4a3cc73f17 100644 --- a/docs/developer-guide/ksqldb-reference/drop-connector.md +++ b/docs/developer-guide/ksqldb-reference/drop-connector.md @@ -13,11 +13,14 @@ Synopsis -------- ```sql -DROP CONNECTOR connector_name; +DROP CONNECTOR [IF EXISTS] connector_name; ``` Description ----------- Drop a connector and delete it from the {{ site.kconnect }} cluster. The -topics associated with this cluster are not deleted by this command. \ No newline at end of file +topics associated with this cluster are not deleted by this command. + +If the IF EXISTS clause is present, the statement doesn't fail if the +connector doesn't exist. \ No newline at end of file diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index f3343d0850b2..cff317632c7b 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -42,6 +42,7 @@ import io.confluent.ksql.cli.console.table.builder.TablesListTableBuilder; import io.confluent.ksql.cli.console.table.builder.TopicDescriptionTableBuilder; import io.confluent.ksql.cli.console.table.builder.TypeListTableBuilder; +import io.confluent.ksql.cli.console.table.builder.WarningEntityTableBuilder; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.query.QueryError; import io.confluent.ksql.rest.ApiJsonMapper; @@ -81,6 +82,7 @@ import io.confluent.ksql.rest.entity.TablesList; import io.confluent.ksql.rest.entity.TopicDescription; import io.confluent.ksql.rest.entity.TypeList; +import io.confluent.ksql.rest.entity.WarningEntity; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.util.CmdLineUtil; import io.confluent.ksql.util.HandlerMaps; @@ -173,6 +175,8 @@ public class Console implements Closeable { tablePrinter(TypeList.class, TypeListTableBuilder::new)) .put(ErrorEntity.class, tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new)) + .put(WarningEntity.class, + tablePrinter(WarningEntity.class, WarningEntityTableBuilder::new)) .build(); private static Handler1 tablePrinter( diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/WarningEntityTableBuilder.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/WarningEntityTableBuilder.java new file mode 100644 index 000000000000..f3d5d12498e3 --- /dev/null +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/WarningEntityTableBuilder.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.cli.console.table.builder; + +import io.confluent.ksql.cli.console.table.Table; +import io.confluent.ksql.rest.entity.WarningEntity; + +public class WarningEntityTableBuilder implements TableBuilder { + @Override + public Table buildTable(final WarningEntity entity) { + return new Table.Builder() + .withColumnHeaders("Message") + .withRow(entity.getMessage()) + .build(); + } +} diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index e52cd52f7c8e..a61e99f9754d 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -77,7 +77,7 @@ statement | INSERT INTO sourceName (columns)? VALUES values #insertValues | DROP STREAM (IF EXISTS)? sourceName (DELETE TOPIC)? #dropStream | DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable - | DROP CONNECTOR identifier #dropConnector + | DROP CONNECTOR (IF EXISTS)? identifier #dropConnector | EXPLAIN (statement | identifier) #explain | CREATE TYPE identifier AS type #registerType | DROP TYPE (IF EXISTS)? identifier #dropType diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 55bb9ee2b0ee..879c9e99aa1f 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -394,6 +394,7 @@ public Node visitDropStream(final SqlBaseParser.DropStreamContext context) { public Node visitDropConnector(final DropConnectorContext context) { return new DropConnector( getLocation(context), + context.EXISTS() != null, ParserUtil.getIdentifierText(context.identifier()) ); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DropConnector.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DropConnector.java index fdf366e3c22f..02988e8d9d5b 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DropConnector.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DropConnector.java @@ -15,19 +15,31 @@ package io.confluent.ksql.parser.tree; +import static com.google.common.base.MoreObjects.toStringHelper; + import io.confluent.ksql.parser.NodeLocation; import java.util.Objects; import java.util.Optional; public class DropConnector extends Statement { + private final boolean ifExists; private final String connectorName; - public DropConnector(final Optional location, final String connectorName) { + public DropConnector( + final Optional location, + final boolean ifExists, + final String connectorName + ) { super(location); + this.ifExists = ifExists; this.connectorName = connectorName; } + public boolean getIfExists() { + return ifExists; + } + public String getConnectorName() { return connectorName; } @@ -41,18 +53,19 @@ public boolean equals(final Object o) { return false; } final DropConnector that = (DropConnector) o; - return Objects.equals(connectorName, that.connectorName); + return Objects.equals(connectorName, that.connectorName) && ifExists == that.ifExists; } @Override public int hashCode() { - return Objects.hash(connectorName); + return Objects.hash(ifExists, connectorName); } @Override public String toString() { - return "DropConnector{" - + "connectorName='" + connectorName + '\'' - + '}'; + return toStringHelper(this) + .add("ifExists", ifExists) + .add("connectorName", connectorName) + .toString(); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java index 845726e97b5b..8ca4158ba5cb 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java @@ -21,10 +21,12 @@ import io.confluent.ksql.rest.entity.DropConnectorEntity; import io.confluent.ksql.rest.entity.ErrorEntity; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.WarningEntity; import io.confluent.ksql.services.ConnectClient.ConnectResponse; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.Optional; +import org.apache.hc.core5.http.HttpStatus; public final class DropConnectorExecutor { @@ -37,11 +39,17 @@ public static Optional execute( final ServiceContext serviceContext ) { final String connectorName = statement.getStatement().getConnectorName(); + final boolean ifExists = statement.getStatement().getIfExists(); final ConnectResponse response = serviceContext.getConnectClient().delete(connectorName); if (response.error().isPresent()) { - return Optional.of(new ErrorEntity(statement.getStatementText(), response.error().get())); + if (ifExists && response.httpCode() == HttpStatus.SC_NOT_FOUND) { + return Optional.of(new WarningEntity(statement.getStatementText(), + "Connector '" + connectorName + "' does not exist.")); + } else { + return Optional.of(new ErrorEntity(statement.getStatementText(), response.error().get())); + } } return Optional.of(new DropConnectorEntity(statement.getStatementText(), connectorName)); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java index ba9c77897387..2d9219191de8 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java @@ -30,6 +30,7 @@ import io.confluent.ksql.rest.entity.DropConnectorEntity; import io.confluent.ksql.rest.entity.ErrorEntity; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.WarningEntity; import io.confluent.ksql.services.ConnectClient; import io.confluent.ksql.services.ConnectClient.ConnectResponse; import io.confluent.ksql.services.ServiceContext; @@ -48,15 +49,25 @@ public class DropConnectorExecutorTest { private static final KsqlConfig CONFIG = new KsqlConfig(ImmutableMap.of()); - private static final DropConnector CREATE_CONNECTOR = new DropConnector(Optional.empty(), "foo"); + private static final DropConnector DROP_CONNECTOR = + new DropConnector(Optional.empty(), false, "foo"); + private static final DropConnector DROP_CONNECTOR_IF_EXISTS = + new DropConnector(Optional.empty(), true, "foo"); private static final ConfiguredStatement DROP_CONNECTOR_CONFIGURED = ConfiguredStatement.of( PreparedStatement.of( "DROP CONNECTOR \"foo\"", - CREATE_CONNECTOR), + DROP_CONNECTOR), ImmutableMap.of(), CONFIG); + private static final ConfiguredStatement DROP_CONNECTOR_IF_EXISTS_CONFIGURED = + ConfiguredStatement.of( + PreparedStatement.of( + "DROP CONNECTOR \"foo\"", + DROP_CONNECTOR_IF_EXISTS), + ImmutableMap.of(), + CONFIG); @Mock private ServiceContext serviceContext; @@ -106,10 +117,28 @@ public void shouldReturnErrorEntityOnError() { // When: final Optional entity = DropConnectorExecutor .execute(DROP_CONNECTOR_CONFIGURED, mock(SessionProperties.class), null, serviceContext); + final Optional entityIfExists = DropConnectorExecutor + .execute(DROP_CONNECTOR_IF_EXISTS_CONFIGURED, mock(SessionProperties.class), null, serviceContext); // Then: assertThat("Expected non-empty response", entity.isPresent()); assertThat(entity.get(), instanceOf(ErrorEntity.class)); + assertThat("Expected non-empty response", entityIfExists.isPresent()); + assertThat(entityIfExists.get(), instanceOf(ErrorEntity.class)); } + @Test + public void shouldReturnWarningIfNotExist() { + // Given: + when(connectClient.delete(anyString())) + .thenReturn(ConnectResponse.failure("Danger Mouse!", HttpStatus.SC_NOT_FOUND)); + + // When: + final Optional entity = DropConnectorExecutor + .execute(DROP_CONNECTOR_IF_EXISTS_CONFIGURED, mock(SessionProperties.class), null, serviceContext); + + // Then: + assertThat("Expected non-empty response", entity.isPresent()); + assertThat(entity.get(), instanceOf(WarningEntity.class)); + } } \ No newline at end of file diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index c3d8b67db3c9..7474a40ede4d 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -48,7 +48,8 @@ @JsonSubTypes.Type(value = ConnectorList.class, name = "connector_list"), @JsonSubTypes.Type(value = ConnectorDescription.class, name = "connector_description"), @JsonSubTypes.Type(value = TypeList.class, name = "type_list"), - @JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity") + @JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity"), + @JsonSubTypes.Type(value = WarningEntity.class, name = "warning_entity") }) public abstract class KsqlEntity { private final String statementText; diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/WarningEntity.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/WarningEntity.java new file mode 100644 index 000000000000..161108f51f86 --- /dev/null +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/WarningEntity.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.errorprone.annotations.Immutable; + +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Immutable +public class WarningEntity extends KsqlEntity { + private final String message; + + @JsonCreator + public WarningEntity( + @JsonProperty("statementText") final String statementText, + @JsonProperty("message") final String message + ) { + super(statementText); + this.message = Objects.requireNonNull(message, "message"); + } + + public String getMessage() { + return message; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final WarningEntity that = (WarningEntity) o; + return Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(message); + } + + @Override + public String toString() { + return "WarningEntity{" + + "message='" + message + '\'' + + '}'; + } +}