Add kafka ce extensions for partition and offset (#3464)
* Added kafka ce extensions for partition and offset

Signed-off-by: Calum Murray <cmurray@redhat.com>

* Updated codegen

Signed-off-by: Calum Murray <cmurray@redhat.com>

* test vendor fix

Signed-off-by: Calum Murray <cmurray@redhat.com>

* remove unnecessary null event deserialization

Signed-off-by: Calum Murray <cmurray@redhat.com>

* add e2e tests for broker, channel, sink

Signed-off-by: Calum Murray <cmurray@redhat.com>

* remove unused deserializer

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix failing unit test that ran forever

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix seconds constant

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: fix java dataplane test ce comparisons

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: take 2 on dataplane java test comparison fix

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: add kafka interceptors to kafka consumerconfigs in tests

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix build problems in ce_extensions rekt tests

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
Cali0707 authored Jan 24, 2024
1 parent 5ccc150 commit e2bd457
Showing 9 changed files with 235 additions and 30 deletions.
@@ -17,9 +17,11 @@
 
 import io.cloudevents.CloudEvent;
 import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
- * A CloudEventMutator mutates a given CloudEvent
+ * A CloudEventMutator returns a new {@link CloudEvent} created by mutating the CloudEvent in a
+ * given {@link ConsumerRecord}.
  */
 @FunctionalInterface
-public interface CloudEventMutator extends Function<CloudEvent, CloudEvent> {}
+public interface CloudEventMutator extends Function<ConsumerRecord<Object, CloudEvent>, CloudEvent> {}
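
With this signature change a mutator sees the whole ConsumerRecord rather than just the event value, so it can read Kafka metadata while rebuilding the event. A minimal sketch of a custom implementation under the new signature (the class name and the "exampletopic" extension key are made up for illustration; they are not part of this change):

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical mutator: copies the event and stamps the source topic on it.
public class TopicStampingMutator implements CloudEventMutator {
    @Override
    public CloudEvent apply(ConsumerRecord<Object, CloudEvent> record) {
        return CloudEventBuilder.from(record.value())
                .withExtension("exampletopic", record.topic()) // illustrative extension name
                .build();
    }
}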
@@ -54,9 +54,9 @@
 
 /**
  * This class implements the core algorithm of the Dispatcher (see {@link
- * RecordDispatcherImpl#dispatch(KafkaConsumerRecord)}).
+ * RecordDispatcherImpl#dispatch(ConsumerRecord)}).
  *
- * @see RecordDispatcherImpl#dispatch(KafkaConsumerRecord)
+ * @see RecordDispatcherImpl#dispatch(ConsumerRecord)
  */
 public class RecordDispatcherImpl implements RecordDispatcher {
 
@@ -17,6 +17,7 @@
 
 import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator;
 import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
 import io.cloudevents.CloudEvent;
 import io.vertx.core.Future;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -37,19 +38,8 @@ public RecordDispatcherMutatorChain(final RecordDispatcher next, final CloudEven
 
     @Override
     public Future<Void> dispatch(ConsumerRecord<Object, CloudEvent> record) {
-        final var newRecord = new ConsumerRecord<>(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.timestamp(),
-                record.timestampType(),
-                record.serializedKeySize(),
-                record.serializedValueSize(),
-                record.key(),
-                cloudEventMutator.apply(record.value()),
-                record.headers(),
-                record.leaderEpoch());
-        return next.dispatch(newRecord);
+        return next.dispatch(
+                KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record)));
    }
 
     @Override
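
The KafkaConsumerRecordUtils.copyRecordAssigningValue helper itself is not among the loaded hunks; judging from the inline code it replaces above, it presumably performs the same field-by-field copy with the new value swapped in, along the lines of this sketch (signature assumed):

// Assumed shape of the helper, mirroring the removed inline code: copy every
// field of the source record and substitute the freshly mutated value.
public static <K, V> ConsumerRecord<K, V> copyRecordAssigningValue(
        final ConsumerRecord<K, V> record, final V value) {
    return new ConsumerRecord<>(
            record.topic(),
            record.partition(),
            record.offset(),
            record.timestamp(),
            record.timestampType(),
            record.serializedKeySize(),
            record.serializedValueSize(),
            record.key(),
            value,
            record.headers(),
            record.leaderEpoch());
}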
@@ -19,6 +19,7 @@
 import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator;
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * CloudEventOverridesMutator is a {@link CloudEventMutator} that applies a given set of
@@ -33,16 +34,19 @@ public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cl
     }
 
     @Override
-    public CloudEvent apply(CloudEvent cloudEvent) {
-        if (cloudEventOverrides.getExtensionsMap().isEmpty()) {
-            return cloudEvent;
-        }
-        final var builder = CloudEventBuilder.from(cloudEvent);
+    public CloudEvent apply(ConsumerRecord<Object, CloudEvent> record) {
+        final var builder = CloudEventBuilder.from(record.value());
+        applyKafkaMetadata(builder, record.partition(), record.offset());
         applyCloudEventOverrides(builder);
         return builder.build();
     }
 
     private void applyCloudEventOverrides(CloudEventBuilder builder) {
         cloudEventOverrides.getExtensionsMap().forEach(builder::withExtension);
     }
+
+    private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) {
+        builder.withExtension("knativekafkapartition", partition);
+        builder.withExtension("knativekafkaoffset", offset);
+    }
 }
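
After this change every dispatched event carries the knativekafkapartition and knativekafkaoffset extensions (plus any configured overrides). Downstream code can read them back with the stock CloudEvents SDK accessor; a small illustrative helper, not part of this change:

import io.cloudevents.CloudEvent;

// Illustrative only: reads the two new extensions off a received event.
public final class KafkaExtensionsReader {
    public static String describeOrigin(CloudEvent event) {
        final var partition = event.getExtension("knativekafkapartition");
        final var offset = event.getExtension("knativekafkaoffset");
        return "partition=" + partition + ", offset=" + offset;
    }
}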
@@ -23,6 +23,7 @@
 import java.time.OffsetDateTime;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Test;
 
 public class CloudEventOverridesMutatorTest {
@@ -47,14 +48,16 @@ public void shouldAddExtensions() {
 
         final var expected = CloudEventBuilder.from(given);
         extensions.forEach(expected::withExtension);
+        expected.withExtension("knativekafkaoffset", 1L);
+        expected.withExtension("knativekafkapartition", 1);
 
-        final var got = mutator.apply(given);
+        final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));
 
         assertThat(got).isEqualTo(expected.build());
     }
 
     @Test
-    public void shouldNotMutateRecordWhenNoOverrides() {
+    public void shouldAddKafkaExtensionsWhenNoOverrides() {
         final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
                 .putAllExtensions(Map.of())
                 .build();
@@ -68,8 +71,13 @@ public void shouldNotMutateRecordWhenNoOverrides() {
                 .withType("foo")
                 .build();
 
-        final var got = mutator.apply(given);
+        final var expected = CloudEventBuilder.from(given)
+                .withExtension("knativekafkaoffset", 1L)
+                .withExtension("knativekafkapartition", 1)
+                .build();
+
+        final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));
 
-        assertThat(got).isSameAs(given);
+        assertThat(got).isEqualTo(expected);
     }
 }
@@ -17,6 +17,7 @@
 
 import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.contract;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.awaitility.Awaitility.await;
 
 import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
@@ -111,7 +112,9 @@ public void testUnorderedConsumer(final Vertx vertx) throws Exception {
         await().atMost(6, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertThat(vertx.deploymentIDs()).hasSize(numEgresses + NUM_SYSTEM_VERTICLES));
 
-        waitEvents.await();
+        if (!waitEvents.await(6, TimeUnit.SECONDS)) {
+            fail("failed to have all events process properly in time");
+        }
 
         final var producers = consumerVerticleFactoryMock.producers();
         final var consumers = consumerVerticleFactoryMock.consumers();
@@ -165,7 +168,10 @@ private static void startServer(
                 logger.info("received event {}", event);
 
                 context.verify(() -> {
-                    assertThat(receivedEvent).isEqualTo(event);
+                    assertThat(receivedEvent.getType()).isEqualTo(event.getType());
+                    assertThat(receivedEvent.getSubject()).isEqualTo(event.getSubject());
+                    assertThat(receivedEvent.getSource()).isEqualTo(event.getSource());
+                    assertThat(receivedEvent.getId()).isEqualTo(event.getId());
 
                     VertxMessageFactory.createWriter(request.response())
                             .writeBinary(event);
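
The hang this fixes comes from CountDownLatch.await(): the no-argument overload blocks until the count reaches zero, so one lost event stalls the test forever, while the timed overload returns false once the deadline passes. A self-contained illustration of the difference:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutExample {
    public static void main(String[] args) throws InterruptedException {
        final var latch = new CountDownLatch(1); // never counted down below
        // latch.await(); // would block this thread indefinitely
        final var completedInTime = latch.await(1, TimeUnit.SECONDS);
        System.out.println("completed in time: " + completedInTime); // prints false
    }
}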
@@ -35,7 +35,9 @@
 import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
 import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
 import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerDeployerVerticle;
 import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerVerticleFactoryImpl;
 import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore;
@@ -246,7 +248,10 @@ public void execute(final Vertx vertx, final VertxTestContext context) throws In
                 if (request.path().equals(PATH_SERVICE_1)) {
                     final var expectedEvent = service1ExpectedEventsIterator.next();
                     context.verify(() -> {
-                        assertThat(event).isEqualTo(expectedEvent);
+                        assertThat(event.getId()).isEqualTo(expectedEvent.getId());
+                        assertThat(event.getType()).isEqualTo(expectedEvent.getType());
+                        assertThat(event.getSubject()).isEqualTo(expectedEvent.getSubject());
+                        assertThat(event.getSource()).isEqualTo(expectedEvent.getSource());
                         checkpoints.flag(); // 2
                     });
 
@@ -260,7 +265,11 @@
                 // service 2 receives event in the response
                 if (request.path().equals(PATH_SERVICE_2)) {
                     context.verify(() -> {
-                        assertThat(event).isEqualTo(expectedResponseEventService2);
+                        assertThat(event.getId()).isEqualTo(expectedResponseEventService2.getId());
+                        assertThat(event.getType()).isEqualTo(expectedResponseEventService2.getType());
+                        assertThat(event.getSubject())
+                                .isEqualTo(expectedResponseEventService2.getSubject());
+                        assertThat(event.getSource()).isEqualTo(expectedResponseEventService2.getSource());
                         checkpoints.flag(); // 3
                     });
 
@@ -337,6 +346,9 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT
         consumerConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
         consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
         consumerConfigs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
+        consumerConfigs.put(
+                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());
 
         final var producerConfigs = producerConfigs();
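
For context on the interceptor fix: interceptor.classes (ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG) takes a comma-separated list, and Kafka invokes the interceptors in the order listed, so NullCloudEventInterceptor runs before InvalidCloudEventInterceptor here. A standalone sketch of the same wiring (the wrapper class is hypothetical):

import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch: equivalent interceptor wiring for a plain Kafka consumer config.
public final class InterceptorConfigSketch {
    public static Properties withCloudEventInterceptors(final Properties props) {
        props.put(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());
        return props;
    }
}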
15 changes: 15 additions & 0 deletions test/e2e_new/broker_event_transformation_test.go
@@ -22,6 +22,7 @@ package e2e_new
 import (
 	"testing"
 
+	"knative.dev/eventing-kafka-broker/test/rekt/features"
 	"knative.dev/eventing/test/rekt/features/broker"
 	"knative.dev/pkg/system"
 	"knative.dev/reconciler-test/pkg/environment"
@@ -42,3 +43,17 @@ func TestEventTransformationForTrigger(t *testing.T) {
 
 	env.TestSet(ctx, t, broker.BrokerWorkFlowWithTransformation())
 }
+
+func TestKnativeKafkaCloudEventExtensionsAdded(t *testing.T) {
+	t.Parallel()
+
+	ctx, env := global.Environment(
+		knative.WithKnativeNamespace(system.Namespace()),
+		knative.WithLoggingConfig,
+		knative.WithTracingConfig,
+		k8s.WithEventListener,
+		environment.Managed(t),
+	)
+
+	env.TestSet(ctx, t, features.KnativeKafkaCEExtensionsAdded())
+}