Skip to content

Commit

Permalink
feat: Adds SSL mutual auth support to intra-cluster requests (#5482)
Browse files Browse the repository at this point in the history
* feat: Adds SSL mutual auth support to intra-cluster requests
  • Loading branch information
AlanConfluent authored Jun 16, 2020
1 parent 44fb447 commit 82b137f
Show file tree
Hide file tree
Showing 20 changed files with 1,478 additions and 87 deletions.
312 changes: 287 additions & 25 deletions docs/operate-and-deploy/installation/server-config/security.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.api.auth;

import static io.confluent.ksql.api.auth.SystemAuthenticationHandler.isAuthenticatedAsSystemUser;
import static io.confluent.ksql.api.server.ServerUtils.convertCommaSeparatedWilcardsToRegex;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
Expand Down Expand Up @@ -69,6 +70,9 @@ public void handle(final RoutingContext routingContext) {
if (unauthedPathsPattern.matcher(routingContext.normalisedPath()).matches()) {
routingContext.next();
return;
} else if (isAuthenticatedAsSystemUser(routingContext)) {
routingContext.next();
return;
}
final CompletableFuture<Principal> cf = securityHandlerPlugin
.handleAuth(routingContext, server.getWorkerExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void handle(final RoutingContext routingContext) {
return;
}

if (SystemAuthenticationHandler.isAuthenticatedAsSystemUser(routingContext)) {
routingContext.next();
return;
}

workerExecutor.<Void>executeBlocking(
promise -> authorize(promise, routingContext),
ar -> handleAuthorizeResult(ar, routingContext));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.auth;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.AuthProvider;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
import java.security.Principal;
import java.util.Objects;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;

public class SystemAuthenticationHandler implements Handler<RoutingContext> {

public SystemAuthenticationHandler() {}

@Override
public void handle(final RoutingContext routingContext) {
final HttpConnection httpConnection = routingContext.request().connection();
if (!httpConnection.isSsl()) {
throw new IllegalStateException("Should only have ssl connections");
}
final Principal peerPrincipal = getPeerPrincipal(httpConnection.sslSession());
routingContext.setUser(new SystemUser(peerPrincipal));
routingContext.next();
}

private static Principal getPeerPrincipal(final SSLSession sslSession) {
try {
return sslSession.getPeerPrincipal();
} catch (SSLPeerUnverifiedException e) {
throw new IllegalStateException("Peer should always be verified", e);
}
}

public static boolean isAuthenticatedAsSystemUser(final RoutingContext routingContext) {
final User user = routingContext.user();
return user instanceof SystemUser;
}

private static class SystemUser implements ApiUser {

private final Principal principal;

SystemUser(final Principal principal) {
this.principal = Objects.requireNonNull(principal);
}

@SuppressWarnings("deprecation")
@Override
public User isAuthorized(final String s, final Handler<AsyncResult<Boolean>> handler) {
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation")
@Override
public User clearCache() {
throw new UnsupportedOperationException();
}

@Override
public JsonObject principal() {
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation")
@Override
public void setAuthProvider(final AuthProvider authProvider) {
throw new UnsupportedOperationException();
}

@Override
public Principal getPrincipal() {
return principal;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,35 @@
import io.confluent.ksql.api.auth.AuthenticationPluginHandler;
import io.confluent.ksql.api.auth.JaasAuthProvider;
import io.confluent.ksql.api.auth.KsqlAuthorizationProviderHandler;
import io.confluent.ksql.api.auth.SystemAuthenticationHandler;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.vertx.core.Handler;
import io.vertx.core.http.ClientAuth;
import io.vertx.ext.auth.AuthProvider;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.AuthHandler;
import io.vertx.ext.web.handler.BasicAuthHandler;
import java.net.URI;
import java.util.Optional;

public final class AuthHandlers {

private AuthHandlers() {
}

static void setupAuthHandlers(final Server server, final Router router) {
static void setupAuthHandlers(final Server server, final Router router,
final boolean isInternalListener) {
final Optional<AuthHandler> jaasAuthHandler = getJaasAuthHandler(server);
final KsqlSecurityExtension securityExtension = server.getSecurityExtension();
final Optional<AuthenticationPlugin> authenticationPlugin = server.getAuthenticationPlugin();
final Optional<Handler<RoutingContext>> pluginHandler =
authenticationPlugin.map(plugin -> new AuthenticationPluginHandler(server, plugin));
final Optional<SystemAuthenticationHandler> systemAuthenticationHandler
= getSystemAuthenticationHandler(server, isInternalListener);

systemAuthenticationHandler.ifPresent(handler -> router.route().handler(handler));

if (jaasAuthHandler.isPresent() || authenticationPlugin.isPresent()) {
router.route().handler(AuthHandlers::pauseHandler);
Expand All @@ -62,6 +70,11 @@ static void setupAuthHandlers(final Server server, final Router router) {
private static void wrappedAuthHandler(final RoutingContext routingContext,
final Optional<AuthHandler> jaasAuthHandler,
final Optional<Handler<RoutingContext>> pluginHandler) {
if (SystemAuthenticationHandler.isAuthenticatedAsSystemUser(routingContext)) {
routingContext.next();
return;
}
// Fall through to authing with Jaas
if (jaasAuthHandler.isPresent()) {
// If we have a Jaas handler configured and we have Basic credentials then we should auth
// with that
Expand Down Expand Up @@ -111,6 +124,33 @@ private static AuthHandler basicAuthHandler(final Server server) {
return basicAuthHandler;
}

/**
* Gets the SystemAuthenticationHandler, if the requirements are met for it to be installed.
* The requirements for installation are that SSL mutual auth is in effect for the connection
* (meaning that the request is verified to be coming from a known set of servers in the cluster),
* and that it came on the internal listener interface, meaning that it's being done with the
* authorization of the system rather than directly on behalf of the user. Mutual auth is only
* enforced when SSL is used.
* @param server The server to potentially install the handler
* @param isInternalListener If this handler is being considered for the internal listener
* @return The SystemAuthenticationHandler if the requirements are met
*/
private static Optional<SystemAuthenticationHandler> getSystemAuthenticationHandler(
final Server server, final boolean isInternalListener) {
final String internalListener = server.getConfig().getString(
KsqlRestConfig.INTERNAL_LISTENER_CONFIG);
if (internalListener == null) {
return Optional.empty();
}
final String scheme = URI.create(internalListener).getScheme();
if (server.getConfig().getClientAuthInternal() == ClientAuth.REQUIRED
&& "https".equalsIgnoreCase(scheme) && isInternalListener) {
return Optional.of(new SystemAuthenticationHandler());
}
// Fall back on other authentication methods.
return Optional.empty();
}

private static void pauseHandler(final RoutingContext routingContext) {
// prevent auth handler from reading request body
routingContext.request().pause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

import static io.confluent.ksql.rest.Errors.ERROR_CODE_MAX_PUSH_QUERIES_EXCEEDED;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.api.auth.AuthenticationPlugin;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.KeystoreUtil;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.impl.ConcurrentHashSet;
Expand Down Expand Up @@ -117,7 +120,7 @@ public synchronized void start() {
final VertxCompletableFuture<String> vcf = new VertxCompletableFuture<>();
final ServerVerticle serverVerticle = new ServerVerticle(endpoints,
createHttpServerOptions(config, listener.getHost(), listener.getPort(),
listener.getScheme().equalsIgnoreCase("https")),
listener.getScheme().equalsIgnoreCase("https"), isInternalListener.orElse(false)),
this, isInternalListener);
vertx.deployVerticle(serverVerticle, vcf);
final int index = i;
Expand Down Expand Up @@ -271,7 +274,8 @@ private void configureTlsCertReload(final KsqlRestConfig config) {
}

private static HttpServerOptions createHttpServerOptions(final KsqlRestConfig ksqlRestConfig,
final String host, final int port, final boolean tls) {
final String host, final int port, final boolean tls,
final boolean isInternalListener) {

final HttpServerOptions options = new HttpServerOptions()
.setHost(host)
Expand All @@ -283,44 +287,61 @@ private static HttpServerOptions createHttpServerOptions(final KsqlRestConfig ks
.setPerFrameWebSocketCompressionSupported(true);

if (tls) {
options.setUseAlpn(true).setSsl(true);

final String keyStorePath = ksqlRestConfig
.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
final Password keyStorePassword = ksqlRestConfig
.getPassword(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
if (keyStorePath != null && !keyStorePath.isEmpty()) {
final String keyStoreType =
ksqlRestConfig.getString(KsqlRestConfig.SSL_KEYSTORE_TYPE_CONFIG);
if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) {
options.setKeyStoreOptions(
new JksOptions().setPath(keyStorePath).setPassword(keyStorePassword.value()));
} else if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) {
options.setPfxKeyCertOptions(
new PfxOptions().setPath(keyStorePath).setPassword(keyStorePassword.value()));
}
}
final String ksConfigName = isInternalListener
? KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG
: KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_EXTERNAL_CONFIG;
final ClientAuth clientAuth = isInternalListener
? ksqlRestConfig.getClientAuthInternal()
: ksqlRestConfig.getClientAuth();

final String alias = ksqlRestConfig.getString(ksConfigName);
setTlsOptions(ksqlRestConfig, options, alias, clientAuth);
}
return options;
}

final String trustStorePath = ksqlRestConfig
.getString(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
final Password trustStorePassword = ksqlRestConfig
.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
if (trustStorePath != null && !trustStorePath.isEmpty()) {
final String trustStoreType =
ksqlRestConfig.getString(KsqlRestConfig.SSL_TRUSTSTORE_TYPE_CONFIG);
if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) {
options.setTrustStoreOptions(
new JksOptions().setPath(trustStorePath).setPassword(trustStorePassword.value()));
} else if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) {
options.setPfxTrustOptions(
new PfxOptions().setPath(trustStorePath).setPassword(trustStorePassword.value()));
}
private static void setTlsOptions(
final KsqlRestConfig ksqlRestConfig,
final HttpServerOptions options,
final String keyStoreAlias,
final ClientAuth clientAuth
) {
options.setUseAlpn(true).setSsl(true);

final String keyStorePath = ksqlRestConfig
.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
final Password keyStorePassword = ksqlRestConfig
.getPassword(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
if (keyStorePath != null && !keyStorePath.isEmpty()) {
final String keyStoreType =
ksqlRestConfig.getString(KsqlRestConfig.SSL_KEYSTORE_TYPE_CONFIG);
if (keyStoreAlias != null && !keyStoreAlias.isEmpty()) {
options.setKeyStoreOptions(new JksOptions().setValue(KeystoreUtil.getKeyStore(
keyStoreType,
keyStorePath,
Optional.ofNullable(Strings.emptyToNull(keyStorePassword.value())),
Optional.ofNullable(Strings.emptyToNull(keyStorePassword.value())),
keyStoreAlias))
.setPassword(keyStorePassword.value()));
} else if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) {
options.setKeyStoreOptions(
new JksOptions().setPath(keyStorePath).setPassword(keyStorePassword.value()));
} else if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) {
options.setPfxKeyCertOptions(
new PfxOptions().setPath(keyStorePath).setPassword(keyStorePassword.value()));
}
}

options.setClientAuth(ksqlRestConfig.getClientAuth());
final String trustStorePath = ksqlRestConfig
.getString(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
final Password trustStorePassword = ksqlRestConfig
.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
if (trustStorePath != null && !trustStorePath.isEmpty()) {
options.setTrustStoreOptions(
new JksOptions().setPath(trustStorePath).setPassword(trustStorePassword.value()));
}

return options;
options.setClientAuth(clientAuth);
}

private static List<URI> parseListeners(final KsqlRestConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private Router setupRouter() {
isInternalListener.ifPresent(isInternal ->
router.route().handler(new InternalEndpointHandler(isInternal)));

AuthHandlers.setupAuthHandlers(server, router);
AuthHandlers.setupAuthHandlers(server, router, isInternalListener.orElse(false));

router.route().handler(new ServerStateHandler(server.getServerState()));

Expand Down
Loading

0 comments on commit 82b137f

Please sign in to comment.