Skip to content

Commit

Permalink
feat: Security plumbing (confluentinc#4778)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Mar 16, 2020
1 parent fba15be commit 395626c
Show file tree
Hide file tree
Showing 25 changed files with 492 additions and 229 deletions.
11 changes: 11 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_SECURITY_EXTENSION_DOC = "A KSQL security extension class that "
+ "provides authorization to KSQL servers.";

public static final String KSQL_AUTHENTICATION_PLUGIN_CLASS = "ksql.authentication.plugin.class";
public static final String KSQL_AUTHENTICATION_PLUGIN_DEFAULT = null;
public static final String KSQL_AUTHENTICATION_PLUGIN_DOC = "An extension class that allows "
+ " custom authentication to be plugged in.";

public static final String KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR = "ksql.access.validator.enable";
public static final String KSQL_ACCESS_VALIDATOR_ON = "on";
public static final String KSQL_ACCESS_VALIDATOR_OFF = "off";
Expand Down Expand Up @@ -489,6 +494,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_SECURITY_EXTENSION_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_SECURITY_EXTENSION_DOC
).define(
KSQL_AUTHENTICATION_PLUGIN_CLASS,
Type.CLASS,
KSQL_AUTHENTICATION_PLUGIN_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_AUTHENTICATION_PLUGIN_DOC
).define(
KSQL_WRAP_SINGLE_VALUES,
ConfigDef.Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.auth;

import io.vertx.core.WorkerExecutor;
import io.vertx.ext.web.RoutingContext;
import java.security.Principal;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Extension point for adding custom authentication.
*/
public interface AuthenticationPlugin {

void configure(Map<String, ?> map);

/**
* Handle authentication for the request. Please note that in the case of failure to authenticate
* the plugin should end the response. This behaviour makes it compatible with existing auth
* plugins which it wraps.
*
* @param routingContext The routing context
* @param workerExecutor The worker executor
* @return A CompletableFuture representing the result of the authentication
*/
CompletableFuture<Principal> handleAuth(RoutingContext routingContext,
WorkerExecutor workerExecutor);

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@ public Optional<String> getAuthToken() {
return authToken;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,10 @@ private void getUser(
return;
}

if (!validateRoles(lc, allowedRoles)) {
log.error("Failed to log in: Invalid roles.");
promise.fail("Failed to log in: Invalid roles.");
return;
}
// We do the actual authorization here not in the User class
final boolean authorized = validateRoles(lc, allowedRoles);

promise.complete(new JaasUser(username, this));
}

private void checkUserPermission(
final String username,
final Handler<AsyncResult<Boolean>> resultHandler
) {
// no authorization yet (besides JAAS role check during login)
// consequently, authenticated users have all permissions
resultHandler.handle(Future.succeededFuture(true));
promise.complete(new JaasUser(username, authorized));
}

private static boolean validateRoles(final LoginContext lc, final List<String> allowedRoles) {
Expand All @@ -152,37 +140,32 @@ private static boolean validateRoles(final LoginContext lc, final List<String> a
static class JaasUser extends io.vertx.ext.auth.AbstractUser {

private final String username;
private JaasAuthProvider authProvider;
private JsonObject principal;
private boolean authorized;

JaasUser(final String username, final JaasAuthProvider authProvider) {
JaasUser(final String username, final boolean authorized) {
this.username = Objects.requireNonNull(username, "username");
this.authProvider = Objects.requireNonNull(authProvider, "authProvider");
this.authorized = authorized;
}

@Override
public void doIsPermitted(
final String permission,
final Handler<AsyncResult<Boolean>> resultHandler
) {
authProvider.checkUserPermission(username, resultHandler);
resultHandler.handle(Future.succeededFuture(authorized));
}

@Override
public JsonObject principal() {
if (principal == null) {
principal = new JsonObject().put("username", username);
principal = new JsonObject().put("username", username).put("authorized", authorized);
}
return principal;
}

@Override
public void setAuthProvider(final AuthProvider authProvider) {
if (authProvider instanceof JaasAuthProvider) {
this.authProvider = (JaasAuthProvider)authProvider;
} else {
throw new IllegalArgumentException("Not a JaasAuthProvider");
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.auth;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.security.KsqlAuthorizationProvider;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.WorkerExecutor;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
import java.security.Principal;
import java.util.Set;

public class KsqlAuthorizationProviderHandler implements Handler<RoutingContext> {

private static final Set<String> PATHS_WITHOUT_AUTHORIZATION = ImmutableSet
.of("/v1/metadata", "/healthcheck");

private final WorkerExecutor workerExecutor;
private final KsqlAuthorizationProvider ksqlAuthorizationProvider;

public KsqlAuthorizationProviderHandler(final WorkerExecutor workerExecutor,
final KsqlAuthorizationProvider ksqlAuthorizationProvider) {
this.workerExecutor = workerExecutor;
this.ksqlAuthorizationProvider = ksqlAuthorizationProvider;
}

@Override
public void handle(final RoutingContext routingContext) {

final String path = routingContext.normalisedPath();

if (PATHS_WITHOUT_AUTHORIZATION.contains(path)) {
routingContext.next();
return;
}

workerExecutor.<Void>executeBlocking(
promise -> authorize(promise, routingContext),
ar -> handleAuthorizeResult(ar, routingContext));
}

private static void handleAuthorizeResult(final AsyncResult<Void> ar,
final RoutingContext routingContext) {
if (ar.succeeded()) {
routingContext.next();
} else {
routingContext.fail(403, ar.cause());
}
}

private void authorize(final Promise<Void> promise, final RoutingContext routingContext) {
final User user = routingContext.user();
if (user == null) {
promise.fail(
new IllegalStateException("Null user in " + KsqlAuthorizationProviderHandler.class));
return;
}
final Principal principal = new ApiPrincipal(user.principal().getString("username"));
try {
ksqlAuthorizationProvider
.checkEndpointAccess(principal, routingContext.request().method().toString(),
routingContext.normalisedPath());
} catch (Exception e) {
promise.fail(e);
return;
}
promise.complete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class KsqlServerEndpoints implements Endpoints {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
Expand All @@ -52,28 +56,36 @@ public KsqlServerEndpoints(
}

@Override
public QueryPublisher createQueryPublisher(final String sql, final JsonObject properties,
public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
final JsonObject properties,
final Context context,
final WorkerExecutor workerExecutor,
final ApiSecurityContext apiSecurityContext) {
return new QueryStreamEndpoint(ksqlEngine, ksqlConfig, pullQueryExecutor)
.createQueryPublisher(sql, properties, context, workerExecutor,
createServiceContext(apiSecurityContext));
return executeOnWorker(
() -> new QueryStreamEndpoint(ksqlEngine, ksqlConfig, pullQueryExecutor)
.createQueryPublisher(sql, properties, context, workerExecutor,
ksqlSecurityContextProvider.provide(apiSecurityContext).getServiceContext()),
workerExecutor);
}

@Override
public InsertsStreamSubscriber createInsertsSubscriber(final String target,
public CompletableFuture<InsertsStreamSubscriber> createInsertsSubscriber(final String target,
final JsonObject properties,
final Subscriber<InsertResult> acksSubscriber, final Context context,
final WorkerExecutor workerExecutor,
final ApiSecurityContext apiSecurityContext) {
return new InsertsStreamEndpoint(ksqlEngine, ksqlConfig, reservedInternalTopics)
.createInsertsSubscriber(target, properties, acksSubscriber, context, workerExecutor,
createServiceContext(apiSecurityContext));
return executeOnWorker(
() -> new InsertsStreamEndpoint(ksqlEngine, ksqlConfig, reservedInternalTopics)
.createInsertsSubscriber(target, properties, acksSubscriber, context, workerExecutor,
ksqlSecurityContextProvider.provide(apiSecurityContext).getServiceContext()),
workerExecutor);
}

private ServiceContext createServiceContext(final ApiSecurityContext apiSecurityContext) {
return ksqlSecurityContextProvider.provide(apiSecurityContext).getServiceContext();
private <R> CompletableFuture<R> executeOnWorker(final Supplier<R> supplier,
final WorkerExecutor workerExecutor) {
final VertxCompletableFuture<R> vcf = new VertxCompletableFuture<>();
workerExecutor.executeBlocking(promise -> promise.complete(supplier.get()), false, vcf);
return vcf;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ private ErrorCodes() {
public static final int ERROR_CODE_CANNOT_COERCE_FIELD = 7;
public static final int ERROR_MAX_PUSH_QUERIES_EXCEEDED = 8;


public static final int ERROR_CODE_INTERNAL_ERROR = 100;


Expand Down
Loading

0 comments on commit 395626c

Please sign in to comment.