Skip to content

Commit

Permalink
Add confluent json serde support
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Jan 2, 2024
1 parent 0314b10 commit 2700391
Show file tree
Hide file tree
Showing 17 changed files with 516 additions and 4 deletions.
11 changes: 10 additions & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,16 @@
<artifactId>quarkus-confluent-registry-avro</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro-deployment</artifactId>
Expand Down Expand Up @@ -3399,7 +3409,6 @@
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>${apicurio-common-rest-client.version}</version>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public enum Feature {
CDI,
CONFIG_YAML,
CONFLUENT_REGISTRY_AVRO,
CONFLUENT_REGISTRY_JSON,
ELASTICSEARCH_REST_CLIENT_COMMON,
ELASTICSEARCH_REST_CLIENT,
ELASTICSEARCH_REST_HIGH_LEVEL_CLIENT,
Expand Down
13 changes: 13 additions & 0 deletions devtools/bom-descriptor-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-confluent-registry-json-schema-deployment</artifactId>
<name>Quarkus - Confluent Schema Registry - Json Schema - Deployment</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice-deployment</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkus.confluent.registry.json;

import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

import org.jboss.logging.Logger;

import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageConfigBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem;
import io.quarkus.maven.dependency.ResolvedDependency;

public class ConfluentRegistryJsonProcessor {

public static final String CONFLUENT_GROUP_ID = "io.confluent";
public static final String CONFLUENT_ARTIFACT_ID = "kafka-json-schema-serializer";

private static final Logger LOGGER = Logger.getLogger(ConfluentRegistryJsonProcessor.class.getName());
public static final String CONFLUENT_REPO = "https://packages.confluent.io/maven/";
public static final String GUIDE_URL = "https://quarkus.io/guides/kafka-schema-registry-json-schema";

@BuildStep
FeatureBuildItem featureAndCheckDependency(CurateOutcomeBuildItem cp) {
if (findConfluentSerde(cp.getApplicationModel().getDependencies()).isEmpty()) {
LOGGER.warnf("The application uses the `quarkus-confluent-registry-json-schema` extension, but does not " +
"depend on `%s:%s`. Note that this dependency is only available from the `%s` Maven " +
"repository. Check %s for more details.",
CONFLUENT_GROUP_ID, CONFLUENT_ARTIFACT_ID, CONFLUENT_REPO, GUIDE_URL);
}

return new FeatureBuildItem(Feature.CONFLUENT_REGISTRY_JSON);
}

@BuildStep
public void confluentRegistryJson(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {
reflectiveClass
.produce(ReflectiveClassBuildItem.builder("io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer",
"io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer").methods().build());
}

@BuildStep
public void configureNative(BuildProducer<NativeImageConfigBuildItem> config, CurateOutcomeBuildItem cp) {
Optional<ResolvedDependency> serde = findConfluentSerde(cp.getApplicationModel().getDependencies());
if (serde.isPresent()) {
String version = serde.get().getVersion();
if (version.startsWith("7.1") || version.startsWith("7.2")) {
// Only required for Confluent Serde 7.1.x and 7.2.x
config.produce(NativeImageConfigBuildItem.builder()
.addRuntimeInitializedClass("io.confluent.kafka.schemaregistry.client.rest.utils.UrlList")
.build());
}
}
}

@BuildStep
ExtensionSslNativeSupportBuildItem enableSslInNative() {
return new ExtensionSslNativeSupportBuildItem(Feature.CONFLUENT_REGISTRY_JSON);
}

private Optional<ResolvedDependency> findConfluentSerde(Collection<ResolvedDependency> dependencies) {
return dependencies.stream().filter(new Predicate<ResolvedDependency>() {
@Override
public boolean test(ResolvedDependency rd) {
return rd.getGroupId().equalsIgnoreCase(CONFLUENT_GROUP_ID)
&& rd.getArtifactId().equalsIgnoreCase(CONFLUENT_ARTIFACT_ID);
}
}).findAny();
}
}
21 changes: 21 additions & 0 deletions extensions/schema-registry/confluent/json-schema/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>quarkus-confluent-registry-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>quarkus-confluent-registry-json-schema-parent</artifactId>
<name>Quarkus - Confluent Schema Registry - Json Schema</name>
<packaging>pom</packaging>

<modules>
<module>deployment</module>
<module>runtime</module>
</modules>
</project>
90 changes: 90 additions & 0 deletions extensions/schema-registry/confluent/json-schema/runtime/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-confluent-registry-json-schema</artifactId>
<name>Quarkus - Confluent Schema Registry - Json Schema - Runtime</name>
<description>Use Confluent as Json Schema schema registry</description>

<dependencies>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>7.5.1</version>
<exclusions>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>validation-api</artifactId>
<groupId>javax.validation</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<build>
<!-- Mark this as a runtime dependency, so to make sure it's included on the final classpath during native-image -->
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-maven-plugin</artifactId>
<configuration>
<capabilities>
<provides>io.quarkus.confluent.registry.json</provides>
</capabilities>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.quarkus.confluent.registry.json;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;

import io.confluent.kafka.schemaregistry.annotations.Schema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.SpecificationVersion;

@TargetClass(className = "io.confluent.kafka.schemaregistry.json.JsonSchemaUtils")
final class Target_io_confluent_kafka_schemaregistry_json_JsonSchemaUtils {

@Substitute
public static JsonSchema getSchema(
Object object,
SpecificationVersion specVersion,
boolean useOneofForNullables,
boolean failUnknownProperties,
ObjectMapper objectMapper,
SchemaRegistryClient client) throws IOException {

if (object == null) {
return null;
}

Class<?> cls = object.getClass();
//We only support the scenario of having the schema defined in the annotation in the java bean, since it does not rely on outdated libraries.
if (cls.isAnnotationPresent(Schema.class)) {
Schema schema = cls.getAnnotation(Schema.class);
List<SchemaReference> references = Arrays.stream(schema.refs())
.map(new Function<io.confluent.kafka.schemaregistry.annotations.SchemaReference, SchemaReference>() {
@Override
public SchemaReference apply(
io.confluent.kafka.schemaregistry.annotations.SchemaReference schemaReference) {
return new SchemaReference(schemaReference.name(), schemaReference.subject(),
schemaReference.version());
}
})
.collect(Collectors.toList());
if (client == null) {
if (!references.isEmpty()) {
throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
+ " with refs " + references);
}
return new JsonSchema(schema.value());
} else {
return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
.orElseThrow(new Supplier<IOException>() {
@Override
public IOException get() {
return new IOException("Invalid schema " + schema.value()
+ " with refs " + references);
}
});
}
}
return null;
}
}

class ConfluentJsonSubstitutions {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
artifact: ${project.groupId}:${project.artifactId}:${project.version}
name: "Confluent Schema Registry - Json Schema"
metadata:
keywords:
- "confluent"
- "json-schema"
categories:
- "serialization"
status: "preview"
Loading

0 comments on commit 2700391

Please sign in to comment.