From bfcce3f97f5f4e14c0bd07883f2946d5b8fc845d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Mon, 5 Feb 2024 08:59:13 +0100 Subject: [PATCH 01/14] Receiver: reject request for wrong audience --- .../broker/receiver/IngressProducer.java | 5 + .../receiver/IngressRequestHandler.java | 3 +- .../IngressProducerReconcilableStore.java | 7 + .../receiver/impl/ReceiverVerticle.java | 24 +- .../handler/IngressRequestHandlerImpl.java | 234 +++++++++++++----- .../IngressRequestHandlerImplTest.java | 80 +++--- 6 files changed, 259 insertions(+), 94 deletions(-) diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java index 116067adc6..149191540d 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java @@ -50,4 +50,9 @@ default Future send(ProducerRecord record) { * @return the resource associated with this producer. */ DataPlaneContract.Reference getReference(); + + /** + * @return the OIDC audience for the ingress. + */ + String getAudience(); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java index 617fc3da0f..55684f33e8 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java @@ -15,6 +15,7 @@ */ package dev.knative.eventing.kafka.broker.receiver; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; /** @@ -22,5 +23,5 @@ */ public interface IngressRequestHandler extends IngressReconcilerListener { - void handle(RequestContext request, IngressProducer producer); + void handle(RequestContext request, IngressProducer producer, TokenVerifier tokenVerifier); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java index 3926d2e3c2..1f3dafa51d 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java @@ -266,6 +266,7 @@ private static class IngressProducerImpl implements IngressProducer { private final String host; private final Properties producerProperties; private final DataPlaneContract.Reference reference; + private final String audience; IngressProducerImpl( final ReactiveKafkaProducer producer, @@ -276,6 +277,7 @@ private static class IngressProducerImpl implements IngressProducer { this.producer = producer; this.topic = resource.getTopics(0); this.reference = resource.getReference(); + this.audience = resource.getIngress().getAudience(); this.path = path; this.host = host; this.producerProperties = producerProperties; @@ -291,6 +293,11 @@ public String getTopic() { return topic; } + @Override + public String getAudience() { + return 
audience; + } + @Override public DataPlaneContract.Reference getReference() { return reference; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 115383e954..7ececacf3e 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -22,6 +22,8 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; import dev.knative.eventing.kafka.broker.core.reconciler.ResourcesReconciler; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; @@ -88,7 +90,7 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler messageConsumer; private IngressProducerReconcilableStore ingressProducerStore; - + private TokenVerifier tokenVerifier; private FileWatcher secretWatcher; public ReceiverVerticle( @@ -143,9 +145,19 @@ public void start(final Promise startPromise) { } } + Promise oidcPromise = Promise.promise(); + OIDCDiscoveryConfig.build(vertx) + .onSuccess(oidcDiscoveryConfig -> { + this.tokenVerifier = new TokenVerifier(vertx, oidcDiscoveryConfig); + logger.debug("OIDC TokenVerifier configured"); + oidcPromise.complete(); + }) + .onFailure(oidcPromise::fail); + final var handler = new ProbeHandler( env.getLivenessProbePath(), env.getReadinessProbePath(), new MethodNotAllowedHandler(this)); + Promise httpServerPromise = Promise.promise(); if (this.httpsServer != null) { CompositeFuture.all( this.httpServer @@ -157,16 +169,20 @@ public void start(final Promise startPromise) { .exceptionHandler(startPromise::tryFail) .listen(this.httpsServerOptions.getPort(), this.httpsServerOptions.getHost())) .mapEmpty() - .onComplete(startPromise); + .onComplete(httpServerPromise); } else { this.httpServer .requestHandler(handler) .exceptionHandler(startPromise::tryFail) .listen(this.httpServerOptions.getPort(), this.httpServerOptions.getHost()) .mapEmpty() - .onComplete(startPromise); + .onComplete(httpServerPromise); } + Future.all(oidcPromise.future(), httpServerPromise.future()) + .mapEmpty() + .onComplete(startPromise); + setupSecretWatcher(); } @@ -225,7 +241,7 @@ public void handle(HttpServerRequest request) { } // Invoke the ingress request handler - this.ingressRequestHandler.handle(requestContext, producer); + this.ingressRequestHandler.handle(requestContext, producer, this.tokenVerifier); } public void updateServerConfig() { diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java index 0ef11bf421..6ea6d4c76f 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java @@ -22,6 +22,7 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; 
import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.tracing.TracingSpan; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; @@ -32,11 +33,13 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; +import io.netty.handler.codec.http.HttpResponseStatus; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.vertx.core.Future; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.jose4j.jwt.JwtClaims; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +80,8 @@ public IngressRequestHandlerImpl( } @Override - public void handle(final RequestContext requestContext, final IngressProducer producer) { + public void handle( + final RequestContext requestContext, final IngressProducer producer, final TokenVerifier tokenVerifier) { final Tags resourceTags = Metrics.resourceRefTags(producer.getReference()); @@ -101,66 +105,182 @@ public void handle(final RequestContext requestContext, final IngressProducer pr cause); }) .compose(record -> { - // Conversion to record succeeded, let's push it to Kafka - if (logger.isDebugEnabled()) { - final var span = Span.fromContextOrNull(Context.current()); - if (span != null) { - logger.debug( - "Received event {} {}", - keyValue("event", record.value()), - keyValue( - TracingConfig.TRACE_ID_KEY, - span.getSpanContext().getTraceId())); - } else { - logger.debug("Received event {}", keyValue("event", record.value())); - } + Future f; + if (!producer.getAudience().isEmpty()) { + f = tokenVerifier + .verify(requestContext.getRequest(), producer.getAudience()) + .onFailure(e -> { + logger.debug("Could not verify JWT: " + e.getMessage()); + requestContext + .getRequest() + .response() + .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) + .end(); + }); + } else { + f = Future.succeededFuture(); } - // Decorate the span with event specific attributed - TracingSpan.decorateCurrentWithEvent(record.value()); - - final var eventTypeTag = - Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); - - return publishRecord(producer, record) - .onSuccess(m -> { - requestContext - .getRequest() - .response() - .setStatusCode(RECORD_PRODUCED) - .end(); - - final var tags = RECORD_PRODUCED_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags).register(meterRegistry).increment(); - }) - .onFailure(cause -> { - requestContext - .getRequest() - .response() - .setStatusCode(FAILED_TO_PRODUCE) - .end(); - - final var tags = FAILED_TO_PRODUCE_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags).register(meterRegistry).increment(); - - logger.warn( - "Failed to produce record {}", + return f.compose(jwtClaims -> { + logger.debug("Request contained a valid JWT. 
Continuing..."); + + // Conversion to record succeeded, let's push it to Kafka + if (logger.isDebugEnabled()) { + final var span = Span.fromContextOrNull(Context.current()); + if (span != null) { + logger.debug( + "Received event {} {}", + keyValue("event", record.value()), keyValue( - "path", - requestContext.getRequest().path()), - cause); - }); + TracingConfig.TRACE_ID_KEY, + span.getSpanContext().getTraceId())); + } else { + logger.debug("Received event {}", keyValue("event", record.value())); + } + } + + // Decorate the span with event specific attributed + TracingSpan.decorateCurrentWithEvent(record.value()); + + final var eventTypeTag = + Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); + + return publishRecord(producer, record) + .onSuccess(m -> { + requestContext + .getRequest() + .response() + .setStatusCode(RECORD_PRODUCED) + .end(); + + final var tags = RECORD_PRODUCED_COMMON_TAGS + .and(resourceTags) + .and(eventTypeTag); + Metrics.eventDispatchLatency(tags) + .register(meterRegistry) + .record(requestContext.performLatency()); + Metrics.eventCount(tags) + .register(meterRegistry) + .increment(); + }) + .onFailure(cause -> { + requestContext + .getRequest() + .response() + .setStatusCode(FAILED_TO_PRODUCE) + .end(); + + final var tags = FAILED_TO_PRODUCE_COMMON_TAGS + .and(resourceTags) + .and(eventTypeTag); + Metrics.eventDispatchLatency(tags) + .register(meterRegistry) + .record(requestContext.performLatency()); + Metrics.eventCount(tags) + .register(meterRegistry) + .increment(); + + logger.warn( + "Failed to produce record {}", + keyValue( + "path", + requestContext.getRequest().path()), + cause); + }); + }); }); + + /* //todo: only verify audience, if audience set (--> oidc enabled) + tokenVerifier.verify(requestContext.getRequest(), producer.getAudience()) + .onFailure(e -> { + logger.debug("Could not verify JWT: " + e.getMessage()); + requestContext + .getRequest() + .response() + .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) + .end(); + }).onSuccess(jwtClaims -> { + logger.debug("Valid JWT provided. 
Continuing..."); + + requestToRecordMapper.requestToRecord(requestContext.getRequest(), producer.getTopic()) + .onFailure(cause -> { + // Conversion to record failed + requestContext + .getRequest() + .response() + .setStatusCode(MAPPER_FAILED) + .end(); + + final var tags = MAPPER_FAILED_COMMON_TAGS.and(resourceTags); + Metrics.eventDispatchLatency(tags).register(meterRegistry).record(requestContext.performLatency()); + Metrics.eventCount(tags).register(meterRegistry).increment(); + + logger.warn( + "Failed to convert request to record {}", + keyValue("path", requestContext.getRequest().path()), + cause); + }).compose(record -> { + // Conversion to record succeeded, let's push it to Kafka + if (logger.isDebugEnabled()) { + final var span = Span.fromContextOrNull(Context.current()); + if (span != null) { + logger.debug( + "Received event {} {}", + keyValue("event", record.value()), + keyValue( + TracingConfig.TRACE_ID_KEY, + span.getSpanContext().getTraceId())); + } else { + logger.debug("Received event {}", keyValue("event", record.value())); + } + } + + // Decorate the span with event specific attributed + TracingSpan.decorateCurrentWithEvent(record.value()); + + final var eventTypeTag = + Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); + + return publishRecord(producer, record) + .onSuccess(m -> { + requestContext + .getRequest() + .response() + .setStatusCode(RECORD_PRODUCED) + .end(); + + final var tags = RECORD_PRODUCED_COMMON_TAGS + .and(resourceTags) + .and(eventTypeTag); + Metrics.eventDispatchLatency(tags) + .register(meterRegistry) + .record(requestContext.performLatency()); + Metrics.eventCount(tags).register(meterRegistry).increment(); + }) + .onFailure(cause -> { + requestContext + .getRequest() + .response() + .setStatusCode(FAILED_TO_PRODUCE) + .end(); + + final var tags = FAILED_TO_PRODUCE_COMMON_TAGS + .and(resourceTags) + .and(eventTypeTag); + Metrics.eventDispatchLatency(tags) + .register(meterRegistry) + .record(requestContext.performLatency()); + Metrics.eventCount(tags).register(meterRegistry).increment(); + + logger.warn( + "Failed to produce record {}", + keyValue( + "path", + requestContext.getRequest().path()), + cause); + }); + }); + });*/ } private static Future publishRecord( diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java index d95cc20d4c..45a7512845 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java @@ -83,22 +83,30 @@ final var record = new ProducerRecord<>("topic", 10, "key", CoreObjects.event()) final var handler = new IngressRequestHandlerImpl(mapper, Metrics.getRegistry()); - handler.handle(new RequestContext(request), new IngressProducer() { - @Override - public ReactiveKafkaProducer getKafkaProducer() { - return producer; - } - - @Override - public String getTopic() { - return "1-12345"; - } - - @Override - public DataPlaneContract.Reference getReference() { - return DataPlaneContract.Reference.newBuilder().build(); - } - }); + handler.handle( + new RequestContext(request), + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return producer; + } + + @Override + public String 
getTopic() { + return "1-12345"; + } + + @Override + public DataPlaneContract.Reference getReference() { + return DataPlaneContract.Reference.newBuilder().build(); + } + + @Override + public String getAudience() { + return ""; + } + }, + null); verifySetStatusCodeAndTerminateResponse(statusCode, response); } @@ -114,22 +122,30 @@ public void shouldReturnBadRequestIfNoRecordCanBeCreated() { final var handler = new IngressRequestHandlerImpl(mapper, Metrics.getRegistry()); - handler.handle(new RequestContext(request), new IngressProducer() { - @Override - public ReactiveKafkaProducer getKafkaProducer() { - return producer; - } - - @Override - public String getTopic() { - return "1-12345"; - } - - @Override - public DataPlaneContract.Reference getReference() { - return DataPlaneContract.Reference.newBuilder().build(); - } - }); + handler.handle( + new RequestContext(request), + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return producer; + } + + @Override + public String getTopic() { + return "1-12345"; + } + + @Override + public DataPlaneContract.Reference getReference() { + return DataPlaneContract.Reference.newBuilder().build(); + } + + @Override + public String getAudience() { + return ""; + } + }, + null); verifySetStatusCodeAndTerminateResponse(IngressRequestHandlerImpl.MAPPER_FAILED, response); } From 84acd5965bd1a5a637e652d01083ee041c1517e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Mon, 5 Feb 2024 11:15:50 +0100 Subject: [PATCH 02/14] Switch to AuthenticationHandler --- .../kafka/broker/core/oidc/TokenVerifier.java | 2 +- .../receiver/IngressRequestHandler.java | 3 +- .../receiver/impl/ReceiverVerticle.java | 20 +- .../impl/handler/AuthenticationHandler.java | 59 +++++ .../handler/IngressRequestHandlerImpl.java | 234 +++++------------- .../IngressRequestHandlerImplTest.java | 6 +- 6 files changed, 131 insertions(+), 193 deletions(-) create mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java index e12c764de3..bd4461e19b 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java @@ -59,7 +59,7 @@ public Future verify(String token, String expectedAudience) { }); } - public Future verify(HttpServerRequest request, String expectedAudience) { + public Future verify(final HttpServerRequest request, String expectedAudience) { String authHeader = request.getHeader("Authorization"); if (authHeader == null || authHeader.isEmpty()) { return Future.failedFuture("Request didn't contain Authorization header"); // change to exception diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java index 55684f33e8..617fc3da0f 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java @@ -15,7 +15,6 @@ */ package dev.knative.eventing.kafka.broker.receiver; -import 
dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; /** @@ -23,5 +22,5 @@ */ public interface IngressRequestHandler extends IngressReconcilerListener { - void handle(RequestContext request, IngressProducer producer, TokenVerifier tokenVerifier); + void handle(RequestContext request, IngressProducer producer); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 7ececacf3e..c810e4cf1c 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -29,6 +29,7 @@ import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.RequestContext; +import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthenticationHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; @@ -90,7 +91,7 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler messageConsumer; private IngressProducerReconcilableStore ingressProducerStore; - private TokenVerifier tokenVerifier; + private AuthenticationHandler authenticationHandler; private FileWatcher secretWatcher; public ReceiverVerticle( @@ -148,8 +149,9 @@ public void start(final Promise startPromise) { Promise oidcPromise = Promise.promise(); OIDCDiscoveryConfig.build(vertx) .onSuccess(oidcDiscoveryConfig -> { - this.tokenVerifier = new TokenVerifier(vertx, oidcDiscoveryConfig); - logger.debug("OIDC TokenVerifier configured"); + TokenVerifier tokenVerifier = new TokenVerifier(vertx, oidcDiscoveryConfig); + this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + logger.debug("Authenticationhandler configured"); oidcPromise.complete(); }) .onFailure(oidcPromise::fail); @@ -216,10 +218,7 @@ public void stop(Promise stopPromise) throws Exception { } @Override - public void handle(HttpServerRequest request) { - - final var requestContext = new RequestContext(request); - + public void handle(final HttpServerRequest request) { // Look up for the ingress producer IngressProducer producer = this.ingressProducerStore.resolve(request.host(), request.path()); if (producer == null) { @@ -240,8 +239,11 @@ public void handle(HttpServerRequest request) { return; } - // Invoke the ingress request handler - this.ingressRequestHandler.handle(requestContext, producer, this.tokenVerifier); + this.authenticationHandler.handle(request, producer, req -> { + // Invoke the ingress request handler + final var requestContext = new RequestContext(req); + this.ingressRequestHandler.handle(requestContext, producer); + }); } public void updateServerConfig() { diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java new file mode 100644 index 0000000000..9a34cee569 --- /dev/null +++ 
b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java @@ -0,0 +1,59 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.handler; + +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; +import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServerRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; + +/** + * Handler checking that the provided request contained a valid JWT. + */ +public class AuthenticationHandler { + + private static final Logger logger = LoggerFactory.getLogger(AuthenticationHandler.class); + private final TokenVerifier tokenVerifier; + + public AuthenticationHandler(final TokenVerifier tokenVerifier) { + this.tokenVerifier = tokenVerifier; + } + + public void handle(final HttpServerRequest request, final IngressProducer ingressInfo, final Handler next) { + if (ingressInfo.getAudience().isEmpty()) { + logger.debug("No audience for ingress set. Continue without authentication check..."); + next.handle(request); + return; + } + + tokenVerifier.verify(request, ingressInfo.getAudience()) + .onFailure(e -> { + logger.debug("Failed to verify authentication of request: {}", keyValue("error", e.getMessage())); + request + .response() + .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) + .end(); + }).onSuccess(jwtClaims -> { + logger.debug("Request contained valid JWT. 
Continuing..."); + next.handle(request); + }); + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java index 6ea6d4c76f..0ef11bf421 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java @@ -22,7 +22,6 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.tracing.TracingSpan; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; @@ -33,13 +32,11 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; -import io.netty.handler.codec.http.HttpResponseStatus; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.vertx.core.Future; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.jose4j.jwt.JwtClaims; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,8 +77,7 @@ public IngressRequestHandlerImpl( } @Override - public void handle( - final RequestContext requestContext, final IngressProducer producer, final TokenVerifier tokenVerifier) { + public void handle(final RequestContext requestContext, final IngressProducer producer) { final Tags resourceTags = Metrics.resourceRefTags(producer.getReference()); @@ -105,182 +101,66 @@ public void handle( cause); }) .compose(record -> { - Future f; - if (!producer.getAudience().isEmpty()) { - f = tokenVerifier - .verify(requestContext.getRequest(), producer.getAudience()) - .onFailure(e -> { - logger.debug("Could not verify JWT: " + e.getMessage()); - requestContext - .getRequest() - .response() - .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) - .end(); - }); - } else { - f = Future.succeededFuture(); + // Conversion to record succeeded, let's push it to Kafka + if (logger.isDebugEnabled()) { + final var span = Span.fromContextOrNull(Context.current()); + if (span != null) { + logger.debug( + "Received event {} {}", + keyValue("event", record.value()), + keyValue( + TracingConfig.TRACE_ID_KEY, + span.getSpanContext().getTraceId())); + } else { + logger.debug("Received event {}", keyValue("event", record.value())); + } } - return f.compose(jwtClaims -> { - logger.debug("Request contained a valid JWT. 
Continuing..."); - - // Conversion to record succeeded, let's push it to Kafka - if (logger.isDebugEnabled()) { - final var span = Span.fromContextOrNull(Context.current()); - if (span != null) { - logger.debug( - "Received event {} {}", - keyValue("event", record.value()), + // Decorate the span with event specific attributed + TracingSpan.decorateCurrentWithEvent(record.value()); + + final var eventTypeTag = + Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); + + return publishRecord(producer, record) + .onSuccess(m -> { + requestContext + .getRequest() + .response() + .setStatusCode(RECORD_PRODUCED) + .end(); + + final var tags = RECORD_PRODUCED_COMMON_TAGS + .and(resourceTags) + .and(eventTypeTag); + Metrics.eventDispatchLatency(tags) + .register(meterRegistry) + .record(requestContext.performLatency()); + Metrics.eventCount(tags).register(meterRegistry).increment(); + }) + .onFailure(cause -> { + requestContext + .getRequest() + .response() + .setStatusCode(FAILED_TO_PRODUCE) + .end(); + + final var tags = FAILED_TO_PRODUCE_COMMON_TAGS + .and(resourceTags) + .and(eventTypeTag); + Metrics.eventDispatchLatency(tags) + .register(meterRegistry) + .record(requestContext.performLatency()); + Metrics.eventCount(tags).register(meterRegistry).increment(); + + logger.warn( + "Failed to produce record {}", keyValue( - TracingConfig.TRACE_ID_KEY, - span.getSpanContext().getTraceId())); - } else { - logger.debug("Received event {}", keyValue("event", record.value())); - } - } - - // Decorate the span with event specific attributed - TracingSpan.decorateCurrentWithEvent(record.value()); - - final var eventTypeTag = - Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); - - return publishRecord(producer, record) - .onSuccess(m -> { - requestContext - .getRequest() - .response() - .setStatusCode(RECORD_PRODUCED) - .end(); - - final var tags = RECORD_PRODUCED_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags) - .register(meterRegistry) - .increment(); - }) - .onFailure(cause -> { - requestContext - .getRequest() - .response() - .setStatusCode(FAILED_TO_PRODUCE) - .end(); - - final var tags = FAILED_TO_PRODUCE_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags) - .register(meterRegistry) - .increment(); - - logger.warn( - "Failed to produce record {}", - keyValue( - "path", - requestContext.getRequest().path()), - cause); - }); - }); + "path", + requestContext.getRequest().path()), + cause); + }); }); - - /* //todo: only verify audience, if audience set (--> oidc enabled) - tokenVerifier.verify(requestContext.getRequest(), producer.getAudience()) - .onFailure(e -> { - logger.debug("Could not verify JWT: " + e.getMessage()); - requestContext - .getRequest() - .response() - .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) - .end(); - }).onSuccess(jwtClaims -> { - logger.debug("Valid JWT provided. 
Continuing..."); - - requestToRecordMapper.requestToRecord(requestContext.getRequest(), producer.getTopic()) - .onFailure(cause -> { - // Conversion to record failed - requestContext - .getRequest() - .response() - .setStatusCode(MAPPER_FAILED) - .end(); - - final var tags = MAPPER_FAILED_COMMON_TAGS.and(resourceTags); - Metrics.eventDispatchLatency(tags).register(meterRegistry).record(requestContext.performLatency()); - Metrics.eventCount(tags).register(meterRegistry).increment(); - - logger.warn( - "Failed to convert request to record {}", - keyValue("path", requestContext.getRequest().path()), - cause); - }).compose(record -> { - // Conversion to record succeeded, let's push it to Kafka - if (logger.isDebugEnabled()) { - final var span = Span.fromContextOrNull(Context.current()); - if (span != null) { - logger.debug( - "Received event {} {}", - keyValue("event", record.value()), - keyValue( - TracingConfig.TRACE_ID_KEY, - span.getSpanContext().getTraceId())); - } else { - logger.debug("Received event {}", keyValue("event", record.value())); - } - } - - // Decorate the span with event specific attributed - TracingSpan.decorateCurrentWithEvent(record.value()); - - final var eventTypeTag = - Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); - - return publishRecord(producer, record) - .onSuccess(m -> { - requestContext - .getRequest() - .response() - .setStatusCode(RECORD_PRODUCED) - .end(); - - final var tags = RECORD_PRODUCED_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags).register(meterRegistry).increment(); - }) - .onFailure(cause -> { - requestContext - .getRequest() - .response() - .setStatusCode(FAILED_TO_PRODUCE) - .end(); - - final var tags = FAILED_TO_PRODUCE_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags).register(meterRegistry).increment(); - - logger.warn( - "Failed to produce record {}", - keyValue( - "path", - requestContext.getRequest().path()), - cause); - }); - }); - });*/ } private static Future publishRecord( diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java index 45a7512845..5a028b6654 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java @@ -105,8 +105,7 @@ public DataPlaneContract.Reference getReference() { public String getAudience() { return ""; } - }, - null); + }); verifySetStatusCodeAndTerminateResponse(statusCode, response); } @@ -144,8 +143,7 @@ public DataPlaneContract.Reference getReference() { public String getAudience() { return ""; } - }, - null); + }); verifySetStatusCodeAndTerminateResponse(IngressRequestHandlerImpl.MAPPER_FAILED, response); } From 187959d11489247b03fe2952889628c01c7648ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Mon, 5 Feb 2024 16:35:42 +0100 Subject: [PATCH 03/14] Fix "Request has already been read" issue --- .../knative/eventing/kafka/broker/core/oidc/TokenVerifier.java | 3 ++- 1 file changed, 
2 insertions(+), 1 deletion(-) diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java index bd4461e19b..f7eedc60c4 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java @@ -71,6 +71,7 @@ public Future verify(final HttpServerRequest request, String expected String token = authHeader.substring("Bearer ".length()); - return verify(token, expectedAudience); + request.pause(); + return verify(token, expectedAudience).onSuccess(v -> request.resume()); } } From 325b7c0fb706fbd749585d23ebcd282f25877340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 08:04:35 +0100 Subject: [PATCH 04/14] Change TokenVerifier to an interface --- .../kafka/broker/core/oidc/TokenVerifier.java | 58 +------------- .../broker/core/oidc/TokenVerifierImpl.java | 77 +++++++++++++++++++ .../receiver/impl/ReceiverVerticle.java | 3 +- 3 files changed, 82 insertions(+), 56 deletions(-) create mode 100644 data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java index f7eedc60c4..960c1261c0 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java @@ -16,62 +16,10 @@ package dev.knative.eventing.kafka.broker.core.oidc; import io.vertx.core.Future; -import io.vertx.core.Vertx; import io.vertx.core.http.HttpServerRequest; import org.jose4j.jwt.JwtClaims; -import org.jose4j.jwt.consumer.InvalidJwtException; -import org.jose4j.jwt.consumer.JwtConsumer; -import org.jose4j.jwt.consumer.JwtConsumerBuilder; -import org.jose4j.jwt.consumer.JwtContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class TokenVerifier { - - private static final Logger logger = LoggerFactory.getLogger(TokenVerifier.class); - - private final Vertx vertx; - - private final OIDCDiscoveryConfig oidcDiscoveryConfig; - - public TokenVerifier(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) { - this.vertx = vertx; - this.oidcDiscoveryConfig = oidcDiscoveryConfig; - } - - public Future verify(String token, String expectedAudience) { - return this.vertx.executeBlocking(promise -> { - // execute blocking, as jose .process() is blocking - - JwtConsumer jwtConsumer = new JwtConsumerBuilder() - .setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver()) - .setExpectedAudience(expectedAudience) - .setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer()) - .build(); - - try { - JwtContext jwtContext = jwtConsumer.process(token); - - promise.complete(jwtContext.getJwtClaims()); - } catch (InvalidJwtException e) { - promise.fail(e); - } - }); - } - - public Future verify(final HttpServerRequest request, String expectedAudience) { - String authHeader = request.getHeader("Authorization"); - if (authHeader == null || authHeader.isEmpty()) { - return Future.failedFuture("Request didn't contain Authorization header"); // change to exception - } - - if (!authHeader.startsWith("Bearer ") && authHeader.length() <= 
"Bearer ".length()) { - return Future.failedFuture("Authorization header didn't contain Bearer token"); // change to exception - } - - String token = authHeader.substring("Bearer ".length()); - - request.pause(); - return verify(token, expectedAudience).onSuccess(v -> request.resume()); - } +public interface TokenVerifier { + Future verify(String token, String expectedAudience); + Future verify(HttpServerRequest request, String expectedAudience); } diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java new file mode 100644 index 0000000000..0346be1ae6 --- /dev/null +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java @@ -0,0 +1,77 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.core.oidc; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import org.jose4j.jwt.JwtClaims; +import org.jose4j.jwt.consumer.InvalidJwtException; +import org.jose4j.jwt.consumer.JwtConsumer; +import org.jose4j.jwt.consumer.JwtConsumerBuilder; +import org.jose4j.jwt.consumer.JwtContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TokenVerifierImpl implements TokenVerifier { + + private static final Logger logger = LoggerFactory.getLogger(TokenVerifierImpl.class); + + private final Vertx vertx; + + private final OIDCDiscoveryConfig oidcDiscoveryConfig; + + public TokenVerifierImpl(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) { + this.vertx = vertx; + this.oidcDiscoveryConfig = oidcDiscoveryConfig; + } + + public Future verify(String token, String expectedAudience) { + return this.vertx.executeBlocking(promise -> { + // execute blocking, as jose .process() is blocking + + JwtConsumer jwtConsumer = new JwtConsumerBuilder() + .setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver()) + .setExpectedAudience(expectedAudience) + .setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer()) + .build(); + + try { + JwtContext jwtContext = jwtConsumer.process(token); + + promise.complete(jwtContext.getJwtClaims()); + } catch (InvalidJwtException e) { + promise.fail(e); + } + }); + } + + public Future verify(final HttpServerRequest request, String expectedAudience) { + String authHeader = request.getHeader("Authorization"); + if (authHeader == null || authHeader.isEmpty()) { + return Future.failedFuture("Request didn't contain Authorization header"); // change to exception + } + + if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) { + return Future.failedFuture("Authorization header didn't contain Bearer token"); // change to exception + } + + String token = authHeader.substring("Bearer ".length()); + + request.pause(); + return verify(token, 
expectedAudience).onSuccess(v -> request.resume()); + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index c810e4cf1c..04418914b7 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -24,6 +24,7 @@ import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; import dev.knative.eventing.kafka.broker.core.reconciler.ResourcesReconciler; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; @@ -149,7 +150,7 @@ public void start(final Promise startPromise) { Promise oidcPromise = Promise.promise(); OIDCDiscoveryConfig.build(vertx) .onSuccess(oidcDiscoveryConfig -> { - TokenVerifier tokenVerifier = new TokenVerifier(vertx, oidcDiscoveryConfig); + TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); this.authenticationHandler = new AuthenticationHandler(tokenVerifier); logger.debug("Authenticationhandler configured"); oidcPromise.complete(); From 42c1a07920b8ab320c7c42b55d9a012b45489e43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 11:41:20 +0100 Subject: [PATCH 05/14] Initialize TokenVerifier in main --- .../kafka/broker/receiverloom/Main.java | 3 ++- .../kafka/broker/receiververtx/Main.java | 3 ++- .../receiver/impl/ReceiverVerticle.java | 25 +++++-------------- .../kafka/broker/receiver/main/Main.java | 13 ++++++++-- .../main/ReceiverVerticleFactory.java | 22 +++++++++------- .../receiver/impl/ReceiverVerticleTest.java | 3 ++- .../impl/ReceiverVerticleTracingTest.java | 3 ++- .../main/ReceiverVerticleFactoryTest.java | 4 ++- .../broker/tests/AbstractDataPlaneTest.java | 3 ++- 9 files changed, 43 insertions(+), 36 deletions(-) diff --git a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java index ae325bfc90..b218b3767f 100644 --- a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java +++ b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java @@ -16,9 +16,10 @@ package dev.knative.eventing.kafka.broker.receiverloom; import java.io.IOException; +import java.util.concurrent.ExecutionException; public class Main { - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new LoomProducerFactory<>()); } } diff --git a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java b/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java index 1c0dce4f4f..5365e96414 100644 --- a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java +++ 
b/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java @@ -16,9 +16,10 @@ package dev.knative.eventing.kafka.broker.receiververtx; import java.io.IOException; +import java.util.concurrent.ExecutionException; public class Main { - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new VertxProducerFactory<>()); } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 04418914b7..eefba3ce9b 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -87,12 +87,12 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler ingressProducerStoreFactory; private final IngressRequestHandler ingressRequestHandler; private final ReceiverEnv env; + private final AuthenticationHandler authenticationHandler; private HttpServer httpServer; private HttpServer httpsServer; private MessageConsumer messageConsumer; private IngressProducerReconcilableStore ingressProducerStore; - private AuthenticationHandler authenticationHandler; private FileWatcher secretWatcher; public ReceiverVerticle( @@ -101,7 +101,8 @@ public ReceiverVerticle( final HttpServerOptions httpsServerOptions, final Function ingressProducerStoreFactory, final IngressRequestHandler ingressRequestHandler, - final String secretVolumePath) { + final String secretVolumePath, + final TokenVerifier tokenVerifier) { Objects.requireNonNull(env); Objects.requireNonNull(httpServerOptions); @@ -118,6 +119,7 @@ public ReceiverVerticle( this.secretVolume = new File(secretVolumePath); this.tlsKeyFile = new File(secretVolumePath + "/tls.key"); this.tlsCrtFile = new File(secretVolumePath + "/tls.crt"); + this.authenticationHandler = new AuthenticationHandler(tokenVerifier); } public HttpServerOptions getHttpsServerOptions() { @@ -147,20 +149,9 @@ public void start(final Promise startPromise) { } } - Promise oidcPromise = Promise.promise(); - OIDCDiscoveryConfig.build(vertx) - .onSuccess(oidcDiscoveryConfig -> { - TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); - this.authenticationHandler = new AuthenticationHandler(tokenVerifier); - logger.debug("Authenticationhandler configured"); - oidcPromise.complete(); - }) - .onFailure(oidcPromise::fail); - final var handler = new ProbeHandler( env.getLivenessProbePath(), env.getReadinessProbePath(), new MethodNotAllowedHandler(this)); - Promise httpServerPromise = Promise.promise(); if (this.httpsServer != null) { CompositeFuture.all( this.httpServer @@ -172,20 +163,16 @@ public void start(final Promise startPromise) { .exceptionHandler(startPromise::tryFail) .listen(this.httpsServerOptions.getPort(), this.httpsServerOptions.getHost())) .mapEmpty() - .onComplete(httpServerPromise); + .onComplete(startPromise); } else { this.httpServer .requestHandler(handler) .exceptionHandler(startPromise::tryFail) .listen(this.httpServerOptions.getPort(), this.httpServerOptions.getHost()) .mapEmpty() - .onComplete(httpServerPromise); + .onComplete(startPromise); } - Future.all(oidcPromise.future(), 
httpServerPromise.future()) - .mapEmpty() - .onComplete(startPromise); - setupSecretWatcher(); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index 8638931f45..653a1f5c06 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -22,6 +22,9 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.utils.Configurations; @@ -39,6 +42,7 @@ import java.io.File; import java.io.IOException; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.kafka.clients.producer.ProducerConfig; @@ -60,7 +64,7 @@ public class Main { * @param args command line arguments. */ public static void start(final String[] args, final ReactiveProducerFactory kafkaProducerFactory) - throws IOException { + throws IOException, ExecutionException, InterruptedException { ReceiverEnv env = new ReceiverEnv(System::getenv); OpenTelemetrySdk openTelemetry = @@ -97,6 +101,10 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpsServerOptions.setPort(env.getIngressTLSPort()); httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE); + // Setup TokenVerifier + OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx).toCompletionStage().toCompletableFuture().get(); + TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); + // Configure the verticle to deploy and the deployment options final Supplier receiverVerticleFactory = new ReceiverVerticleFactory( env, @@ -104,7 +112,8 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk Metrics.getRegistry(), httpServerOptions, httpsServerOptions, - kafkaProducerFactory); + kafkaProducerFactory, + tokenVerifier); DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()); diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index 1647694589..a7fa968063 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.receiver.main; import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import 
dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; @@ -26,6 +27,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Verticle; import io.vertx.core.http.HttpServerOptions; + import java.util.Properties; import java.util.function.Supplier; @@ -39,17 +41,18 @@ class ReceiverVerticleFactory implements Supplier { private final String secretVolumePath = "/etc/receiver-tls-secret"; private final IngressRequestHandler ingressRequestHandler; + private final TokenVerifier tokenVerifier; private ReactiveProducerFactory kafkaProducerFactory; ReceiverVerticleFactory( - final ReceiverEnv env, - final Properties producerConfigs, - final MeterRegistry metricsRegistry, - final HttpServerOptions httpServerOptions, - final HttpServerOptions httpsServerOptions, - final ReactiveProducerFactory kafkaProducerFactory) { - { + final ReceiverEnv env, + final Properties producerConfigs, + final MeterRegistry metricsRegistry, + final HttpServerOptions httpServerOptions, + final HttpServerOptions httpsServerOptions, + final ReactiveProducerFactory kafkaProducerFactory, + final TokenVerifier tokenVerifier) { this.env = env; this.producerConfigs = producerConfigs; this.httpServerOptions = httpServerOptions; @@ -57,7 +60,7 @@ class ReceiverVerticleFactory implements Supplier { this.ingressRequestHandler = new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), metricsRegistry); this.kafkaProducerFactory = kafkaProducerFactory; - } + this.tokenVerifier = tokenVerifier; } @Override @@ -71,6 +74,7 @@ public Verticle get() { producerConfigs, properties -> kafkaProducerFactory.create(v, properties)), this.ingressRequestHandler, - secretVolumePath); + secretVolumePath, + tokenVerifier); } } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index eb37cfecca..0d1684ed5f 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -149,7 +149,8 @@ public void setUpHTTP(final Vertx vertx, final VertxTestContext testContext) { httpsServerOptions, v -> store, new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), registry), - SECRET_VOLUME_PATH); + SECRET_VOLUME_PATH, + null); vertx.deployVerticle(verticle, testContext.succeeding(ar -> testContext.completeNow())); // Connect to the logger in ReceiverVerticle diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java index 950d3bd314..bcc41cdde8 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java @@ -132,7 +132,8 @@ public void setup() throws ExecutionException, InterruptedException { httpsServerOptions, v -> store, new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), Metrics.getRegistry()), - SECRET_VOLUME_PATH); + SECRET_VOLUME_PATH, + null); 
vertx.deployVerticle(verticle).toCompletionStage().toCompletableFuture().get(); } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index d18cad6157..93d23e51f9 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.http.HttpServerOptions; @@ -42,7 +43,8 @@ public void shouldCreateMultipleReceiverVerticleInstances() { mock(MeterRegistry.class), mock(HttpServerOptions.class), mock(HttpServerOptions.class), - mock(MockReactiveProducerFactory.class)); + mock(MockReactiveProducerFactory.class), + mock(TokenVerifier.class)); assertThat(supplier.get()).isNotSameAs(supplier.get()); } diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index e8d1841e7c..02ce2b8b43 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -392,7 +392,8 @@ private ReceiverVerticle setUpReceiver(final Vertx vertx, final VertxTestContext AuthProvider.noAuth(), producerConfigs(), properties -> getReactiveProducerFactory() .create(v, properties)), new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), Metrics.getRegistry()), - SECRET_VOLUME_PATH); + SECRET_VOLUME_PATH, + null); final CountDownLatch latch = new CountDownLatch(1); vertx.deployVerticle(verticle, context.succeeding(h -> latch.countDown())); From 6dbc239f188b5a0d8bf196bcb974794927c4254f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 11:41:33 +0100 Subject: [PATCH 06/14] Add test for AuthenticationHandler --- .../handler/AuthenticationHandlerTest.java | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java new file mode 100644 index 0000000000..330319a936 --- /dev/null +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java @@ -0,0 +1,143 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.knative.eventing.kafka.broker.receiver.impl.handler; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; +import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; +import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import dev.knative.eventing.kafka.broker.receiver.RequestContext; +import dev.knative.eventing.kafka.broker.receiver.RequestToRecordMapper; +import io.cloudevents.CloudEvent; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.http.impl.headers.HeadersMultiMap; +import org.jose4j.jwt.JwtClaims; +import org.junit.jupiter.api.Test; + +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; + +public class AuthenticationHandlerTest { + @Test + public void shouldReturnUnauthorizedWhenJWTValidationFails() { + final HttpServerRequest request = mock(HttpServerRequest.class); + final var response = mockResponse(request, HttpResponseStatus.UNAUTHORIZED.code()); + + TokenVerifier tokenVerifier = new TokenVerifier() { + @Override + public Future verify(String token, String expectedAudience) { + return Future.failedFuture("JWT validation failed"); + } + + @Override + public Future verify(HttpServerRequest request, String expectedAudience) { + return Future.failedFuture("JWT validation failed"); + } + }; + + final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + + authHandler.handle( + request, + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } + + @Override + public String getTopic() { + return null; + } + + @Override + public DataPlaneContract.Reference getReference() { + return null; + } + + @Override + public String getAudience() { + return "some-required-audience"; + } + }, + mock(Handler.class)); + + verify(response, times(1)).setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()); + verify(response, times(1)).end(); + } + + @Test + public void shouldContinueWithRequestWhenJWTSucceedsFails() { + final HttpServerRequest request = mock(HttpServerRequest.class); + final var next = mock(Handler.class);//mockHandler(request); + + TokenVerifier tokenVerifier = new TokenVerifier() { + @Override + public Future verify(String token, String expectedAudience) { + return Future.succeededFuture(new JwtClaims()); + } + + @Override + public Future verify(HttpServerRequest request, String expectedAudience) { + return Future.succeededFuture(new JwtClaims()); + } + }; + + final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + + authHandler.handle( + request, + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } + + @Override + public String getTopic() { + return null; + } + + 
@Override + public DataPlaneContract.Reference getReference() { + return null; + } + + @Override + public String getAudience() { + return "some-required-audience"; + } + }, + next); + + verify(next, times(1)).handle(request); + } + + private static HttpServerResponse mockResponse(final HttpServerRequest request, final int statusCode) { + final var response = mock(HttpServerResponse.class); + when(response.setStatusCode(statusCode)).thenReturn(response); + when(request.response()).thenReturn(response); + + return response; + } +} From 5cc362a507b0d9df48f4033c457e665b78d608e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 11:49:14 +0100 Subject: [PATCH 07/14] Only initialize OIDC discovery config in main and create a TokenVerifier per verticle instance. --- .../kafka/broker/receiver/impl/ReceiverVerticle.java | 4 +++- .../eventing/kafka/broker/receiver/main/Main.java | 5 ++--- .../broker/receiver/main/ReceiverVerticleFactory.java | 9 +++++---- .../receiver/main/ReceiverVerticleFactoryTest.java | 3 ++- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index eefba3ce9b..81267e723f 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -102,7 +102,7 @@ public ReceiverVerticle( final Function ingressProducerStoreFactory, final IngressRequestHandler ingressRequestHandler, final String secretVolumePath, - final TokenVerifier tokenVerifier) { + final OIDCDiscoveryConfig oidcDiscoveryConfig) { Objects.requireNonNull(env); Objects.requireNonNull(httpServerOptions); @@ -119,6 +119,8 @@ public ReceiverVerticle( this.secretVolume = new File(secretVolumePath); this.tlsKeyFile = new File(secretVolumePath + "/tls.key"); this.tlsCrtFile = new File(secretVolumePath + "/tls.crt"); + + TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); this.authenticationHandler = new AuthenticationHandler(tokenVerifier); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index 653a1f5c06..f22eda483f 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -101,9 +101,8 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpsServerOptions.setPort(env.getIngressTLSPort()); httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE); - // Setup TokenVerifier + // Setup OIDC discovery config OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx).toCompletionStage().toCompletableFuture().get(); - TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); // Configure the verticle to deploy and the deployment options final Supplier receiverVerticleFactory = new ReceiverVerticleFactory( @@ -113,7 +112,7 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpServerOptions, httpsServerOptions, kafkaProducerFactory, - tokenVerifier); + oidcDiscoveryConfig); 
DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()); diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index a7fa968063..c3233cf484 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.receiver.main; import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; @@ -41,7 +42,7 @@ class ReceiverVerticleFactory implements Supplier { private final String secretVolumePath = "/etc/receiver-tls-secret"; private final IngressRequestHandler ingressRequestHandler; - private final TokenVerifier tokenVerifier; + private final OIDCDiscoveryConfig oidcDiscoveryConfig; private ReactiveProducerFactory kafkaProducerFactory; @@ -52,7 +53,7 @@ class ReceiverVerticleFactory implements Supplier { final HttpServerOptions httpServerOptions, final HttpServerOptions httpsServerOptions, final ReactiveProducerFactory kafkaProducerFactory, - final TokenVerifier tokenVerifier) { + final OIDCDiscoveryConfig oidcDiscoveryConfig) { this.env = env; this.producerConfigs = producerConfigs; this.httpServerOptions = httpServerOptions; @@ -60,7 +61,7 @@ class ReceiverVerticleFactory implements Supplier { this.ingressRequestHandler = new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), metricsRegistry); this.kafkaProducerFactory = kafkaProducerFactory; - this.tokenVerifier = tokenVerifier; + this.oidcDiscoveryConfig = oidcDiscoveryConfig; } @Override @@ -75,6 +76,6 @@ public Verticle get() { properties -> kafkaProducerFactory.create(v, properties)), this.ingressRequestHandler, secretVolumePath, - tokenVerifier); + oidcDiscoveryConfig); } } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index 93d23e51f9..d418ca0486 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; import io.micrometer.core.instrument.MeterRegistry; @@ -44,7 +45,7 @@ public void shouldCreateMultipleReceiverVerticleInstances() { mock(HttpServerOptions.class), mock(HttpServerOptions.class), mock(MockReactiveProducerFactory.class), - mock(TokenVerifier.class)); + mock(OIDCDiscoveryConfig.class)); 
assertThat(supplier.get()).isNotSameAs(supplier.get()); } From cf8292c9adee5b02de38e7779ce0d9fdc5a55570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 13:10:49 +0100 Subject: [PATCH 08/14] Rerun hack/update-codegen.sh --- .../kafka/broker/core/oidc/TokenVerifier.java | 1 + .../receiver/impl/ReceiverVerticle.java | 6 +- .../impl/handler/AuthenticationHandler.java | 36 +-- .../kafka/broker/receiver/main/Main.java | 9 +- .../main/ReceiverVerticleFactory.java | 32 ++- .../handler/AuthenticationHandlerTest.java | 215 +++++++++--------- .../IngressRequestHandlerImplTest.java | 88 ++++--- .../main/ReceiverVerticleFactoryTest.java | 1 - 8 files changed, 190 insertions(+), 198 deletions(-) diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java index 960c1261c0..521b4f4514 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java @@ -21,5 +21,6 @@ public interface TokenVerifier { Future verify(String token, String expectedAudience); + Future verify(HttpServerRequest request, String expectedAudience); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 81267e723f..662fe545f7 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -230,9 +230,9 @@ public void handle(final HttpServerRequest request) { } this.authenticationHandler.handle(request, producer, req -> { - // Invoke the ingress request handler - final var requestContext = new RequestContext(req); - this.ingressRequestHandler.handle(requestContext, producer); + // Invoke the ingress request handler + final var requestContext = new RequestContext(req); + this.ingressRequestHandler.handle(requestContext, producer); }); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java index 9a34cee569..a1283ac275 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java @@ -15,6 +15,8 @@ */ package dev.knative.eventing.kafka.broker.receiver.impl.handler; +import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; + import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import io.netty.handler.codec.http.HttpResponseStatus; @@ -23,8 +25,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; - /** * Handler checking that the provided request contained a valid JWT. 
*/ @@ -37,23 +37,25 @@ public AuthenticationHandler(final TokenVerifier tokenVerifier) { this.tokenVerifier = tokenVerifier; } - public void handle(final HttpServerRequest request, final IngressProducer ingressInfo, final Handler next) { + public void handle( + final HttpServerRequest request, final IngressProducer ingressInfo, final Handler next) { if (ingressInfo.getAudience().isEmpty()) { - logger.debug("No audience for ingress set. Continue without authentication check..."); - next.handle(request); - return; + logger.debug("No audience for ingress set. Continue without authentication check..."); + next.handle(request); + return; } - tokenVerifier.verify(request, ingressInfo.getAudience()) - .onFailure(e -> { - logger.debug("Failed to verify authentication of request: {}", keyValue("error", e.getMessage())); - request - .response() - .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) - .end(); - }).onSuccess(jwtClaims -> { - logger.debug("Request contained valid JWT. Continuing..."); - next.handle(request); - }); + tokenVerifier + .verify(request, ingressInfo.getAudience()) + .onFailure(e -> { + logger.debug("Failed to verify authentication of request: {}", keyValue("error", e.getMessage())); + request.response() + .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) + .end(); + }) + .onSuccess(jwtClaims -> { + logger.debug("Request contained valid JWT. Continuing..."); + next.handle(request); + }); } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index f22eda483f..af04178fc8 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -23,8 +23,6 @@ import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.utils.Configurations; @@ -64,7 +62,7 @@ public class Main { * @param args command line arguments. 
*/ public static void start(final String[] args, final ReactiveProducerFactory kafkaProducerFactory) - throws IOException, ExecutionException, InterruptedException { + throws IOException, ExecutionException, InterruptedException { ReceiverEnv env = new ReceiverEnv(System::getenv); OpenTelemetrySdk openTelemetry = @@ -102,7 +100,10 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE); // Setup OIDC discovery config - OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx).toCompletionStage().toCompletableFuture().get(); + OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx) + .toCompletionStage() + .toCompletableFuture() + .get(); // Configure the verticle to deploy and the deployment options final Supplier receiverVerticleFactory = new ReceiverVerticleFactory( diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index c3233cf484..72438f31ae 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -17,7 +17,6 @@ import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; @@ -28,7 +27,6 @@ import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Verticle; import io.vertx.core.http.HttpServerOptions; - import java.util.Properties; import java.util.function.Supplier; @@ -47,21 +45,21 @@ class ReceiverVerticleFactory implements Supplier { private ReactiveProducerFactory kafkaProducerFactory; ReceiverVerticleFactory( - final ReceiverEnv env, - final Properties producerConfigs, - final MeterRegistry metricsRegistry, - final HttpServerOptions httpServerOptions, - final HttpServerOptions httpsServerOptions, - final ReactiveProducerFactory kafkaProducerFactory, - final OIDCDiscoveryConfig oidcDiscoveryConfig) { - this.env = env; - this.producerConfigs = producerConfigs; - this.httpServerOptions = httpServerOptions; - this.httpsServerOptions = httpsServerOptions; - this.ingressRequestHandler = - new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), metricsRegistry); - this.kafkaProducerFactory = kafkaProducerFactory; - this.oidcDiscoveryConfig = oidcDiscoveryConfig; + final ReceiverEnv env, + final Properties producerConfigs, + final MeterRegistry metricsRegistry, + final HttpServerOptions httpServerOptions, + final HttpServerOptions httpsServerOptions, + final ReactiveProducerFactory kafkaProducerFactory, + final OIDCDiscoveryConfig oidcDiscoveryConfig) { + this.env = env; + this.producerConfigs = producerConfigs; + this.httpServerOptions = httpServerOptions; + this.httpsServerOptions = httpsServerOptions; + this.ingressRequestHandler = + new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), metricsRegistry); + this.kafkaProducerFactory = kafkaProducerFactory; + 
this.oidcDiscoveryConfig = oidcDiscoveryConfig; } @Override diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java index 330319a936..0d781106f3 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java @@ -16,128 +16,123 @@ package dev.knative.eventing.kafka.broker.receiver.impl.handler; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; + import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; -import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; -import dev.knative.eventing.kafka.broker.receiver.RequestContext; -import dev.knative.eventing.kafka.broker.receiver.RequestToRecordMapper; import io.cloudevents.CloudEvent; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; -import io.vertx.core.http.impl.headers.HeadersMultiMap; import org.jose4j.jwt.JwtClaims; import org.junit.jupiter.api.Test; -import static org.mockito.Mockito.*; -import static org.mockito.Mockito.when; - public class AuthenticationHandlerTest { - @Test - public void shouldReturnUnauthorizedWhenJWTValidationFails() { - final HttpServerRequest request = mock(HttpServerRequest.class); - final var response = mockResponse(request, HttpResponseStatus.UNAUTHORIZED.code()); - - TokenVerifier tokenVerifier = new TokenVerifier() { - @Override - public Future verify(String token, String expectedAudience) { - return Future.failedFuture("JWT validation failed"); - } - - @Override - public Future verify(HttpServerRequest request, String expectedAudience) { - return Future.failedFuture("JWT validation failed"); - } - }; - - final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); - - authHandler.handle( - request, - new IngressProducer() { - @Override - public ReactiveKafkaProducer getKafkaProducer() { - return null; - } - - @Override - public String getTopic() { - return null; - } - - @Override - public DataPlaneContract.Reference getReference() { - return null; - } - - @Override - public String getAudience() { - return "some-required-audience"; - } - }, - mock(Handler.class)); - - verify(response, times(1)).setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()); - verify(response, times(1)).end(); - } - - @Test - public void shouldContinueWithRequestWhenJWTSucceedsFails() { - final HttpServerRequest request = mock(HttpServerRequest.class); - final var next = mock(Handler.class);//mockHandler(request); - - TokenVerifier tokenVerifier = new TokenVerifier() { - @Override - public Future verify(String token, String expectedAudience) { - return Future.succeededFuture(new JwtClaims()); - } - - @Override - public Future verify(HttpServerRequest request, String expectedAudience) { - return Future.succeededFuture(new JwtClaims()); - } - }; - - final AuthenticationHandler authHandler = 
new AuthenticationHandler(tokenVerifier); - - authHandler.handle( - request, - new IngressProducer() { - @Override - public ReactiveKafkaProducer getKafkaProducer() { - return null; - } - - @Override - public String getTopic() { - return null; - } - - @Override - public DataPlaneContract.Reference getReference() { - return null; - } - - @Override - public String getAudience() { - return "some-required-audience"; - } - }, - next); - - verify(next, times(1)).handle(request); - } - - private static HttpServerResponse mockResponse(final HttpServerRequest request, final int statusCode) { - final var response = mock(HttpServerResponse.class); - when(response.setStatusCode(statusCode)).thenReturn(response); - when(request.response()).thenReturn(response); - - return response; - } + @Test + public void shouldReturnUnauthorizedWhenJWTValidationFails() { + final HttpServerRequest request = mock(HttpServerRequest.class); + final var response = mockResponse(request, HttpResponseStatus.UNAUTHORIZED.code()); + + TokenVerifier tokenVerifier = new TokenVerifier() { + @Override + public Future verify(String token, String expectedAudience) { + return Future.failedFuture("JWT validation failed"); + } + + @Override + public Future verify(HttpServerRequest request, String expectedAudience) { + return Future.failedFuture("JWT validation failed"); + } + }; + + final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + + authHandler.handle( + request, + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } + + @Override + public String getTopic() { + return null; + } + + @Override + public DataPlaneContract.Reference getReference() { + return null; + } + + @Override + public String getAudience() { + return "some-required-audience"; + } + }, + mock(Handler.class)); + + verify(response, times(1)).setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()); + verify(response, times(1)).end(); + } + + @Test + public void shouldContinueWithRequestWhenJWTSucceedsFails() { + final HttpServerRequest request = mock(HttpServerRequest.class); + final var next = mock(Handler.class); // mockHandler(request); + + TokenVerifier tokenVerifier = new TokenVerifier() { + @Override + public Future verify(String token, String expectedAudience) { + return Future.succeededFuture(new JwtClaims()); + } + + @Override + public Future verify(HttpServerRequest request, String expectedAudience) { + return Future.succeededFuture(new JwtClaims()); + } + }; + + final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + + authHandler.handle( + request, + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } + + @Override + public String getTopic() { + return null; + } + + @Override + public DataPlaneContract.Reference getReference() { + return null; + } + + @Override + public String getAudience() { + return "some-required-audience"; + } + }, + next); + + verify(next, times(1)).handle(request); + } + + private static HttpServerResponse mockResponse(final HttpServerRequest request, final int statusCode) { + final var response = mock(HttpServerResponse.class); + when(response.setStatusCode(statusCode)).thenReturn(response); + when(request.response()).thenReturn(response); + + return response; + } } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java 
b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java index 5a028b6654..462326bc71 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java @@ -83,29 +83,27 @@ final var record = new ProducerRecord<>("topic", 10, "key", CoreObjects.event()) final var handler = new IngressRequestHandlerImpl(mapper, Metrics.getRegistry()); - handler.handle( - new RequestContext(request), - new IngressProducer() { - @Override - public ReactiveKafkaProducer getKafkaProducer() { - return producer; - } - - @Override - public String getTopic() { - return "1-12345"; - } - - @Override - public DataPlaneContract.Reference getReference() { - return DataPlaneContract.Reference.newBuilder().build(); - } - - @Override - public String getAudience() { - return ""; - } - }); + handler.handle(new RequestContext(request), new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return producer; + } + + @Override + public String getTopic() { + return "1-12345"; + } + + @Override + public DataPlaneContract.Reference getReference() { + return DataPlaneContract.Reference.newBuilder().build(); + } + + @Override + public String getAudience() { + return ""; + } + }); verifySetStatusCodeAndTerminateResponse(statusCode, response); } @@ -121,29 +119,27 @@ public void shouldReturnBadRequestIfNoRecordCanBeCreated() { final var handler = new IngressRequestHandlerImpl(mapper, Metrics.getRegistry()); - handler.handle( - new RequestContext(request), - new IngressProducer() { - @Override - public ReactiveKafkaProducer getKafkaProducer() { - return producer; - } - - @Override - public String getTopic() { - return "1-12345"; - } - - @Override - public DataPlaneContract.Reference getReference() { - return DataPlaneContract.Reference.newBuilder().build(); - } - - @Override - public String getAudience() { - return ""; - } - }); + handler.handle(new RequestContext(request), new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return producer; + } + + @Override + public String getTopic() { + return "1-12345"; + } + + @Override + public DataPlaneContract.Reference getReference() { + return DataPlaneContract.Reference.newBuilder().build(); + } + + @Override + public String getAudience() { + return ""; + } + }); verifySetStatusCodeAndTerminateResponse(IngressRequestHandlerImpl.MAPPER_FAILED, response); } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index d418ca0486..970ca0d0f0 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -21,7 +21,6 @@ import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; -import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; import io.micrometer.core.instrument.MeterRegistry; import 
io.vertx.core.http.HttpServerOptions; From 8c9494c2c3babd4c959eed79a637b0a2f3d9a275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 13:48:40 +0100 Subject: [PATCH 09/14] Move TokenVerifier setup into setup() to prevent null pointer exception when vertx is null --- .../kafka/broker/receiver/impl/ReceiverVerticle.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 662fe545f7..24e3415a7a 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -87,8 +87,9 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler ingressProducerStoreFactory; private final IngressRequestHandler ingressRequestHandler; private final ReceiverEnv env; - private final AuthenticationHandler authenticationHandler; + private final OIDCDiscoveryConfig oidcDiscoveryConfig; + private AuthenticationHandler authenticationHandler; private HttpServer httpServer; private HttpServer httpsServer; private MessageConsumer messageConsumer; @@ -119,9 +120,7 @@ public ReceiverVerticle( this.secretVolume = new File(secretVolumePath); this.tlsKeyFile = new File(secretVolumePath + "/tls.key"); this.tlsCrtFile = new File(secretVolumePath + "/tls.crt"); - - TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); - this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + this.oidcDiscoveryConfig = oidcDiscoveryConfig; } public HttpServerOptions getHttpsServerOptions() { @@ -135,6 +134,9 @@ public void start(final Promise startPromise) { .watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler)) .buildAndListen(vertx); + TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); + this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + this.httpServer = vertx.createHttpServer(this.httpServerOptions); // check whether the secret volume is mounted From 47196e793907391dfd15ec47026811abce4c03dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 13:49:56 +0100 Subject: [PATCH 10/14] Update KafkaChannel OIDC e2e tests, to run OIDC conformance tests so the receiver is tested too. 
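Concretely, the test below switches from the single AddressableHasAudiencePopulated check to running the full oidc.AddressableOIDCConformance feature set via env.TestSet, and it adds the eventshub TLS setup plus a longer poll timeout to accommodate the additional conformance features.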
--- test/e2e_new_channel/kafka_channel_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/e2e_new_channel/kafka_channel_test.go b/test/e2e_new_channel/kafka_channel_test.go index 5471791b66..9546d75f1b 100644 --- a/test/e2e_new_channel/kafka_channel_test.go +++ b/test/e2e_new_channel/kafka_channel_test.go @@ -28,6 +28,7 @@ import ( "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" @@ -83,13 +84,13 @@ func TestKafkaChannelOIDC(t *testing.T) { knative.WithLoggingConfig, knative.WithTracingConfig, k8s.WithEventListener, - environment.WithPollTimings(3*time.Second, 120*time.Second), + environment.WithPollTimings(2*time.Second, 12*time.Minute), environment.Managed(t), + eventshub.WithTLS(t), ) name := feature.MakeRandomK8sName("kafkaChannel") env.Prerequisite(ctx, t, channel.ImplGoesReady(name)) - env.Test(ctx, t, oidc.AddressableHasAudiencePopulated(kafkachannelresource.GVR(), kafkachannelresource.GVK().Kind, name, env.Namespace())) - // when the KafkaChannel supports all the OIDC features, we can do `TestKafkaChannelOIDC = rekt.TestChannelImplSupportsOIDC` too + env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkachannelresource.GVR(), kafkachannelresource.GVK().Kind, name, env.Namespace())) } From 99d732503f89d364d77179116c22a6bf554d1f25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 13:51:10 +0100 Subject: [PATCH 11/14] Run OIDC e2e tests as part of the reconciler suite --- test/channel-reconciler-tests.sh | 6 ++++ test/config-oidc-authentication/features.yaml | 31 +++++++++++++++++++ test/reconciler-tests.sh | 6 ++++ 3 files changed, 43 insertions(+) create mode 100644 test/config-oidc-authentication/features.yaml diff --git a/test/channel-reconciler-tests.sh b/test/channel-reconciler-tests.sh index 03a1942a78..46f4b4421d 100755 --- a/test/channel-reconciler-tests.sh +++ b/test/channel-reconciler-tests.sh @@ -3,3 +3,9 @@ source $(dirname $0)/e2e-common.sh go_test_e2e -tags=e2e,cloudevents -timeout=1h ./test/e2e_new_channel/... || fail_test "E2E (new - KafkaChannel) suite failed" + +echo "Running E2E Channel Reconciler Tests with OIDC authentication enabled" + +kubectl apply -Rf "$(dirname "$0")/config-oidc-authentication" + +go_test_e2e -timeout=1h ./test/e2e_new_channel/... -run OIDC || fail_test \ No newline at end of file diff --git a/test/config-oidc-authentication/features.yaml b/test/config-oidc-authentication/features.yaml new file mode 100644 index 0000000000..ae227d1072 --- /dev/null +++ b/test/config-oidc-authentication/features.yaml @@ -0,0 +1,31 @@ +# Copyright 2021 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-features + namespace: knative-eventing + labels: + knative.dev/config-propagation: original + knative.dev/config-category: eventing +data: + kreference-group: "disabled" + delivery-retryafter: "disabled" + delivery-timeout: "enabled" + kreference-mapping: "disabled" + new-trigger-filters: "enabled" + transport-encryption: "strict" + eventtype-auto-create: "disabled" + authentication-oidc: "enabled" diff --git a/test/reconciler-tests.sh b/test/reconciler-tests.sh index 57fbab16ef..c94b04ff17 100755 --- a/test/reconciler-tests.sh +++ b/test/reconciler-tests.sh @@ -54,6 +54,12 @@ kubectl apply -Rf "$(dirname "$0")/config-transport-encryption" go_test_e2e -timeout=1h ./test/e2e_new -run TLS || fail_test +echo "Running E2E Reconciler Tests with OIDC authentication enabled" + +kubectl apply -Rf "$(dirname "$0")/config-oidc-authentication" + +go_test_e2e -timeout=1h ./test/e2e_new -run OIDC || fail_test + if ! ${LOCAL_DEVELOPMENT}; then go_test_e2e -tags=sacura -timeout=40m ./test/e2e/... || fail_test "E2E (sacura) suite failed" fi From a6b05c6664f91d0f417814ef37fae2225cfcff76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 13:56:33 +0100 Subject: [PATCH 12/14] Fix KafkaChannelOIDC e2e test --- test/e2e_new_channel/kafka_channel_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e_new_channel/kafka_channel_test.go b/test/e2e_new_channel/kafka_channel_test.go index 9546d75f1b..9a93f46f83 100644 --- a/test/e2e_new_channel/kafka_channel_test.go +++ b/test/e2e_new_channel/kafka_channel_test.go @@ -92,5 +92,5 @@ func TestKafkaChannelOIDC(t *testing.T) { name := feature.MakeRandomK8sName("kafkaChannel") env.Prerequisite(ctx, t, channel.ImplGoesReady(name)) - env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkachannelresource.GVR(), kafkachannelresource.GVK().Kind, name, env.Namespace())) + env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkachannelresource.GVR(), "KafkaChannel", name, env.Namespace())) } From 503cee9c2532c7fe0a5c32343d1d1ebf64b56278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 6 Feb 2024 14:03:52 +0100 Subject: [PATCH 13/14] Fix lint issue --- test/channel-reconciler-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/channel-reconciler-tests.sh b/test/channel-reconciler-tests.sh index 46f4b4421d..2bd58ca226 100755 --- a/test/channel-reconciler-tests.sh +++ b/test/channel-reconciler-tests.sh @@ -8,4 +8,4 @@ echo "Running E2E Channel Reconciler Tests with OIDC authentication enabled" kubectl apply -Rf "$(dirname "$0")/config-oidc-authentication" -go_test_e2e -timeout=1h ./test/e2e_new_channel/... -run OIDC || fail_test \ No newline at end of file +go_test_e2e -timeout=1h ./test/e2e_new_channel/... 
-run OIDC || fail_test From 9d223c5b6043f7b19fac48b5920b574d5d96eccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Wed, 7 Feb 2024 22:28:09 +0100 Subject: [PATCH 14/14] Address review comments --- .../eventing/kafka/broker/core/oidc/TokenVerifierImpl.java | 4 ++-- .../receiver/impl/handler/AuthenticationHandlerTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java index 0346be1ae6..d490128f9e 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java @@ -62,11 +62,11 @@ public Future verify(String token, String expectedAudience) { public Future verify(final HttpServerRequest request, String expectedAudience) { String authHeader = request.getHeader("Authorization"); if (authHeader == null || authHeader.isEmpty()) { - return Future.failedFuture("Request didn't contain Authorization header"); // change to exception + return Future.failedFuture("Request didn't contain Authorization header"); } if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) { - return Future.failedFuture("Authorization header didn't contain Bearer token"); // change to exception + return Future.failedFuture("Authorization header didn't contain Bearer token"); } String token = authHeader.substring("Bearer ".length()); diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java index 0d781106f3..494f9b3814 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java @@ -82,7 +82,7 @@ public String getAudience() { } @Test - public void shouldContinueWithRequestWhenJWTSucceedsFails() { + public void shouldContinueWithRequestWhenJWTSucceeds() { final HttpServerRequest request = mock(HttpServerRequest.class); final var next = mock(Handler.class); // mockHandler(request);
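Taken together, the series resolves the OIDC discovery config once in Main, hands it to each ReceiverVerticle, and lets the AuthenticationHandler gate every ingress request on the resource's audience. The following is a minimal sketch of how those pieces compose, using only class names and signatures that appear in the diffs above; the verticle itself, its wiring, and the empty "next" handler body are illustrative and not part of the patch series.

// Illustrative only: a condensed verticle showing how the classes introduced in this
// series fit together. Names and signatures come from the diffs above; everything
// else is made up for the example.
package dev.knative.eventing.kafka.broker.receiver.example;

import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl;
import dev.knative.eventing.kafka.broker.receiver.IngressProducer;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthenticationHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServerRequest;

public class OidcGuardedIngressSketch extends AbstractVerticle {

    private final OIDCDiscoveryConfig oidcDiscoveryConfig;
    private AuthenticationHandler authenticationHandler;

    public OidcGuardedIngressSketch(final OIDCDiscoveryConfig oidcDiscoveryConfig) {
        // The discovery config is resolved once in Main and shared across verticle
        // instances (patch 07).
        this.oidcDiscoveryConfig = oidcDiscoveryConfig;
    }

    @Override
    public void start(final Promise<Void> startPromise) {
        // The TokenVerifier is created per instance inside start(), where the vertx
        // field is already injected (patch 09).
        final TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, this.oidcDiscoveryConfig);
        this.authenticationHandler = new AuthenticationHandler(tokenVerifier);
        startPromise.complete();
    }

    // Mirrors ReceiverVerticle#handle: the request reaches the ingress handler only
    // after the audience check passes, or immediately when the ingress has no
    // audience configured.
    void handleIngressRequest(final HttpServerRequest request, final IngressProducer producer) {
        this.authenticationHandler.handle(request, producer, req -> {
            // hand the verified request off to the IngressRequestHandler here
        });
    }
}

The verifier is built in start() rather than in the constructor for the same reason patch 09 gives: an AbstractVerticle only receives its vertx reference at deployment time, so anything that needs vertx has to wait until start().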