Skip to content

Commit

Permalink
fix(json-converter): fix iceberg json converter
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored and chakru-r committed Jan 30, 2025
1 parent 965633a commit a559a02
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 39 deletions.
14 changes: 12 additions & 2 deletions metadata-ingestion/src/datahub/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from datahub.cli.env_utils import get_boolean_env_variable
from datahub.cli.exists_cli import exists
from datahub.cli.get_cli import get
from datahub.cli.iceberg_cli import iceberg
from datahub.cli.ingest_cli import ingest
from datahub.cli.migrate import migrate
from datahub.cli.put_cli import put
Expand Down Expand Up @@ -183,7 +182,18 @@ def init(use_password: bool = False) -> None:
datahub.add_command(datacontract)
datahub.add_command(assertions)
datahub.add_command(container)
datahub.add_command(iceberg)

try:
from datahub.cli.iceberg_cli import iceberg

datahub.add_command(iceberg)
except ImportError as e:
logger.debug(f"Failed to load datahub iceberg command: {e}")
datahub.add_command(

Check warning on line 192 in metadata-ingestion/src/datahub/entrypoints.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/entrypoints.py#L190-L192

Added lines #L190 - L192 were not covered by tests
make_shim_command(
"iceberg", "run `pip install 'acryl-datahub[iceberg-catalog]'`"
)
)

try:
from datahub.cli.lite_cli import lite
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.datahubproject.iceberg.catalog.rest.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import javax.annotation.Nonnull;
import org.springframework.http.MediaType;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;

public class IcebergJsonConverter extends MappingJackson2HttpMessageConverter {
private static final String ICEBERG_PACKAGE_PREFIX = "org.apache.iceberg.";

public IcebergJsonConverter(ObjectMapper objectMapper) {
super(objectMapper);
}

@Override
protected boolean supports(@Nonnull Class<?> clazz) {
return isClassInPackage(clazz);
}

@Override
public boolean canRead(@Nonnull Type type, Class<?> contextClass, MediaType mediaType) {
return hasTypeInPackage(type) && super.canRead(type, contextClass, mediaType);
}

@Override
public boolean canWrite(@Nonnull Class<?> clazz, MediaType mediaType) {
return isClassInPackage(clazz) && super.canWrite(clazz, mediaType);
}

private boolean hasTypeInPackage(Type type) {
if (type instanceof Class<?>) {
return isClassInPackage((Class<?>) type);
}

if (type instanceof ParameterizedType) {
ParameterizedType paramType = (ParameterizedType) type;

// Check raw type
Type rawType = paramType.getRawType();
if (rawType instanceof Class<?> && isClassInPackage((Class<?>) rawType)) {
return true;
}

// Recursively check type arguments
for (Type typeArg : paramType.getActualTypeArguments()) {
if (hasTypeInPackage(typeArg)) {
return true;
}
}
}

if (type instanceof WildcardType) {
WildcardType wildcardType = (WildcardType) type;
// Check upper bounds
for (Type bound : wildcardType.getUpperBounds()) {
if (hasTypeInPackage(bound)) {
return true;
}
}
// Check lower bounds
for (Type bound : wildcardType.getLowerBounds()) {
if (hasTypeInPackage(bound)) {
return true;
}
}
}

if (type instanceof GenericArrayType) {
GenericArrayType arrayType = (GenericArrayType) type;
return hasTypeInPackage(arrayType.getGenericComponentType());
}

return false;
}

private static boolean isClassInPackage(@Nonnull Class<?> clazz) {
return clazz.getName().startsWith(ICEBERG_PACKAGE_PREFIX);
}
}
Original file line number Diff line number Diff line change
@@ -1,34 +1,14 @@
package io.datahubproject.iceberg.catalog.rest.common;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import io.datahubproject.iceberg.catalog.credentials.CachingCredentialProvider;
import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
import io.datahubproject.iceberg.catalog.credentials.S3CredentialProvider;
import java.util.List;
import org.apache.iceberg.rest.RESTSerializers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.*;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class IcebergSpringWebConfig implements WebMvcConfigurer {
@Override
public void extendMessageConverters(List<HttpMessageConverter<?>> converters) {
for (HttpMessageConverter<?> converter : converters) {
if (converter instanceof MappingJackson2HttpMessageConverter jsonConverter) {
ObjectMapper objectMapper = jsonConverter.getObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy());
RESTSerializers.registerAll(objectMapper);
break;
}
}
}
public class IcebergSpringWebConfig {

@Bean
public CredentialProvider credentialProvider() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,14 @@
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.servers.Server;
import jakarta.servlet.http.HttpServletRequest;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.http.converter.ByteArrayHttpMessageConverter;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Slf4j
@EnableWebMvc
Expand All @@ -35,7 +28,7 @@
havingValue = InternalSchemaRegistryFactory.TYPE)
@Configuration
@ComponentScan(basePackages = {"io.datahubproject.openapi.schema.registry"})
public class SpringWebSchemaRegistryConfig implements WebMvcConfigurer {
public class SpringWebSchemaRegistryConfig {

@Bean
public SchemaRegistryController schemaRegistryController(
Expand All @@ -44,12 +37,4 @@ public SchemaRegistryController schemaRegistryController(
@Qualifier("schemaRegistryService") SchemaRegistryService schemaRegistryService) {
return new SchemaRegistryController(objectMapper, request, schemaRegistryService);
}

@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> messageConverters) {
messageConverters.add(new StringHttpMessageConverter());
messageConverters.add(new ByteArrayHttpMessageConverter());
messageConverters.add(new FormHttpMessageConverter());
messageConverters.add(new MappingJackson2HttpMessageConverter());
}
}
1 change: 1 addition & 0 deletions metadata-service/war/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation project(':metadata-service:openapi-analytics-servlet')
implementation project(':metadata-service:schema-registry-servlet')
implementation project(':metadata-service:iceberg-catalog')
implementation 'org.apache.iceberg:iceberg-core:1.6.1'
runtimeOnly project(':metadata-jobs:mce-consumer')
runtimeOnly project(':metadata-jobs:mae-consumer')
runtimeOnly project(':metadata-jobs:pe-consumer')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@
import com.datahub.gms.servlet.Config;
import com.datahub.gms.servlet.ConfigSearchExport;
import com.datahub.gms.servlet.HealthCheck;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.linkedin.r2.transport.http.server.RAPJakartaServlet;
import com.linkedin.restli.server.RestliHandlerServlet;
import io.datahubproject.iceberg.catalog.rest.common.IcebergJsonConverter;
import io.datahubproject.openapi.converter.StringToChangeCategoryConverter;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.rest.RESTSerializers;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
Expand Down Expand Up @@ -129,6 +134,7 @@ public void configureMessageConverters(List<HttpMessageConverter<?>> messageConv
messageConverters.add(new StringHttpMessageConverter());
messageConverters.add(new ByteArrayHttpMessageConverter());
messageConverters.add(new FormHttpMessageConverter());
messageConverters.add(createIcebergMessageConverter());

ObjectMapper objectMapper = new ObjectMapper();
int maxSize =
Expand All @@ -145,6 +151,16 @@ public void configureMessageConverters(List<HttpMessageConverter<?>> messageConv
messageConverters.add(jsonConverter);
}

private HttpMessageConverter<?> createIcebergMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
MappingJackson2HttpMessageConverter jsonConverter = new IcebergJsonConverter(objectMapper);

objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy());
RESTSerializers.registerAll(objectMapper);
return jsonConverter;
}

@Override
public void addFormatters(FormatterRegistry registry) {
registry.addConverter(new StringToChangeCategoryConverter());
Expand Down

0 comments on commit a559a02

Please sign in to comment.