Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka ce extensions for partition and offset #3464

Merged
merged 14 commits into from
Jan 24, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import io.cloudevents.CloudEvent;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* A CloudEventMutator mutates a given CloudEvent
* A CloudEventMutator returns a new {@link CloudEvent} created by mutating the CloudEvent in a
* given {@link ConsumerRecord}.
*/
@FunctionalInterface
public interface CloudEventMutator extends Function<CloudEvent, CloudEvent> {}
public interface CloudEventMutator extends Function<ConsumerRecord<Object, CloudEvent>, CloudEvent> {}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@

/**
* This class implements the core algorithm of the Dispatcher (see {@link
* RecordDispatcherImpl#dispatch(KafkaConsumerRecord)}).
* RecordDispatcherImpl#dispatch(ConsumerRecord)}).
*
* @see RecordDispatcherImpl#dispatch(KafkaConsumerRecord)
* @see RecordDispatcherImpl#dispatch(ConsumerRecord)
*/
public class RecordDispatcherImpl implements RecordDispatcher {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
import io.cloudevents.CloudEvent;
import io.vertx.core.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -37,19 +38,8 @@ public RecordDispatcherMutatorChain(final RecordDispatcher next, final CloudEven

@Override
public Future<Void> dispatch(ConsumerRecord<Object, CloudEvent> record) {
final var newRecord = new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
cloudEventMutator.apply(record.value()),
record.headers(),
record.leaderEpoch());
return next.dispatch(newRecord);
return next.dispatch(
KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* CloudEventOverridesMutator is a {@link CloudEventMutator} that applies a given set of
Expand All @@ -33,16 +34,19 @@ public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cl
}

@Override
public CloudEvent apply(CloudEvent cloudEvent) {
if (cloudEventOverrides.getExtensionsMap().isEmpty()) {
return cloudEvent;
}
final var builder = CloudEventBuilder.from(cloudEvent);
public CloudEvent apply(ConsumerRecord<Object, CloudEvent> record) {
final var builder = CloudEventBuilder.from(record.value());
applyKafkaMetadata(builder, record.partition(), record.offset());
applyCloudEventOverrides(builder);
return builder.build();
}

private void applyCloudEventOverrides(CloudEventBuilder builder) {
cloudEventOverrides.getExtensionsMap().forEach(builder::withExtension);
}

private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) {
builder.withExtension("knativekafkapartition", partition);
builder.withExtension("knativekafkaoffset", offset);
Comment on lines +49 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make those private static final String ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the prefix of knative... but wondering if the actual upstream/spec, would want to say something on the metadata in https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/kafka-protocol-binding.md ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see anything in particular while reading the docs there, I'll ask at the serverless WG meeting tomorrow

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From today's cloudevents call, we should use our own attribute names prefixed by knative and then document those in our repo

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fair and thanks for raising this

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

public class CloudEventOverridesMutatorTest {
Expand All @@ -47,14 +48,16 @@ public void shouldAddExtensions() {

final var expected = CloudEventBuilder.from(given);
extensions.forEach(expected::withExtension);
expected.withExtension("knativekafkaoffset", 1L);
expected.withExtension("knativekafkapartition", 1);

final var got = mutator.apply(given);
final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));

assertThat(got).isEqualTo(expected.build());
}

@Test
public void shouldNotMutateRecordWhenNoOverrides() {
public void shouldAddKafkaExtensionsWhenNoOverrides() {
final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
.putAllExtensions(Map.of())
.build();
Expand All @@ -68,8 +71,13 @@ public void shouldNotMutateRecordWhenNoOverrides() {
.withType("foo")
.build();

final var got = mutator.apply(given);
final var expected = CloudEventBuilder.from(given)
.withExtension("knativekafkaoffset", 1L)
.withExtension("knativekafkapartition", 1)
.build();

final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));

assertThat(got).isSameAs(given);
assertThat(got).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.contract;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
Expand Down Expand Up @@ -111,7 +112,9 @@ public void testUnorderedConsumer(final Vertx vertx) throws Exception {
await().atMost(6, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(vertx.deploymentIDs()).hasSize(numEgresses + NUM_SYSTEM_VERTICLES));

waitEvents.await();
if (!waitEvents.await(6, TimeUnit.SECONDS)) {
fail("failed to have all events process properly in time");
}

final var producers = consumerVerticleFactoryMock.producers();
final var consumers = consumerVerticleFactoryMock.consumers();
Expand Down Expand Up @@ -165,7 +168,10 @@ private static void startServer(
logger.info("received event {}", event);

context.verify(() -> {
assertThat(receivedEvent).isEqualTo(event);
assertThat(receivedEvent.getType()).isEqualTo(event.getType());
assertThat(receivedEvent.getSubject()).isEqualTo(event.getSubject());
assertThat(receivedEvent.getSource()).isEqualTo(event.getSource());
assertThat(receivedEvent.getId()).isEqualTo(event.getId());

VertxMessageFactory.createWriter(request.response())
.writeBinary(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerDeployerVerticle;
import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerVerticleFactoryImpl;
import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore;
Expand Down Expand Up @@ -246,7 +248,10 @@ public void execute(final Vertx vertx, final VertxTestContext context) throws In
if (request.path().equals(PATH_SERVICE_1)) {
final var expectedEvent = service1ExpectedEventsIterator.next();
context.verify(() -> {
assertThat(event).isEqualTo(expectedEvent);
assertThat(event.getId()).isEqualTo(expectedEvent.getId());
assertThat(event.getType()).isEqualTo(expectedEvent.getType());
assertThat(event.getSubject()).isEqualTo(expectedEvent.getSubject());
assertThat(event.getSource()).isEqualTo(expectedEvent.getSource());
checkpoints.flag(); // 2
});

Expand All @@ -260,7 +265,11 @@ public void execute(final Vertx vertx, final VertxTestContext context) throws In
// service 2 receives event in the response
if (request.path().equals(PATH_SERVICE_2)) {
context.verify(() -> {
assertThat(event).isEqualTo(expectedResponseEventService2);
assertThat(event.getId()).isEqualTo(expectedResponseEventService2.getId());
assertThat(event.getType()).isEqualTo(expectedResponseEventService2.getType());
assertThat(event.getSubject())
.isEqualTo(expectedResponseEventService2.getSubject());
assertThat(event.getSource()).isEqualTo(expectedResponseEventService2.getSource());
checkpoints.flag(); // 3
});

Expand Down Expand Up @@ -337,6 +346,9 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT
consumerConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
consumerConfigs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
consumerConfigs.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());

final var producerConfigs = producerConfigs();

Expand Down
15 changes: 15 additions & 0 deletions test/e2e_new/broker_event_transformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package e2e_new
import (
"testing"

"knative.dev/eventing-kafka-broker/test/rekt/features"
"knative.dev/eventing/test/rekt/features/broker"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
Expand All @@ -42,3 +43,17 @@ func TestEventTransformationForTrigger(t *testing.T) {

env.TestSet(ctx, t, broker.BrokerWorkFlowWithTransformation())
}

func TestKnativeKafkaCloudEventExtensionsAdded(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.TestSet(ctx, t, features.KnativeKafkaCEExtensionsAdded())
}
Loading
Loading