Skip to content

Commit

Permalink
feat: New ksql.properties.overrides.denylist to deny clients configs …
Browse files Browse the repository at this point in the history
…overrides (#5877)
  • Loading branch information
spena authored Aug 6, 2020
1 parent dde07b2 commit 7d1ad25
Show file tree
Hide file tree
Showing 12 changed files with 455 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.properties;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.confluent.ksql.util.KsqlException;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Class that validates if a property, or list of properties, is part of a list of denied
* properties.
*/
public class DenyListPropertyValidator {
private final Set<String> immutableProps;

public DenyListPropertyValidator(final Collection<String> immutableProps) {
this.immutableProps = ImmutableSet.copyOf(
Objects.requireNonNull(immutableProps, "immutableProps"));
}

/**
* Validates if a list of properties are part of the list of denied properties.
* @throws if at least one property is part of the denied list.
*/
public void validateAll(final Map<String, Object> properties) {
final Set<String> propsDenied = Sets.intersection(immutableProps, properties.keySet());
if (!propsDenied.isEmpty()) {
throw new KsqlException(String.format("One or more properties overrides set locally are "
+ "prohibited by the KSQL server (use UNSET to reset their default value): %s",
propsDenied));
}
}
}
12 changes: 12 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ public class KsqlConfig extends AbstractConfig {
+ "error messages (per query) to hold in the internal query errors queue and display"
+ "in the query description when executing the `EXPLAIN <query>` command.";

public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST =
"ksql.properties.overrides.denylist";
private static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC = "Comma-separated list of "
+ "properties that KSQL users cannot override.";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -762,6 +767,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_ERROR_MAX_QUEUE_SIZE_DOC
)
.define(
KSQL_PROPERTIES_OVERRIDES_DENYLIST,
Type.LIST,
"",
Importance.LOW,
KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.properties;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlException;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;

public class DenyListPropertyValidatorTest {
private DenyListPropertyValidator validator;

@Before
public void setUp() {
validator = new DenyListPropertyValidator(Arrays.asList(
"immutable-property-1",
"immutable-property-2"
));
}

@Test
public void shouldThrowOnDenyListedProperty() {
// When:
final KsqlException e = assertThrows(
KsqlException.class,
() -> validator.validateAll(ImmutableMap.of(
"immutable-property-1", "v1",
"anything", "v2",
"immutable-property-2", "v3"
))
);

// Then:
assertThat(e.getMessage(), containsString(
"One or more properties overrides set locally are prohibited by the KSQL server "
+ "(use UNSET to reset their default value): "
+ "[immutable-property-1, immutable-property-2]"
));

}

@Test
public void shouldNotThrowOnAllowedProp() {
validator.validateAll(ImmutableMap.of(
"mutable-1", "v1",
"anything", "v2"
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
Expand Down Expand Up @@ -183,6 +184,7 @@ public final class KsqlRestApplication implements Executable {
private Server apiServer = null;
private final CompletableFuture<Void> terminatedFuture = new CompletableFuture<>();
private final QueryMonitor queryMonitor;
private final DenyListPropertyValidator denyListPropertyValidator;

// The startup thread that can be interrupted if necessary during shutdown. This should only
// happen if startup hangs.
Expand Down Expand Up @@ -218,7 +220,8 @@ public static SourceName getCommandsStreamName() {
final Optional<HeartbeatAgent> heartbeatAgent,
final Optional<LagReportingAgent> lagReportingAgent,
final Vertx vertx,
final QueryMonitor ksqlQueryMonitor
final QueryMonitor ksqlQueryMonitor,
final DenyListPropertyValidator denyListPropertyValidator
) {
log.debug("Creating instance of ksqlDB API server");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
Expand All @@ -245,6 +248,8 @@ public static SourceName getCommandsStreamName() {
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent");
this.vertx = requireNonNull(vertx, "vertx");
this.denyListPropertyValidator =
requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");

this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort);
if (heartbeatAgent.isPresent()) {
Expand Down Expand Up @@ -305,7 +310,8 @@ public void startAsync() {
KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
authorizationValidator,
errorHandler,
pullQueryExecutor
pullQueryExecutor,
denyListPropertyValidator
);

startAsyncThreadRef.set(Thread.currentThread());
Expand Down Expand Up @@ -695,6 +701,9 @@ static KsqlRestApplication buildApplication(
final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(
ksqlEngine, routingFilterFactory, ksqlConfig);

final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(
ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlEngine,
commandStore,
Expand All @@ -704,7 +713,8 @@ static KsqlRestApplication buildApplication(
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler,
pullQueryExecutor
pullQueryExecutor,
denyListPropertyValidator
);

final KsqlResource ksqlResource = new KsqlResource(
Expand All @@ -713,7 +723,8 @@ static KsqlRestApplication buildApplication(
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler
errorHandler,
denyListPropertyValidator
);

final List<String> managedTopics = new LinkedList<>();
Expand Down Expand Up @@ -773,7 +784,8 @@ static KsqlRestApplication buildApplication(
heartbeatAgent,
lagReportingAgent,
vertx,
queryMonitor
queryMonitor,
denyListPropertyValidator
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.SessionProperties;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.net.URL;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class KsqlResource implements KsqlConfigurable {
private final ActivenessRegistrar activenessRegistrar;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private final DenyListPropertyValidator denyListPropertyValidator;
private RequestValidator validator;
private RequestHandler handler;
private final Errors errorHandler;
Expand All @@ -108,7 +111,8 @@ public KsqlResource(
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
final Errors errorHandler,
final DenyListPropertyValidator denyListPropertyValidator
) {
this(
ksqlEngine,
Expand All @@ -117,7 +121,8 @@ public KsqlResource(
activenessRegistrar,
Injectors.DEFAULT,
authorizationValidator,
errorHandler
errorHandler,
denyListPropertyValidator
);
}

Expand All @@ -128,7 +133,8 @@ public KsqlResource(
final ActivenessRegistrar activenessRegistrar,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
final Errors errorHandler,
final DenyListPropertyValidator denyListPropertyValidator
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
Expand All @@ -140,6 +146,8 @@ public KsqlResource(
this.authorizationValidator = Objects
.requireNonNull(authorizationValidator, "authorizationValidator");
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
this.denyListPropertyValidator =
Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
}

@Override
Expand Down Expand Up @@ -199,11 +207,14 @@ public EndpointResponse terminateCluster(

ensureValidPatterns(request.getDeleteTopicList());
try {
final Map<String, Object> streamsProperties = request.getStreamsProperties();
denyListPropertyValidator.validateAll(streamsProperties);

final KsqlEntityList entities = handler.execute(
securityContext,
TERMINATE_CLUSTER,
new SessionProperties(
request.getStreamsProperties(),
streamsProperties,
localHost,
localUrl,
false
Expand Down Expand Up @@ -232,14 +243,18 @@ public EndpointResponse handleKsqlStatements(
request,
distributedCmdResponseTimeout);

final Map<String, Object> configProperties = request.getConfigOverrides();
denyListPropertyValidator.validateAll(configProperties);

final KsqlRequestConfig requestConfig =
new KsqlRequestConfig(request.getRequestProperties());
final List<ParsedStatement> statements = ksqlEngine.parse(request.getKsql());

validator.validate(
SandboxedServiceContext.create(securityContext.getServiceContext()),
statements,
new SessionProperties(
request.getConfigOverrides(),
configProperties,
localHost,
localUrl,
requestConfig.getBoolean(KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST)
Expand All @@ -251,7 +266,7 @@ public EndpointResponse handleKsqlStatements(
securityContext,
statements,
new SessionProperties(
request.getConfigOverrides(),
configProperties,
localHost,
localUrl,
requestConfig.getBoolean(KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST)
Expand Down
Loading

0 comments on commit 7d1ad25

Please sign in to comment.