diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker.go b/control-plane/pkg/reconciler/broker/namespaced_broker.go index bf21296621..759c13e94f 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker.go @@ -386,6 +386,7 @@ func (r *NamespacedReconciler) configMapsFromSystemNamespace(broker *eventing.Br configMaps := []string{ "config-kafka-broker-data-plane", "config-tracing", + "config-features", "kafka-config-logging", } resources := make([]unstructured.Unstructured, 0, len(configMaps)) diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go index 23601f2cc6..62a0942476 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go @@ -109,6 +109,7 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env) DataPlaneConfigInitialOffset(ConsumerConfigKey, sources.OffsetLatest), ), reconcilertesting.NewConfigMap("config-tracing", SystemNamespace), + reconcilertesting.NewConfigMap("config-features", SystemNamespace), reconcilertesting.NewConfigMap("kafka-config-logging", SystemNamespace), NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), NewService(), @@ -181,6 +182,14 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env) WithNamespacedBrokerOwnerRef, WithNamespacedLabel, ), + ToManifestivalResource(t, + reconcilertesting.NewConfigMap( + "config-features", + BrokerNamespace, + ), + WithNamespacedBrokerOwnerRef, + WithNamespacedLabel, + ), ToManifestivalResource(t, reconcilertesting.NewConfigMap( "kafka-config-logging", @@ -360,6 +369,7 @@ func namespacedBrokerFinalization(t *testing.T, format string, env config.Env) { }, env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat), reconcilertesting.NewConfigMap(env.DataPlaneConfigConfigMapName, SystemNamespace), reconcilertesting.NewConfigMap("config-tracing", SystemNamespace), + reconcilertesting.NewConfigMap("config-features", SystemNamespace), reconcilertesting.NewConfigMap("kafka-config-logging", SystemNamespace), reconcilertesting.NewDeployment("kafka-broker-receiver", SystemNamespace), reconcilertesting.NewDeployment("kafka-broker-dispatcher", SystemNamespace), diff --git a/data-plane/config/broker/500-receiver.yaml b/data-plane/config/broker/500-receiver.yaml index fee514b3e6..46dadb5535 100644 --- a/data-plane/config/broker/500-receiver.yaml +++ b/data-plane/config/broker/500-receiver.yaml @@ -76,6 +76,9 @@ spec: - mountPath: /etc/tracing name: config-tracing readOnly: true + - mountPath: /etc/features + name: config-features + readOnly: true - mountPath: /etc/receiver-tls-secret name: broker-receiver-tls-secret readOnly: true @@ -120,6 +123,8 @@ spec: value: "false" - name: CONFIG_TRACING_PATH value: "/etc/tracing" + - name: CONFIG_FEATURES_PATH + value: "/etc/features" # https://github.com/fabric8io/kubernetes-client/issues/2212 - name: HTTP2_DISABLE value: "true" @@ -177,6 +182,9 @@ spec: - name: config-tracing configMap: name: config-tracing + - name: config-features + configMap: + name: config-features - name: broker-receiver-tls-secret secret: secretName: kafka-broker-ingress-server-tls diff --git a/data-plane/config/channel/500-receiver.yaml b/data-plane/config/channel/500-receiver.yaml index bb3d9c83d4..949d40a612 100644 --- a/data-plane/config/channel/500-receiver.yaml +++ b/data-plane/config/channel/500-receiver.yaml @@ -76,6 +76,9 @@ spec: - mountPath: /etc/tracing name: config-tracing readOnly: true + - mountPath: /etc/features + name: config-features + readOnly: true - mountPath: /etc/receiver-tls-secret name: channel-receiver-tls-secret readOnly: true @@ -120,6 +123,8 @@ spec: value: "false" - name: CONFIG_TRACING_PATH value: "/etc/tracing" + - name: CONFIG_FEATURES_PATH + value: "/etc/features" # https://github.com/fabric8io/kubernetes-client/issues/2212 - name: HTTP2_DISABLE value: "true" @@ -177,6 +182,9 @@ spec: - name: config-tracing configMap: name: config-tracing + - name: config-features + configMap: + name: config-features - name: channel-receiver-tls-secret secret: secretName: kafka-channel-ingress-server-tls diff --git a/data-plane/config/sink/500-receiver.yaml b/data-plane/config/sink/500-receiver.yaml index b4b46485c4..6a7ed65d0b 100644 --- a/data-plane/config/sink/500-receiver.yaml +++ b/data-plane/config/sink/500-receiver.yaml @@ -76,6 +76,9 @@ spec: - mountPath: /etc/tracing name: config-tracing readOnly: true + - mountPath: /etc/features + name: config-features + readOnly: true - mountPath: /etc/receiver-tls-secret name: sink-receiver-tls-secret readOnly: true @@ -120,6 +123,8 @@ spec: value: "false" - name: CONFIG_TRACING_PATH value: "/etc/tracing" + - name: CONFIG_FEATURES_PATH + value: "/etc/features" # https://github.com/fabric8io/kubernetes-client/issues/2212 - name: HTTP2_DISABLE value: "true" @@ -177,6 +182,9 @@ spec: - name: config-tracing configMap: name: config-tracing + - name: config-features + configMap: + name: config-features - name: sink-receiver-tls-secret secret: secretName: kafka-sink-ingress-server-tls diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/features/FeaturesConfig.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/features/FeaturesConfig.java new file mode 100644 index 0000000000..939d3fc945 --- /dev/null +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/features/FeaturesConfig.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.core.features; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +public class FeaturesConfig { + + private final String DISABLED = "disabled"; + private final String ENABLED = "enabled"; + + public static final String KEY_AUTHENTICATION_OIDC = "authentication-oidc"; + + private final Map features; + + public FeaturesConfig(String path) throws IOException { + features = new HashMap<>(); + String[] keys = { + KEY_AUTHENTICATION_OIDC, + }; + + for (String key : keys) { + Path filePath = Paths.get(path, key); + if (Files.exists(filePath)) { + features.put(key, Files.readString(filePath)); + } + } + } + + public boolean isAuthenticationOIDC() { + return isEnabled(KEY_AUTHENTICATION_OIDC); + } + + private boolean isEnabled(String key) { + return features.getOrDefault(key, DISABLED).toLowerCase().equals(ENABLED); + } +} diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java index 16c97d594f..b5312e0bbb 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java @@ -15,6 +15,7 @@ */ package dev.knative.eventing.kafka.broker.core.oidc; +import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServerRequest; @@ -44,6 +45,13 @@ public Future verify(String token, String expectedAudience) { promise -> { // execute blocking, as jose .process() is blocking + if (oidcDiscoveryConfig == null) { + promise.fail( + "OIDC discovery config not initialized. This is most likely the case when the pod was started with an invalid OIDC config in place and then later the " + + FeaturesConfig.KEY_AUTHENTICATION_OIDC + + " flag was enabled. Restarting the pod should help."); + } + JwtConsumer jwtConsumer = new JwtConsumerBuilder() .setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver()) .setExpectedAudience(expectedAudience) diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/utils/BaseEnv.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/utils/BaseEnv.java index 5b3743876b..9353006166 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/utils/BaseEnv.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/utils/BaseEnv.java @@ -48,6 +48,9 @@ public class BaseEnv { public static final String CONFIG_TRACING_PATH = "CONFIG_TRACING_PATH"; private final String configTracingPath; + public static final String CONFIG_FEATURES_PATH = "CONFIG_FEATURES_PATH"; + private final String configFeaturesPath; + public static final String WAIT_STARTUP_SECONDS = "WAIT_STARTUP_SECONDS"; private final int waitStartupSeconds; @@ -61,6 +64,7 @@ public BaseEnv(Function envProvider) { this.producerConfigFilePath = requireNonNull(envProvider.apply(PRODUCER_CONFIG_FILE_PATH)); this.dataPlaneConfigFilePath = requireNonNull(envProvider.apply(DATA_PLANE_CONFIG_FILE_PATH)); this.configTracingPath = requireNonNull(envProvider.apply(CONFIG_TRACING_PATH)); + this.configFeaturesPath = envProvider.apply(CONFIG_FEATURES_PATH); this.waitStartupSeconds = Integer.parseInt(envProvider.apply(WAIT_STARTUP_SECONDS)); } @@ -100,6 +104,10 @@ public String getConfigTracingPath() { return configTracingPath; } + public String getConfigFeaturesPath() { + return configFeaturesPath; + } + public int getWaitStartupSeconds() { return waitStartupSeconds; } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index aaf4e50dc5..e449368737 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -21,6 +21,7 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec; import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.eventtype.EventType; +import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; @@ -76,6 +77,8 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk OpenTelemetrySdk openTelemetry = TracingConfig.fromDir(env.getConfigTracingPath()).setup(); + FeaturesConfig featuresConfig = new FeaturesConfig(env.getConfigFeaturesPath()); + // Read producer properties and override some defaults Properties producerConfigs = Configurations.readPropertiesSync(env.getProducerConfigFilePath()); producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -108,10 +111,22 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE); // Setup OIDC discovery config - OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx) - .toCompletionStage() - .toCompletableFuture() - .get(); + OIDCDiscoveryConfig oidcDiscoveryConfig = null; + try { + oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx) + .toCompletionStage() + .toCompletableFuture() + .get(); + } catch (ExecutionException ex) { + if (featuresConfig.isAuthenticationOIDC()) { + logger.error("Could not load OIDC config while OIDC authentication feature is enabled."); + throw ex; + } else { + logger.warn( + "Could not load OIDC configuration. This will lead to problems, when the {} flag will be enabled later", + FeaturesConfig.KEY_AUTHENTICATION_OIDC); + } + } final var kubernetesClient = new KubernetesClientBuilder().build(); final SharedInformerFactory sharedInformerFactory = kubernetesClient.informers();