Skip to content

Commit

Permalink
Receiver reject requests for wrong audience (knative-extensions#3675)
Browse files Browse the repository at this point in the history
* Receiver: reject request for wrong audience

* Switch to AuthenticationHandler

* Fix "Request has already been read" issue

* Change TokenVerifier to an interface

* Initialize TokenVerifier in main

* Add test for AuthenticationHandler

* Only initialize OIDC discovery config in main and create a TokenVerifier per verticle instance.

* Rerun hack/update-codegen.sh

* Move TokenVerifier setup into setup() to prevent null pointer exception when vertx is null

* Update KafkaChannel OIDC e2e tests, to run OIDC conformance tests so the receiver is tested too.

* Run OIDC e2e tests as part of the reconciler suite

* Fix KafkaChannelOIDC e2e test

* Fix lint issue

* Address review comments
  • Loading branch information
creydr committed Apr 2, 2024
1 parent 1ec0180 commit 097bff2
Show file tree
Hide file tree
Showing 20 changed files with 404 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,61 +16,11 @@
package dev.knative.eventing.kafka.broker.core.oidc;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import org.jose4j.jwt.JwtClaims;
import org.jose4j.jwt.consumer.InvalidJwtException;
import org.jose4j.jwt.consumer.JwtConsumer;
import org.jose4j.jwt.consumer.JwtConsumerBuilder;
import org.jose4j.jwt.consumer.JwtContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TokenVerifier {
public interface TokenVerifier {
Future<JwtClaims> verify(String token, String expectedAudience);

private static final Logger logger = LoggerFactory.getLogger(TokenVerifier.class);

private final Vertx vertx;

private final OIDCDiscoveryConfig oidcDiscoveryConfig;

public TokenVerifier(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) {
this.vertx = vertx;
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}

public Future<JwtClaims> verify(String token, String expectedAudience) {
return this.vertx.<JwtClaims>executeBlocking(promise -> {
// execute blocking, as jose .process() is blocking

JwtConsumer jwtConsumer = new JwtConsumerBuilder()
.setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver())
.setExpectedAudience(expectedAudience)
.setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer())
.build();

try {
JwtContext jwtContext = jwtConsumer.process(token);

promise.complete(jwtContext.getJwtClaims());
} catch (InvalidJwtException e) {
promise.fail(e);
}
});
}

public Future<JwtClaims> verify(HttpServerRequest request, String expectedAudience) {
String authHeader = request.getHeader("Authorization");
if (authHeader == null || authHeader.isEmpty()) {
return Future.failedFuture("Request didn't contain Authorization header"); // change to exception
}

if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) {
return Future.failedFuture("Authorization header didn't contain Bearer token"); // change to exception
}

String token = authHeader.substring("Bearer ".length());

return verify(token, expectedAudience);
}
Future<JwtClaims> verify(HttpServerRequest request, String expectedAudience);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import org.jose4j.jwt.JwtClaims;
import org.jose4j.jwt.consumer.InvalidJwtException;
import org.jose4j.jwt.consumer.JwtConsumer;
import org.jose4j.jwt.consumer.JwtConsumerBuilder;
import org.jose4j.jwt.consumer.JwtContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TokenVerifierImpl implements TokenVerifier {

private static final Logger logger = LoggerFactory.getLogger(TokenVerifierImpl.class);

private final Vertx vertx;

private final OIDCDiscoveryConfig oidcDiscoveryConfig;

public TokenVerifierImpl(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) {
this.vertx = vertx;
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}

public Future<JwtClaims> verify(String token, String expectedAudience) {
return this.vertx.<JwtClaims>executeBlocking(promise -> {
// execute blocking, as jose .process() is blocking

JwtConsumer jwtConsumer = new JwtConsumerBuilder()
.setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver())
.setExpectedAudience(expectedAudience)
.setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer())
.build();

try {
JwtContext jwtContext = jwtConsumer.process(token);

promise.complete(jwtContext.getJwtClaims());
} catch (InvalidJwtException e) {
promise.fail(e);
}
});
}

public Future<JwtClaims> verify(final HttpServerRequest request, String expectedAudience) {
String authHeader = request.getHeader("Authorization");
if (authHeader == null || authHeader.isEmpty()) {
return Future.failedFuture("Request didn't contain Authorization header");
}

if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) {
return Future.failedFuture("Authorization header didn't contain Bearer token");
}

String token = authHeader.substring("Bearer ".length());

request.pause();
return verify(token, expectedAudience).onSuccess(v -> request.resume());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
package dev.knative.eventing.kafka.broker.receiverloom;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class Main {
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new LoomProducerFactory<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
package dev.knative.eventing.kafka.broker.receiververtx;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class Main {
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new VertxProducerFactory<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ default Future<RecordMetadata> send(ProducerRecord<String, CloudEvent> record) {
* @return the resource associated with this producer.
*/
DataPlaneContract.Reference getReference();

/**
* @return the OIDC audience for the ingress.
*/
String getAudience();
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ private static class IngressProducerImpl implements IngressProducer {
private final String host;
private final Properties producerProperties;
private final DataPlaneContract.Reference reference;
private final String audience;

IngressProducerImpl(
final ReactiveKafkaProducer<String, CloudEvent> producer,
Expand All @@ -276,6 +277,7 @@ private static class IngressProducerImpl implements IngressProducer {
this.producer = producer;
this.topic = resource.getTopics(0);
this.reference = resource.getReference();
this.audience = resource.getIngress().getAudience();
this.path = path;
this.host = host;
this.producerProperties = producerProperties;
Expand All @@ -291,6 +293,11 @@ public String getTopic() {
return topic;
}

@Override
public String getAudience() {
return audience;
}

@Override
public DataPlaneContract.Reference getReference() {
return reference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl;
import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener;
import dev.knative.eventing.kafka.broker.core.reconciler.ResourcesReconciler;
import dev.knative.eventing.kafka.broker.receiver.IngressProducer;
import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler;
import dev.knative.eventing.kafka.broker.receiver.RequestContext;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthenticationHandler;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler;
import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv;
Expand Down Expand Up @@ -83,12 +87,13 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler<HttpSe
private final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory;
private final IngressRequestHandler ingressRequestHandler;
private final ReceiverEnv env;
private final OIDCDiscoveryConfig oidcDiscoveryConfig;

private AuthenticationHandler authenticationHandler;
private HttpServer httpServer;
private HttpServer httpsServer;
private MessageConsumer<Object> messageConsumer;
private IngressProducerReconcilableStore ingressProducerStore;

private FileWatcher secretWatcher;

public ReceiverVerticle(
Expand All @@ -97,7 +102,8 @@ public ReceiverVerticle(
final HttpServerOptions httpsServerOptions,
final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory,
final IngressRequestHandler ingressRequestHandler,
final String secretVolumePath) {
final String secretVolumePath,
final OIDCDiscoveryConfig oidcDiscoveryConfig) {

Objects.requireNonNull(env);
Objects.requireNonNull(httpServerOptions);
Expand All @@ -114,6 +120,7 @@ public ReceiverVerticle(
this.secretVolume = new File(secretVolumePath);
this.tlsKeyFile = new File(secretVolumePath + "/tls.key");
this.tlsCrtFile = new File(secretVolumePath + "/tls.crt");
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}

public HttpServerOptions getHttpsServerOptions() {
Expand All @@ -127,6 +134,9 @@ public void start(final Promise<Void> startPromise) {
.watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler))
.buildAndListen(vertx);

TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig);
this.authenticationHandler = new AuthenticationHandler(tokenVerifier);

this.httpServer = vertx.createHttpServer(this.httpServerOptions);

// check whether the secret volume is mounted
Expand Down Expand Up @@ -200,10 +210,7 @@ public void stop(Promise<Void> stopPromise) throws Exception {
}

@Override
public void handle(HttpServerRequest request) {

final var requestContext = new RequestContext(request);

public void handle(final HttpServerRequest request) {
// Look up for the ingress producer
IngressProducer producer = this.ingressProducerStore.resolve(request.host(), request.path());
if (producer == null) {
Expand All @@ -224,8 +231,11 @@ public void handle(HttpServerRequest request) {
return;
}

// Invoke the ingress request handler
this.ingressRequestHandler.handle(requestContext, producer);
this.authenticationHandler.handle(request, producer, req -> {
// Invoke the ingress request handler
final var requestContext = new RequestContext(req);
this.ingressRequestHandler.handle(requestContext, producer);
});
}

public void updateServerConfig() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.receiver.impl.handler;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier;
import dev.knative.eventing.kafka.broker.receiver.IngressProducer;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handler checking that the provided request contained a valid JWT.
*/
public class AuthenticationHandler {

private static final Logger logger = LoggerFactory.getLogger(AuthenticationHandler.class);
private final TokenVerifier tokenVerifier;

public AuthenticationHandler(final TokenVerifier tokenVerifier) {
this.tokenVerifier = tokenVerifier;
}

public void handle(
final HttpServerRequest request, final IngressProducer ingressInfo, final Handler<HttpServerRequest> next) {
if (ingressInfo.getAudience().isEmpty()) {
logger.debug("No audience for ingress set. Continue without authentication check...");
next.handle(request);
return;
}

tokenVerifier
.verify(request, ingressInfo.getAudience())
.onFailure(e -> {
logger.debug("Failed to verify authentication of request: {}", keyValue("error", e.getMessage()));
request.response()
.setStatusCode(HttpResponseStatus.UNAUTHORIZED.code())
.end();
})
.onSuccess(jwtClaims -> {
logger.debug("Request contained valid JWT. Continuing...");
next.handle(request);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
Expand All @@ -39,6 +40,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -60,7 +62,7 @@ public class Main {
* @param args command line arguments.
*/
public static void start(final String[] args, final ReactiveProducerFactory kafkaProducerFactory)
throws IOException {
throws IOException, ExecutionException, InterruptedException {
ReceiverEnv env = new ReceiverEnv(System::getenv);

OpenTelemetrySdk openTelemetry =
Expand Down Expand Up @@ -97,14 +99,21 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
httpsServerOptions.setPort(env.getIngressTLSPort());
httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE);

// Setup OIDC discovery config
OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx)
.toCompletionStage()
.toCompletableFuture()
.get();

// Configure the verticle to deploy and the deployment options
final Supplier<Verticle> receiverVerticleFactory = new ReceiverVerticleFactory(
env,
producerConfigs,
Metrics.getRegistry(),
httpServerOptions,
httpsServerOptions,
kafkaProducerFactory);
kafkaProducerFactory,
oidcDiscoveryConfig);
DeploymentOptions deploymentOptions =
new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors());

Expand Down
Loading

0 comments on commit 097bff2

Please sign in to comment.