From 838127222b7b2b01141ca713f35f404f5dbc67c4 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 21 Nov 2023 14:38:12 -0500 Subject: [PATCH 01/12] Added kafka ce extensions for partition and offset Signed-off-by: Calum Murray --- .../broker/dispatcher/CloudEventMutator.java | 7 +++-- .../dispatcher/impl/RecordDispatcherImpl.java | 4 +-- .../impl/RecordDispatcherMutatorChain.java | 17 ++--------- .../consumer/CloudEventOverridesMutator.java | 30 ++++++++++++++++--- .../CloudEventOverridesMutatorTest.java | 29 +++++++++++++++--- 5 files changed, 61 insertions(+), 26 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java index 438dab1820..c3435bb48a 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java @@ -16,10 +16,13 @@ package dev.knative.eventing.kafka.broker.dispatcher; import io.cloudevents.CloudEvent; +import org.apache.kafka.clients.consumer.ConsumerRecord; + import java.util.function.Function; /** - * A CloudEventMutator mutates a given CloudEvent. + * A CloudEventMutator returns a new {@link CloudEvent} created by mutating the CloudEvent in a + * given {@link ConsumerRecord}. */ @FunctionalInterface -public interface CloudEventMutator extends Function {} +public interface CloudEventMutator extends Function, CloudEvent> {} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index b62604d0e9..46222ac4e5 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -54,9 +54,9 @@ /** * This class implements the core algorithm of the Dispatcher (see {@link - * RecordDispatcherImpl#dispatch(KafkaConsumerRecord)}). + * RecordDispatcherImpl#dispatch(ConsumerRecord)}). * - * @see RecordDispatcherImpl#dispatch(KafkaConsumerRecord) + * @see RecordDispatcherImpl#dispatch(ConsumerRecord) */ public class RecordDispatcherImpl implements RecordDispatcher { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java index 6313fcc0a4..5c3e7f6ad6 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java @@ -17,13 +17,14 @@ import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils; import io.cloudevents.CloudEvent; import io.vertx.core.Future; import org.apache.kafka.clients.consumer.ConsumerRecord; /** * {@link RecordDispatcherMutatorChain} chains {@link RecordDispatcher}s and applies mutations using a provided - * {@link CloudEventMutator} before passing the {@link KafkaConsumerRecord} to the next {@link RecordDispatcher}. + * {@link CloudEventMutator} before passing the {@link ConsumerRecord} to the next {@link RecordDispatcher}. */ public class RecordDispatcherMutatorChain implements RecordDispatcher { @@ -37,19 +38,7 @@ public RecordDispatcherMutatorChain(final RecordDispatcher next, final CloudEven @Override public Future dispatch(ConsumerRecord record) { - final var newRecord = new ConsumerRecord<>( - record.topic(), - record.partition(), - record.offset(), - record.timestamp(), - record.timestampType(), - record.serializedKeySize(), - record.serializedValueSize(), - record.key(), - cloudEventMutator.apply(record.value()), - record.headers(), - record.leaderEpoch()); - return next.dispatch(newRecord); + return next.dispatch(KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record))); } @Override diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java index 6456f66430..3440ac481f 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java @@ -19,6 +19,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import org.apache.kafka.clients.consumer.ConsumerRecord; /** * CloudEventOverridesMutator is a {@link CloudEventMutator} that applies a given set of @@ -28,21 +29,42 @@ public class CloudEventOverridesMutator implements CloudEventMutator { private final DataPlaneContract.CloudEventOverrides cloudEventOverrides; + private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); + public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) { this.cloudEventOverrides = cloudEventOverrides; } @Override - public CloudEvent apply(CloudEvent cloudEvent) { - if (cloudEventOverrides.getExtensionsMap().isEmpty()) { - return cloudEvent; - } + public CloudEvent apply(ConsumerRecord record) { + final var cloudEvent = this.maybeDeserializeFromHeaders(record); final var builder = CloudEventBuilder.from(cloudEvent); + applyKafkaMetadata(builder, record.partition(), record.offset()); applyCloudEventOverrides(builder); return builder.build(); } + private CloudEvent maybeDeserializeFromHeaders(ConsumerRecord record) { + if (record.value() != null) { + return record.value(); + } + // A valid CloudEvent in the CE binary protocol binding of Kafka + // might be composed by only Headers. + // + // KafkaConsumer doesn't call the deserializer if the value + // is null. + // + // That means that we get a record with a null value and some CE + // headers even though the record is a valid CloudEvent. + return cloudEventDeserializer.deserialize(record.topic(), record.headers(), null); + } + private void applyCloudEventOverrides(CloudEventBuilder builder) { cloudEventOverrides.getExtensionsMap().forEach(builder::withExtension); } + + private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) { + builder.withExtension("knativekafkapartition", partition); + builder.withExtension("knativekafkaoffset", offset); + } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java index 6c587f6d2c..7ca594e813 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java @@ -23,6 +23,8 @@ import java.time.OffsetDateTime; import java.util.Map; import java.util.UUID; + +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; public class CloudEventOverridesMutatorTest { @@ -47,14 +49,22 @@ public void shouldAddExtensions() { final var expected = CloudEventBuilder.from(given); extensions.forEach(expected::withExtension); + expected.withExtension("knativekafkaoffset", 1L); + expected.withExtension("knativekafkapartition", 1); - final var got = mutator.apply(given); + final var got = mutator.apply(new ConsumerRecord<>( + "test-topic", + 1, + 1, + "key", + given + )); assertThat(got).isEqualTo(expected.build()); } @Test - public void shouldNotMutateRecordWhenNoOverrides() { + public void shouldAddKafkaExtensionsWhenNoOverrides() { final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder() .putAllExtensions(Map.of()) .build(); @@ -68,8 +78,19 @@ public void shouldNotMutateRecordWhenNoOverrides() { .withType("foo") .build(); - final var got = mutator.apply(given); + final var expected = CloudEventBuilder.from(given) + .withExtension("knativekafkaoffset", 1L) + .withExtension("knativekafkapartition", 1) + .build(); + + final var got = mutator.apply(new ConsumerRecord<>( + "test-topic", + 1, + 1, + "key", + given + )); - assertThat(got).isSameAs(given); + assertThat(got).isEqualTo(expected); } } From 1d43b912adb81228bbd62452da6b78c16f4a7f05 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 21 Nov 2023 14:51:54 -0500 Subject: [PATCH 02/12] Updated codegen Signed-off-by: Calum Murray --- .../broker/dispatcher/CloudEventMutator.java | 3 +- .../impl/RecordDispatcherMutatorChain.java | 3 +- .../consumer/CloudEventOverridesMutator.java | 30 +++++++++---------- .../CloudEventOverridesMutatorTest.java | 23 ++++---------- 4 files changed, 23 insertions(+), 36 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java index c3435bb48a..4eccb7f5d8 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java @@ -16,9 +16,8 @@ package dev.knative.eventing.kafka.broker.dispatcher; import io.cloudevents.CloudEvent; -import org.apache.kafka.clients.consumer.ConsumerRecord; - import java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; /** * A CloudEventMutator returns a new {@link CloudEvent} created by mutating the CloudEvent in a diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java index 5c3e7f6ad6..cb818cffeb 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java @@ -38,7 +38,8 @@ public RecordDispatcherMutatorChain(final RecordDispatcher next, final CloudEven @Override public Future dispatch(ConsumerRecord record) { - return next.dispatch(KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record))); + return next.dispatch( + KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record))); } @Override diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java index 3440ac481f..51820f6c8d 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java @@ -29,7 +29,7 @@ public class CloudEventOverridesMutator implements CloudEventMutator { private final DataPlaneContract.CloudEventOverrides cloudEventOverrides; - private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); + private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) { this.cloudEventOverrides = cloudEventOverrides; @@ -45,18 +45,18 @@ public CloudEvent apply(ConsumerRecord record) { } private CloudEvent maybeDeserializeFromHeaders(ConsumerRecord record) { - if (record.value() != null) { - return record.value(); - } - // A valid CloudEvent in the CE binary protocol binding of Kafka - // might be composed by only Headers. - // - // KafkaConsumer doesn't call the deserializer if the value - // is null. - // - // That means that we get a record with a null value and some CE - // headers even though the record is a valid CloudEvent. - return cloudEventDeserializer.deserialize(record.topic(), record.headers(), null); + if (record.value() != null) { + return record.value(); + } + // A valid CloudEvent in the CE binary protocol binding of Kafka + // might be composed by only Headers. + // + // KafkaConsumer doesn't call the deserializer if the value + // is null. + // + // That means that we get a record with a null value and some CE + // headers even though the record is a valid CloudEvent. + return cloudEventDeserializer.deserialize(record.topic(), record.headers(), null); } private void applyCloudEventOverrides(CloudEventBuilder builder) { @@ -64,7 +64,7 @@ private void applyCloudEventOverrides(CloudEventBuilder builder) { } private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) { - builder.withExtension("knativekafkapartition", partition); - builder.withExtension("knativekafkaoffset", offset); + builder.withExtension("knativekafkapartition", partition); + builder.withExtension("knativekafkaoffset", offset); } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java index 7ca594e813..a96ec0580e 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java @@ -23,7 +23,6 @@ import java.time.OffsetDateTime; import java.util.Map; import java.util.UUID; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -52,13 +51,7 @@ public void shouldAddExtensions() { expected.withExtension("knativekafkaoffset", 1L); expected.withExtension("knativekafkapartition", 1); - final var got = mutator.apply(new ConsumerRecord<>( - "test-topic", - 1, - 1, - "key", - given - )); + final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given)); assertThat(got).isEqualTo(expected.build()); } @@ -79,17 +72,11 @@ public void shouldAddKafkaExtensionsWhenNoOverrides() { .build(); final var expected = CloudEventBuilder.from(given) - .withExtension("knativekafkaoffset", 1L) - .withExtension("knativekafkapartition", 1) - .build(); + .withExtension("knativekafkaoffset", 1L) + .withExtension("knativekafkapartition", 1) + .build(); - final var got = mutator.apply(new ConsumerRecord<>( - "test-topic", - 1, - 1, - "key", - given - )); + final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given)); assertThat(got).isEqualTo(expected); } From a347b8e87064a0932a3a21a4582f41d51f57fed3 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 22 Nov 2023 15:37:11 -0500 Subject: [PATCH 03/12] test vendor fix Signed-off-by: Calum Murray --- .../knative.dev/eventing/test/rekt/features/broker/topology.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/knative.dev/eventing/test/rekt/features/broker/topology.go b/vendor/knative.dev/eventing/test/rekt/features/broker/topology.go index aa27810473..ef3adbb8ac 100644 --- a/vendor/knative.dev/eventing/test/rekt/features/broker/topology.go +++ b/vendor/knative.dev/eventing/test/rekt/features/broker/topology.go @@ -172,7 +172,7 @@ func assertExpectedRoutedEvents(prober *eventshub.EventProber, expected map[stri if len(want) != 0 && len(got) != 0 { // ID is adjusted by eventshub. except := []cmp.Option{ - cmpopts.IgnoreFields(conformanceevent.ContextAttributes{}, "ID"), + cmpopts.IgnoreFields(conformanceevent.ContextAttributes{}, "ID", "Extensions"), cmpopts.IgnoreMapEntries(func(k, v string) bool { return k == "knativearrivaltime" }), } if diff := cmp.Diff(want, got, except...); diff != "" { From 07aa0b6e4f989ab33200bb603777244e9b45475f Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 12 Jan 2024 14:47:32 -0500 Subject: [PATCH 04/12] remove unnecessary null event deserialization Signed-off-by: Calum Murray --- .../consumer/CloudEventOverridesMutator.java | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java index 51820f6c8d..dce227d6c6 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java @@ -37,28 +37,12 @@ public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cl @Override public CloudEvent apply(ConsumerRecord record) { - final var cloudEvent = this.maybeDeserializeFromHeaders(record); - final var builder = CloudEventBuilder.from(cloudEvent); + final var builder = CloudEventBuilder.from(record.value()); applyKafkaMetadata(builder, record.partition(), record.offset()); applyCloudEventOverrides(builder); return builder.build(); } - private CloudEvent maybeDeserializeFromHeaders(ConsumerRecord record) { - if (record.value() != null) { - return record.value(); - } - // A valid CloudEvent in the CE binary protocol binding of Kafka - // might be composed by only Headers. - // - // KafkaConsumer doesn't call the deserializer if the value - // is null. - // - // That means that we get a record with a null value and some CE - // headers even though the record is a valid CloudEvent. - return cloudEventDeserializer.deserialize(record.topic(), record.headers(), null); - } - private void applyCloudEventOverrides(CloudEventBuilder builder) { cloudEventOverrides.getExtensionsMap().forEach(builder::withExtension); } From f11ea9e80b76e7bf4354a5df769ae94c7edc787b Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 12 Jan 2024 14:47:49 -0500 Subject: [PATCH 05/12] add e2e tests for broker, channel, sink Signed-off-by: Calum Murray --- .../broker_event_transformation_test.go | 15 ++ test/rekt/features/ce_extensions.go | 168 ++++++++++++++++++ 2 files changed, 183 insertions(+) create mode 100644 test/rekt/features/ce_extensions.go diff --git a/test/e2e_new/broker_event_transformation_test.go b/test/e2e_new/broker_event_transformation_test.go index 548ddb0fc0..5306a0993c 100644 --- a/test/e2e_new/broker_event_transformation_test.go +++ b/test/e2e_new/broker_event_transformation_test.go @@ -22,6 +22,7 @@ package e2e_new import ( "testing" + "knative.dev/eventing-kafka-broker/test/rekt/features" "knative.dev/eventing/test/rekt/features/broker" "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" @@ -42,3 +43,17 @@ func TestEventTransformationForTrigger(t *testing.T) { env.TestSet(ctx, t, broker.BrokerWorkFlowWithTransformation()) } + +func TestKnativeKafkaCloudEventExtensionsAdded(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.TestSet(ctx, t, features.KnativeKafkaCEExtensionsAdded()) +} diff --git a/test/rekt/features/ce_extensions.go b/test/rekt/features/ce_extensions.go new file mode 100644 index 0000000000..84bfa08a49 --- /dev/null +++ b/test/rekt/features/ce_extensions.go @@ -0,0 +1,168 @@ +/* + * Copyright 2024 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package features + +import ( + "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" + testpkg "knative.dev/eventing-kafka-broker/test/pkg" + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkachannel" + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink" + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource" + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" + "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/subscription" + "knative.dev/eventing/test/rekt/resources/trigger" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/eventshub" + eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/manifest" + "knative.dev/reconciler-test/pkg/resources/service" +) + +func KnativeKafkaCEExtensionsAdded() *feature.FeatureSet { + return &feature.FeatureSet{ + Name: "KnativeKafkaCEExtensionsAdded", + Features: []*feature.Feature{ + brokerAddsKnativeKafkaCEExtensions(), + channelAddsKnativeKafkaCEExtensions(), + sourceAddsKnativeKafkaCEExtensions(), + }, + } +} + +func brokerAddsKnativeKafkaCEExtensions() *feature.Feature { + f := feature.NewFeature() + + brokerName := feature.MakeRandomK8sName("broker") + triggerName := feature.MakeRandomK8sName("trigger") + senderName := feature.MakeRandomK8sName("sender") + sinkName := feature.MakeRandomK8sName("sink") + + inputEvent := test.FullEvent() + + f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver)) + + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(service.AsKReference(sinkName), ""), + )) + + f.Requirement("install source", eventshub.Install( + senderName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(inputEvent), + )) + + f.Alpha("broker"). + Must("must add Knative Kafka CE extensions", eventasssert.OnStore(sinkName). + MatchEvent(test.ContainsExtensions("knativekafkapartition", "knativekafkaoffset")). + Exact(1)) + return f +} + +func channelAddsKnativeKafkaCEExtensions() *feature.Feature { + f := feature.NewFeature() + + channelName := feature.MakeRandomK8sName("channel") + subscriptionName := feature.MakeRandomK8sName("subscription") + senderName := feature.MakeRandomK8sName("sender") + sinkName := feature.MakeRandomK8sName("sink") + + inputEvent := test.FullEvent() + + f.Setup("install channel", kafkachannel.Install( + channelName, + kafkachannel.WithNumPartitions("3"), + kafkachannel.WithReplicationFactor("1"), + )) + f.Setup("channel is ready", kafkachannel.IsReady(channelName)) + + f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver)) + + f.Setup("install subscription", subscription.Install( + subscriptionName, + subscription.WithChannel(&duckv1.KReference{ + Kind: "KafkaChannel", + Name: channelName, + APIVersion: kafkachannel.GVR().GroupVersion().String(), + }), + subscription.WithSubscriber(service.AsKReference(sinkName), "", ""), + )) + + f.Requirement("install source", eventshub.Install( + senderName, + eventshub.StartSenderToResource(kafkachannel.GVR(), channelName), + eventshub.InputEvent(inputEvent), + )) + + f.Alpha("channel"). + Must("add Knative Kafka CE extensions", eventasssert.OnStore(sinkName). + MatchEvent(test.ContainsExtensions("knativekafkapartition", "knativekafkaoffset")). + Exact(1)) + return f +} + +func sourceAddsKnativeKafkaCEExtensions() *feature.Feature { + f := feature.NewFeature() + + kafkaSource := feature.MakeRandomK8sName("kafka-source") + topic := feature.MakeRandomK8sName("topic") + kafkaSink := feature.MakeRandomK8sName("kafkaSink") + sinkName := feature.MakeRandomK8sName("eventshub-receiver") + sender := feature.MakeRandomK8sName("eventshub-sender") + + event := test.FullEvent() + event.SetID(uuid.New().String()) + + f.Setup("install kafka topic", kafkatopic.Install(topic)) + f.Setup("topic is ready", kafkatopic.IsReady(topic)) + + // Binary content mode is default for Kafka Sink. + f.Setup("install kafkasink", kafkasink.Install(kafkaSink, topic, testpkg.BootstrapServersPlaintextArr)) + f.Setup("kafkasink is ready", kafkasink.IsReady(kafkaSink)) + + f.Setup("install eventshub receiver", eventshub.Install(sinkName, eventshub.StartReceiver)) + + kafkaSourceOpts := []manifest.CfgFn{ + kafkasource.WithSink(service.AsKReference(sinkName), ""), + kafkasource.WithTopics([]string{topic}), + kafkasource.WithBootstrapServers(testingpkg.BootstrapServersPlaintextArr), + } + + f.Setup("install kafka source", kafkasource.Install(kafkaSource, kafkaSourceOpts...)) + f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSource)) + + options := []eventshub.EventsHubOption{ + eventshub.StartSenderToResource(kafkasink.GVR(), kafkaSink), + eventshub.InputEvent(event), + } + f.Requirement("install eventshub sender", eventshub.Install(sender, options...)) + + f.Alpha("source"). + Must("add Knative Kafka CE extensions", eventasssert.OnStore(sinkName). + MatchEvent(test.ContainsExtensions("knativekafkapartition", "knativekafkaoffset")). + Exact(1)) + return f +} From 642e4caf62c058ea055f58560e1018f2b6e7be60 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 15 Jan 2024 15:50:32 -0500 Subject: [PATCH 06/12] remove unused deserializer Signed-off-by: Calum Murray --- .../dispatcher/impl/consumer/CloudEventOverridesMutator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java index dce227d6c6..b70fbf367b 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java @@ -29,8 +29,6 @@ public class CloudEventOverridesMutator implements CloudEventMutator { private final DataPlaneContract.CloudEventOverrides cloudEventOverrides; - private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); - public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) { this.cloudEventOverrides = cloudEventOverrides; } From 91d4d16a9f2ea92cd28241b76d8c2c45a9aaec62 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 15 Jan 2024 16:26:05 -0500 Subject: [PATCH 07/12] fix failing unit test that ran forever Signed-off-by: Calum Murray --- .../dispatcher/integration/UnorderedConsumerTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java index 222ac5ef0b..c62387fa20 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java @@ -17,6 +17,7 @@ import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.contract; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; @@ -111,7 +112,9 @@ public void testUnorderedConsumer(final Vertx vertx) throws Exception { await().atMost(6, TimeUnit.SECONDS) .untilAsserted(() -> assertThat(vertx.deploymentIDs()).hasSize(numEgresses + NUM_SYSTEM_VERTICLES)); - waitEvents.await(); + if (!waitEvents.await(6, TimeUnit.Second)) { + fail("failed to have all events process properly in time"); + } final var producers = consumerVerticleFactoryMock.producers(); final var consumers = consumerVerticleFactoryMock.consumers(); @@ -165,7 +168,10 @@ private static void startServer( logger.info("received event {}", event); context.verify(() -> { - assertThat(receivedEvent).isEqualTo(event); + assertThat(receivedEvent.getType()).isEqualTo(event.getType()); + assertThat(receivedEvent.getSubject()).isEqualTo(event.getSubject()); + assertThat(receivedEvent.getSource()).isEqualTo(event.getSource()); + assertThat(receivedEvent.getId()).isEqualTo(event.getId()); VertxMessageFactory.createWriter(request.response()) .writeBinary(event); From 033e25707c7d9728d42a4cf4dc5db4b71b87a6cd Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 16 Jan 2024 10:15:02 -0500 Subject: [PATCH 08/12] fix seconds constant Signed-off-by: Calum Murray --- .../broker/dispatcher/integration/UnorderedConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java index c62387fa20..b5cb3d440a 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java @@ -112,8 +112,8 @@ public void testUnorderedConsumer(final Vertx vertx) throws Exception { await().atMost(6, TimeUnit.SECONDS) .untilAsserted(() -> assertThat(vertx.deploymentIDs()).hasSize(numEgresses + NUM_SYSTEM_VERTICLES)); - if (!waitEvents.await(6, TimeUnit.Second)) { - fail("failed to have all events process properly in time"); + if (!waitEvents.await(6, TimeUnit.SECONDS)) { + fail("failed to have all events process properly in time"); } final var producers = consumerVerticleFactoryMock.producers(); From e3afc263002a62245d1d56b7248177cf65f97dff Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 16 Jan 2024 11:15:12 -0500 Subject: [PATCH 09/12] fix: fix java dataplane test ce comparisons Signed-off-by: Calum Murray --- .../eventing/kafka/broker/tests/AbstractDataPlaneTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index 2399ca443c..711b1eca57 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -246,7 +246,8 @@ public void execute(final Vertx vertx, final VertxTestContext context) throws In if (request.path().equals(PATH_SERVICE_1)) { final var expectedEvent = service1ExpectedEventsIterator.next(); context.verify(() -> { - assertThat(event).isEqualTo(expectedEvent); + assertThat(event).usingComparatorForFields((a, b) -> 0, "extensions").isEqualTo(expectedEvent); +// assertThat(event).isEqualTo(expectedEvent); checkpoints.flag(); // 2 }); @@ -260,7 +261,7 @@ public void execute(final Vertx vertx, final VertxTestContext context) throws In // service 2 receives event in the response if (request.path().equals(PATH_SERVICE_2)) { context.verify(() -> { - assertThat(event).isEqualTo(expectedResponseEventService2); + assertThat(event).usingComparatorForFields((a, b) -> 0, "extensions").isEqualTo(expectedResponseEventService2); checkpoints.flag(); // 3 }); From 12a04d75e4c45783c36603a3101bd30211ad2f55 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 16 Jan 2024 11:36:19 -0500 Subject: [PATCH 10/12] fix: take 2 on dataplane java test comparison fix Signed-off-by: Calum Murray --- .../kafka/broker/tests/AbstractDataPlaneTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index 711b1eca57..34b2b90232 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -246,8 +246,10 @@ public void execute(final Vertx vertx, final VertxTestContext context) throws In if (request.path().equals(PATH_SERVICE_1)) { final var expectedEvent = service1ExpectedEventsIterator.next(); context.verify(() -> { - assertThat(event).usingComparatorForFields((a, b) -> 0, "extensions").isEqualTo(expectedEvent); -// assertThat(event).isEqualTo(expectedEvent); + assertThat(event.getId()).isEqualTo(expectedEvent.getId()); + assertThat(event.getType()).isEqualTo(expectedEvent.getType()); + assertThat(event.getSubject()).isEqualTo(expectedEvent.getSubject()); + assertThat(event.getSource()).isEqualTo(expectedEvent.getSource()); checkpoints.flag(); // 2 }); @@ -261,7 +263,11 @@ public void execute(final Vertx vertx, final VertxTestContext context) throws In // service 2 receives event in the response if (request.path().equals(PATH_SERVICE_2)) { context.verify(() -> { - assertThat(event).usingComparatorForFields((a, b) -> 0, "extensions").isEqualTo(expectedResponseEventService2); + assertThat(event.getId()).isEqualTo(expectedResponseEventService2.getId()); + assertThat(event.getType()).isEqualTo(expectedResponseEventService2.getType()); + assertThat(event.getSubject()) + .isEqualTo(expectedResponseEventService2.getSubject()); + assertThat(event.getSource()).isEqualTo(expectedResponseEventService2.getSource()); checkpoints.flag(); // 3 }); From 4dba45107f179f52450170d3e40bf908f28c1363 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 16 Jan 2024 13:20:11 -0500 Subject: [PATCH 11/12] fix: add kafka interceptors to kafka consumerconfigs in tests Signed-off-by: Calum Murray --- .../eventing/kafka/broker/tests/AbstractDataPlaneTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index 34b2b90232..e8d1841e7c 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -35,7 +35,9 @@ import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor; import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerDeployerVerticle; import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerVerticleFactoryImpl; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; @@ -344,6 +346,9 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT consumerConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName()); consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); consumerConfigs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName()); + consumerConfigs.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName()); final var producerConfigs = producerConfigs(); From 59220e8ec6c61f20fab6b1aeeed18c793290d0b9 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 23 Jan 2024 10:57:27 -0500 Subject: [PATCH 12/12] fix build problems in ce_extensions rekt tests Signed-off-by: Calum Murray --- test/rekt/features/ce_extensions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/rekt/features/ce_extensions.go b/test/rekt/features/ce_extensions.go index 84bfa08a49..cb67e54d0a 100644 --- a/test/rekt/features/ce_extensions.go +++ b/test/rekt/features/ce_extensions.go @@ -146,7 +146,7 @@ func sourceAddsKnativeKafkaCEExtensions() *feature.Feature { f.Setup("install eventshub receiver", eventshub.Install(sinkName, eventshub.StartReceiver)) kafkaSourceOpts := []manifest.CfgFn{ - kafkasource.WithSink(service.AsKReference(sinkName), ""), + kafkasource.WithSink(service.AsDestinationRef(sinkName)), kafkasource.WithTopics([]string{topic}), kafkasource.WithBootstrapServers(testingpkg.BootstrapServersPlaintextArr), }