diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java index 31182eb126..f9062e65a5 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java @@ -22,7 +22,6 @@ import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; -import io.fabric8.kubernetes.client.informers.cache.Lister; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.WorkerExecutor; @@ -36,7 +35,7 @@ public class EventTypeCreatorImpl implements EventTypeCreator { private final MixedOperation, Resource> eventTypeClient; - private final Lister eventTypeLister; + private final EventTypeListerFactory eventTypeListerFactory; private MessageDigest messageDigest; @@ -44,14 +43,11 @@ public class EventTypeCreatorImpl implements EventTypeCreator { public EventTypeCreatorImpl( MixedOperation, Resource> eventTypeClient, - Lister eventTypeLister, + EventTypeListerFactory eventTypeListerFactory, Vertx vertx) throws IllegalArgumentException, NoSuchAlgorithmException { this.eventTypeClient = eventTypeClient; - if (eventTypeLister == null) { - throw new IllegalArgumentException("eventTypeLister must be non null"); - } - this.eventTypeLister = eventTypeLister; + this.eventTypeListerFactory = eventTypeListerFactory; this.executor = vertx.createSharedWorkerExecutor("et-creator-worker", 1); this.messageDigest = MessageDigest.getInstance("MD5"); } @@ -69,7 +65,9 @@ private String getName(CloudEvent event, DataPlaneContract.Reference reference) } private EventType eventTypeExists(String etName, DataPlaneContract.Reference reference) { - return this.eventTypeLister.namespace(reference.getNamespace()).get(etName); + return this.eventTypeListerFactory + .getForNamespace(reference.getNamespace()) + .get(etName); } @Override diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeListerFactory.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeListerFactory.java new file mode 100644 index 0000000000..2742ac070f --- /dev/null +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeListerFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.knative.eventing.kafka.broker.core.eventtype; + +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.fabric8.kubernetes.client.informers.cache.Lister; +import java.util.HashMap; +import java.util.Map; + +public class EventTypeListerFactory { + private final Map> listerMap; + private final SharedIndexInformer eventTypeInformer; + + public EventTypeListerFactory(SharedIndexInformer eventTypeInformer) { + if (eventTypeInformer == null) { + throw new IllegalArgumentException("you must provide a non null eventtype informer"); + } + this.eventTypeInformer = eventTypeInformer; + this.listerMap = new HashMap<>(); + } + + public Lister getForNamespace(String namespace) { + if (this.listerMap.containsKey(namespace)) { + return this.listerMap.get(namespace); + } + return this.createListerForNamespace(namespace); + } + + private Lister createListerForNamespace(String namespace) { + final var lister = new Lister<>(this.eventTypeInformer.getIndexer(), namespace); + this.listerMap.put(namespace, lister); + return lister; + } +} diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImplTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImplTest.java index 5884048aca..0fc97d348c 100644 --- a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImplTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImplTest.java @@ -21,7 +21,6 @@ import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.informers.cache.Lister; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.vertx.core.Vertx; @@ -47,8 +46,7 @@ public class EventTypeCreatorImplTest { public void testCreate(Vertx vertx, VertxTestContext vertxTestContext) throws NoSuchAlgorithmException { final var eventTypeClient = kubernetesClient.resources(EventType.class); final var informer = kubernetesClient.informers().sharedIndexInformerFor(EventType.class, 100L); - final var eventTypeLister = new Lister<>(informer.getIndexer()); - var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, eventTypeLister, vertx); + var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(informer), vertx); var event = new CloudEventBuilder() .withType("example.event.type") .withSource(URI.create("/example/source")) @@ -100,8 +98,7 @@ public void testCreatesOnlyOnce(Vertx vertx, VertxTestContext vertxTestContext) final var eventTypeClient = kubernetesClient.resources(EventType.class); final var informer = kubernetesClient.informers().sharedIndexInformerFor(EventType.class, 100L); informer.run(); - final var eventTypeLister = new Lister<>(informer.getIndexer()); - var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, eventTypeLister, vertx); + var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(informer), vertx); var event = new CloudEventBuilder() .withType("example.event.type") .withSource(URI.create("/example/source")) diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index ba17c90f91..999028d28b 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -34,7 +34,6 @@ import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; -import io.fabric8.kubernetes.client.informers.cache.Lister; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.vertx.core.DeploymentOptions; import io.vertx.core.Verticle; @@ -135,7 +134,6 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk final SharedInformerFactory sharedInformerFactory = kubernetesClient.informers(); final var eventTypeClient = kubernetesClient.resources(EventType.class); SharedIndexInformer eventTypeInformer = null; - Lister eventTypeLister = null; try { eventTypeInformer = sharedInformerFactory.sharedIndexInformerFor( EventType.class, 30 * 1000L); // refresh every 30 seconds @@ -148,9 +146,6 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk "the data-plane does not have sufficient permissions to list/watch eventtypes. This will lead to unnecessary CREATE requests if eventtype-auto-create is enabled", informerException); } - if (eventTypeInformer != null) { - eventTypeLister = new Lister<>(eventTypeInformer.getIndexer()); - } // Configure the verticle to deploy and the deployment options @@ -163,7 +158,7 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpsServerOptions, kafkaProducerFactory, eventTypeClient, - eventTypeLister, + eventTypeInformer, vertx, oidcDiscoveryConfig); DeploymentOptions deploymentOptions = diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index 83bc881525..9cb8b26518 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -18,6 +18,7 @@ import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeCreatorImpl; +import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; @@ -29,7 +30,7 @@ import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; -import io.fabric8.kubernetes.client.informers.cache.Lister; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Verticle; import io.vertx.core.Vertx; @@ -60,7 +61,7 @@ class ReceiverVerticleFactory implements Supplier { final HttpServerOptions httpsServerOptions, final ReactiveProducerFactory kafkaProducerFactory, final MixedOperation, Resource> eventTypeClient, - final Lister eventTypeLister, + final SharedIndexInformer eventTypeInformer, Vertx vertx, final OIDCDiscoveryConfig oidcDiscoveryConfig) throws NoSuchAlgorithmException { @@ -72,7 +73,7 @@ class ReceiverVerticleFactory implements Supplier { this.ingressRequestHandler = new IngressRequestHandlerImpl( StrictRequestToRecordMapper.getInstance(), metricsRegistry, - new EventTypeCreatorImpl(eventTypeClient, eventTypeLister, vertx)); + new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(eventTypeInformer), vertx)); this.kafkaProducerFactory = kafkaProducerFactory; this.oidcDiscoveryConfig = oidcDiscoveryConfig; } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index cbf7f7b3e5..6c6fb276a6 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -24,7 +24,7 @@ import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.informers.cache.Lister; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServerOptions; @@ -54,7 +54,7 @@ public void shouldCreateMultipleReceiverVerticleInstances(Vertx vertx) throws No mock(HttpServerOptions.class), mock(MockReactiveProducerFactory.class), mockClient.resources(EventType.class), - mock(Lister.class), + mock(SharedIndexInformer.class), vertx, mock(OIDCDiscoveryConfig.class));