Skip to content

Commit

Permalink
Move to using namespaced listers for eventtypes in data plane (#3951)
Browse files Browse the repository at this point in the history
* feat: added factory for new eventtype listers

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: used factory to get namespaced listers for eventtypes

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: correctly pass lister factory to eventtytpe creator

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: add license header

Signed-off-by: Calum Murray <cmurray@redhat.com>

* ./hack/update-codegen.sh

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Jun 26, 2024
1 parent 63d6859 commit 2b755b0
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
Expand All @@ -36,22 +35,19 @@ public class EventTypeCreatorImpl implements EventTypeCreator {

private final MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient;

private final Lister<EventType> eventTypeLister;
private final EventTypeListerFactory eventTypeListerFactory;

private MessageDigest messageDigest;

private final WorkerExecutor executor;

public EventTypeCreatorImpl(
MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient,
Lister<EventType> eventTypeLister,
EventTypeListerFactory eventTypeListerFactory,
Vertx vertx)
throws IllegalArgumentException, NoSuchAlgorithmException {
this.eventTypeClient = eventTypeClient;
if (eventTypeLister == null) {
throw new IllegalArgumentException("eventTypeLister must be non null");
}
this.eventTypeLister = eventTypeLister;
this.eventTypeListerFactory = eventTypeListerFactory;
this.executor = vertx.createSharedWorkerExecutor("et-creator-worker", 1);
this.messageDigest = MessageDigest.getInstance("MD5");
}
Expand All @@ -69,7 +65,9 @@ private String getName(CloudEvent event, DataPlaneContract.Reference reference)
}

private EventType eventTypeExists(String etName, DataPlaneContract.Reference reference) {
return this.eventTypeLister.namespace(reference.getNamespace()).get(etName);
return this.eventTypeListerFactory
.getForNamespace(reference.getNamespace())
.get(etName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.core.eventtype;

import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import java.util.HashMap;
import java.util.Map;

public class EventTypeListerFactory {
private final Map<String, Lister<EventType>> listerMap;
private final SharedIndexInformer<EventType> eventTypeInformer;

public EventTypeListerFactory(SharedIndexInformer<EventType> eventTypeInformer) {
if (eventTypeInformer == null) {
throw new IllegalArgumentException("you must provide a non null eventtype informer");
}
this.eventTypeInformer = eventTypeInformer;
this.listerMap = new HashMap<>();
}

public Lister<EventType> getForNamespace(String namespace) {
if (this.listerMap.containsKey(namespace)) {
return this.listerMap.get(namespace);
}
return this.createListerForNamespace(namespace);
}

private Lister<EventType> createListerForNamespace(String namespace) {
final var lister = new Lister<>(this.eventTypeInformer.getIndexer(), namespace);
this.listerMap.put(namespace, lister);
return lister;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.vertx.core.Vertx;
Expand All @@ -47,8 +46,7 @@ public class EventTypeCreatorImplTest {
public void testCreate(Vertx vertx, VertxTestContext vertxTestContext) throws NoSuchAlgorithmException {
final var eventTypeClient = kubernetesClient.resources(EventType.class);
final var informer = kubernetesClient.informers().sharedIndexInformerFor(EventType.class, 100L);
final var eventTypeLister = new Lister<>(informer.getIndexer());
var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, eventTypeLister, vertx);
var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(informer), vertx);
var event = new CloudEventBuilder()
.withType("example.event.type")
.withSource(URI.create("/example/source"))
Expand Down Expand Up @@ -100,8 +98,7 @@ public void testCreatesOnlyOnce(Vertx vertx, VertxTestContext vertxTestContext)
final var eventTypeClient = kubernetesClient.resources(EventType.class);
final var informer = kubernetesClient.informers().sharedIndexInformerFor(EventType.class, 100L);
informer.run();
final var eventTypeLister = new Lister<>(informer.getIndexer());
var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, eventTypeLister, vertx);
var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(informer), vertx);
var event = new CloudEventBuilder()
.withType("example.event.type")
.withSource(URI.create("/example/source"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Verticle;
Expand Down Expand Up @@ -135,7 +134,6 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
final SharedInformerFactory sharedInformerFactory = kubernetesClient.informers();
final var eventTypeClient = kubernetesClient.resources(EventType.class);
SharedIndexInformer<EventType> eventTypeInformer = null;
Lister<EventType> eventTypeLister = null;
try {
eventTypeInformer = sharedInformerFactory.sharedIndexInformerFor(
EventType.class, 30 * 1000L); // refresh every 30 seconds
Expand All @@ -148,9 +146,6 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
"the data-plane does not have sufficient permissions to list/watch eventtypes. This will lead to unnecessary CREATE requests if eventtype-auto-create is enabled",
informerException);
}
if (eventTypeInformer != null) {
eventTypeLister = new Lister<>(eventTypeInformer.getIndexer());
}

// Configure the verticle to deploy and the deployment options

Expand All @@ -163,7 +158,7 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
httpsServerOptions,
kafkaProducerFactory,
eventTypeClient,
eventTypeLister,
eventTypeInformer,
vertx,
oidcDiscoveryConfig);
DeploymentOptions deploymentOptions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory;
import dev.knative.eventing.kafka.broker.core.eventtype.EventType;
import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeCreatorImpl;
import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler;
Expand All @@ -29,7 +30,7 @@
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -60,7 +61,7 @@ class ReceiverVerticleFactory implements Supplier<Verticle> {
final HttpServerOptions httpsServerOptions,
final ReactiveProducerFactory<String, CloudEvent> kafkaProducerFactory,
final MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient,
final Lister<EventType> eventTypeLister,
final SharedIndexInformer<EventType> eventTypeInformer,
Vertx vertx,
final OIDCDiscoveryConfig oidcDiscoveryConfig)
throws NoSuchAlgorithmException {
Expand All @@ -72,7 +73,7 @@ class ReceiverVerticleFactory implements Supplier<Verticle> {
this.ingressRequestHandler = new IngressRequestHandlerImpl(
StrictRequestToRecordMapper.getInstance(),
metricsRegistry,
new EventTypeCreatorImpl(eventTypeClient, eventTypeLister, vertx));
new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(eventTypeInformer), vertx));
this.kafkaProducerFactory = kafkaProducerFactory;
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerOptions;
Expand Down Expand Up @@ -54,7 +54,7 @@ public void shouldCreateMultipleReceiverVerticleInstances(Vertx vertx) throws No
mock(HttpServerOptions.class),
mock(MockReactiveProducerFactory.class),
mockClient.resources(EventType.class),
mock(Lister.class),
mock(SharedIndexInformer.class),
vertx,
mock(OIDCDiscoveryConfig.class));

Expand Down

0 comments on commit 2b755b0

Please sign in to comment.