Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add confluent jsonserde support #37870

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,16 @@
<artifactId>quarkus-confluent-registry-avro</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro-deployment</artifactId>
Expand Down Expand Up @@ -3399,7 +3409,6 @@
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>${apicurio-common-rest-client.version}</version>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public enum Feature {
CDI,
CONFIG_YAML,
CONFLUENT_REGISTRY_AVRO,
CONFLUENT_REGISTRY_JSON,
ELASTICSEARCH_REST_CLIENT_COMMON,
ELASTICSEARCH_REST_CLIENT,
ELASTICSEARCH_REST_HIGH_LEVEL_CLIENT,
Expand Down
13 changes: 13 additions & 0 deletions devtools/bom-descriptor-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-confluent-registry-json-schema-deployment</artifactId>
<name>Quarkus - Confluent Schema Registry - Json Schema - Deployment</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice-deployment</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkus.confluent.registry.json;

import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

import org.jboss.logging.Logger;

import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageConfigBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem;
import io.quarkus.maven.dependency.ResolvedDependency;

public class ConfluentRegistryJsonProcessor {

public static final String CONFLUENT_GROUP_ID = "io.confluent";
public static final String CONFLUENT_ARTIFACT_ID = "kafka-json-schema-serializer";

private static final Logger LOGGER = Logger.getLogger(ConfluentRegistryJsonProcessor.class.getName());
public static final String CONFLUENT_REPO = "https://packages.confluent.io/maven/";
public static final String GUIDE_URL = "https://quarkus.io/guides/kafka-schema-registry-json-schema";

@BuildStep
FeatureBuildItem featureAndCheckDependency(CurateOutcomeBuildItem cp) {
if (findConfluentSerde(cp.getApplicationModel().getDependencies()).isEmpty()) {
LOGGER.warnf("The application uses the `quarkus-confluent-registry-json-schema` extension, but does not " +
"depend on `%s:%s`. Note that this dependency is only available from the `%s` Maven " +
"repository. Check %s for more details.",
CONFLUENT_GROUP_ID, CONFLUENT_ARTIFACT_ID, CONFLUENT_REPO, GUIDE_URL);
}

return new FeatureBuildItem(Feature.CONFLUENT_REGISTRY_JSON);
}

@BuildStep
public void confluentRegistryJson(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {
reflectiveClass
.produce(ReflectiveClassBuildItem.builder("io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer",
"io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer").methods().build());
}

@BuildStep
public void configureNative(BuildProducer<NativeImageConfigBuildItem> config, CurateOutcomeBuildItem cp) {
Optional<ResolvedDependency> serde = findConfluentSerde(cp.getApplicationModel().getDependencies());
if (serde.isPresent()) {
String version = serde.get().getVersion();
if (version.startsWith("7.1") || version.startsWith("7.2")) {
// Only required for Confluent Serde 7.1.x and 7.2.x
config.produce(NativeImageConfigBuildItem.builder()
.addRuntimeInitializedClass("io.confluent.kafka.schemaregistry.client.rest.utils.UrlList")
.build());
}
}
}

@BuildStep
ExtensionSslNativeSupportBuildItem enableSslInNative() {
return new ExtensionSslNativeSupportBuildItem(Feature.CONFLUENT_REGISTRY_JSON);
}

private Optional<ResolvedDependency> findConfluentSerde(Collection<ResolvedDependency> dependencies) {
return dependencies.stream().filter(new Predicate<ResolvedDependency>() {
@Override
public boolean test(ResolvedDependency rd) {
return rd.getGroupId().equalsIgnoreCase(CONFLUENT_GROUP_ID)
&& rd.getArtifactId().equalsIgnoreCase(CONFLUENT_ARTIFACT_ID);
}
}).findAny();
}
}
21 changes: 21 additions & 0 deletions extensions/schema-registry/confluent/json-schema/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>quarkus-confluent-registry-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>quarkus-confluent-registry-json-schema-parent</artifactId>
<name>Quarkus - Confluent Schema Registry - Json Schema</name>
<packaging>pom</packaging>

<modules>
<module>deployment</module>
<module>runtime</module>
</modules>
</project>
90 changes: 90 additions & 0 deletions extensions/schema-registry/confluent/json-schema/runtime/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-confluent-registry-json-schema</artifactId>
<name>Quarkus - Confluent Schema Registry - Json Schema - Runtime</name>
<description>Use Confluent as Json Schema schema registry</description>

<dependencies>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>7.5.1</version>
<exclusions>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>validation-api</artifactId>
<groupId>javax.validation</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<build>
<!-- Mark this as a runtime dependency, so to make sure it's included on the final classpath during native-image -->
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-maven-plugin</artifactId>
<configuration>
<capabilities>
<provides>io.quarkus.confluent.registry.json</provides>
</capabilities>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.quarkus.confluent.registry.json;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;

import io.confluent.kafka.schemaregistry.annotations.Schema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.SpecificationVersion;

@TargetClass(className = "io.confluent.kafka.schemaregistry.json.JsonSchemaUtils")
final class Target_io_confluent_kafka_schemaregistry_json_JsonSchemaUtils {

@Substitute
public static JsonSchema getSchema(
Object object,
SpecificationVersion specVersion,
boolean useOneofForNullables,
boolean failUnknownProperties,
ObjectMapper objectMapper,
SchemaRegistryClient client) throws IOException {

if (object == null) {
return null;
}

Class<?> cls = object.getClass();
//We only support the scenario of having the schema defined in the annotation in the java bean, since it does not rely on outdated libraries.
if (cls.isAnnotationPresent(Schema.class)) {
Schema schema = cls.getAnnotation(Schema.class);
List<SchemaReference> references = Arrays.stream(schema.refs())
.map(new Function<io.confluent.kafka.schemaregistry.annotations.SchemaReference, SchemaReference>() {
@Override
public SchemaReference apply(
io.confluent.kafka.schemaregistry.annotations.SchemaReference schemaReference) {
return new SchemaReference(schemaReference.name(), schemaReference.subject(),
schemaReference.version());
}
})
.collect(Collectors.toList());
if (client == null) {
if (!references.isEmpty()) {
throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
+ " with refs " + references);
}
return new JsonSchema(schema.value());
} else {
return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
.orElseThrow(new Supplier<IOException>() {
@Override
public IOException get() {
return new IOException("Invalid schema " + schema.value()
+ " with refs " + references);
}
});
}
}
return null;
}
}

class ConfluentJsonSubstitutions {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
artifact: ${project.groupId}:${project.artifactId}:${project.version}
name: "Confluent Schema Registry - Json Schema"
metadata:
keywords:
- "confluent"
- "json-schema"
categories:
- "serialization"
status: "preview"
Loading
Loading