diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java index 4b04151069ff..28a49f4742ef 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java @@ -29,6 +29,7 @@ import java.math.BigDecimal; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.function.Function; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -285,12 +286,11 @@ private static class JavaToSqlConverter implements JavaToSqlTypeConverter { @Override public SqlBaseType toSqlType(final Class javaType) { - final SqlBaseType sqlType = JAVA_TO_SQL.get(javaType); - if (sqlType == null) { - throw new KsqlException("Unexpected java type: " + javaType); - } - - return sqlType; + return JAVA_TO_SQL.entrySet().stream() + .filter(e -> e.getKey().isAssignableFrom(javaType)) + .map(Entry::getValue) + .findAny() + .orElseThrow(() -> new KsqlException("Unexpected java type: " + javaType)); } } diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java index 5accd7214bec..9e200b84e0a5 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java @@ -21,6 +21,8 @@ import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.schema.ksql.types.SqlArray; import io.confluent.ksql.schema.ksql.types.SqlDecimal; @@ -31,6 +33,8 @@ import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -177,6 +181,26 @@ public void shouldGetSqlTypeForAllJavaTypes() { }); } + @Test + public void shouldGetSqArrayForImplementationsOfJavaList() { + ImmutableList.>of( + ArrayList.class, + ImmutableList.class + ).forEach(javaType -> { + assertThat(SchemaConverters.javaToSqlConverter().toSqlType(javaType), is(SqlBaseType.ARRAY)); + }); + } + + @Test + public void shouldGetSqlMapForImplementationsOfJavaMap() { + ImmutableList.>of( + HashMap.class, + ImmutableMap.class + ).forEach(javaType -> { + assertThat(SchemaConverters.javaToSqlConverter().toSqlType(javaType), is(SqlBaseType.MAP)); + }); + } + @Test public void shouldConvertNestedComplexToSql() { assertThat(SchemaConverters.connectToSqlConverter().toSqlType(NESTED_LOGICAL_TYPE), is(NESTED_SQL_TYPE));