diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/DataException.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/DataException.java new file mode 100644 index 000000000000..75d115e2d38f --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/DataException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.schema.ksql; + +/** + * Issue with data. + */ +public class DataException extends RuntimeException { + + public DataException(final String msg) { + super(msg); + } + + public DataException(final String msg, final Throwable cause) { + super(msg, cause); + } +} diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/Field.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/Field.java index eef79e5bffca..79174de21297 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/Field.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/Field.java @@ -17,7 +17,6 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.schema.ksql.types.SqlType; -import io.confluent.ksql.util.SchemaUtil; import java.util.Objects; import java.util.Optional; @@ -27,9 +26,7 @@ @Immutable public final class Field { - private final Optional source; - private final String fullName; - private final String name; + private final FieldName name; private final SqlType type; /** @@ -38,7 +35,7 @@ public final class Field { * @return the immutable field. */ public static Field of(final String name, final SqlType type) { - return new Field(Optional.empty(), name, type); + return new Field(FieldName.of(Optional.empty(), name), type); } /** @@ -48,55 +45,39 @@ public static Field of(final String name, final SqlType type) { * @return the immutable field. */ public static Field of(final String source, final String name, final SqlType type) { - return new Field(Optional.of(source), name, type); + return new Field(FieldName.of(Optional.of(source), name), type); } /** - * @param source the name of the source of the field. * @param name the name of the field. * @param type the type of the field. * @return the immutable field. */ - public static Field of(final Optional source, final String name, final SqlType type) { - return new Field(source, name, type); + public static Field of(final FieldName name, final SqlType type) { + return new Field(name, type); } - private Field(final Optional source, final String name, final SqlType type) { - this.source = Objects.requireNonNull(source, "source"); + private Field(final FieldName name, final SqlType type) { this.name = Objects.requireNonNull(name, "name"); this.type = Objects.requireNonNull(type, "type"); - this.fullName = source - .map(s -> SchemaUtil.buildAliasedFieldName(s, name)) - .orElse(name); - - if (!name.trim().equals(name)) { - throw new IllegalArgumentException("name is not trimmed: '" + name + "'"); - } - - if (name.isEmpty()) { - throw new IllegalArgumentException("name is empty"); - } } - /** - * @return the name of the source of the field, where known. - */ - public Optional source() { - return source; + public FieldName fieldName() { + return name; } /** * @return the fully qualified field name. */ public String fullName() { - return fullName; + return name.fullName(); } /** * @return the name of the field, without any source / alias. */ public String name() { - return name; + return name.name(); } /** @@ -113,7 +94,7 @@ public SqlType type() { * @return the new field. */ public Field withSource(final String source) { - return new Field(Optional.of(source), name, type); + return new Field(name.withSource(source), type); } @Override @@ -125,13 +106,13 @@ public boolean equals(final Object o) { return false; } final Field field = (Field) o; - return Objects.equals(fullName, field.fullName) + return Objects.equals(name, field.name) && Objects.equals(type, field.type); } @Override public int hashCode() { - return Objects.hash(fullName, type); + return Objects.hash(name, type); } @Override @@ -140,13 +121,6 @@ public String toString() { } public String toString(final FormatOptions formatOptions) { - final Optional base = source.map(val -> escape(val, formatOptions)); - final String escaped = escape(name, formatOptions); - final String field = base.isPresent() ? base.get() + "." + escaped : escaped; - return field + " " + type.toString(formatOptions); - } - - private static String escape(final String string, final FormatOptions formatOptions) { - return formatOptions.isReservedWord(string) ? "`" + string + "`" : string; + return name.toString(formatOptions) + " " + type.toString(formatOptions); } } diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FieldName.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FieldName.java new file mode 100644 index 000000000000..fcb1c685b45a --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FieldName.java @@ -0,0 +1,128 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.schema.ksql; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.util.SchemaUtil; +import java.util.Objects; +import java.util.Optional; + +/** + * Immutable POJO for storing a {@link Field}'s name + */ +@Immutable +public final class FieldName { + + private final Optional source; + private final String name; + private final String fullName; + + public static FieldName of(final String name) { + return of(Optional.empty(), name); + } + + public static FieldName of(final String source, final String name) { + return of(Optional.of(source), name); + } + + public static FieldName of(final Optional source, final String name) { + return new FieldName(source, name); + } + + private FieldName(final Optional source, final String fullName) { + this.source = Objects.requireNonNull(source, "source"); + this.name = Objects.requireNonNull(fullName, "name"); + this.fullName = source + .map(s -> SchemaUtil.buildAliasedFieldName(s, name)) + .orElse(name); + + this.source.ifPresent(src -> { + if (!src.trim().equals(src)) { + throw new IllegalArgumentException("source is not trimmed: '" + src + "'"); + } + + if (src.isEmpty()) { + throw new IllegalArgumentException("source is empty"); + } + + }); + + if (!name.trim().equals(name)) { + throw new IllegalArgumentException("name is not trimmed: '" + name + "'"); + } + + if (name.isEmpty()) { + throw new IllegalArgumentException("name is empty"); + } + } + + /** + * @return the name of the source of the field, where known. + */ + public Optional source() { + return source; + } + + /** + * @return the name of the field. + */ + public String name() { + return name; + } + + /** + * @return the fully qualified field name. + */ + public String fullName() { + return fullName; + } + + public FieldName withSource(final String source) { + return new FieldName(Optional.of(source), name); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final FieldName fieldName = (FieldName) o; + return Objects.equals(fullName, fieldName.fullName); + } + + @Override + public int hashCode() { + return Objects.hash(fullName); + } + + @Override + public String toString() { + return toString(FormatOptions.none()); + } + + public String toString(final FormatOptions formatOptions) { + final Optional base = source.map(val -> escape(val, formatOptions)); + final String escaped = escape(name, formatOptions); + return base.map(s -> s + "." + escaped).orElse(escaped); + } + + private static String escape(final String string, final FormatOptions formatOptions) { + return formatOptions.isReservedWord(string) ? "`" + string + "`" : string; + } +} diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index b695ff3c5f16..191d8407553d 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -229,7 +229,7 @@ public LogicalSchema withoutAlias() { * @return {@code true} is aliased, {@code false} otherwise. */ public boolean isAliased() { - return metaFields.get(0).source().isPresent(); + return metaFields.get(0).fieldName().source().isPresent(); } /** @@ -411,7 +411,7 @@ private static List fromConnectSchema(final Schema schema) { final String fieldName = SchemaUtil.getFieldNameWithNoAlias(field.name()); final SqlType fieldType = converter.toSqlType(field.schema()); - builder.add(Field.of(source, fieldName, fieldType)); + builder.add(Field.of(FieldName.of(source, fieldName), fieldType)); } return builder.build(); diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java index 5d4e1bb0d2c8..768d19d9f844 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java @@ -18,9 +18,13 @@ import static java.util.Objects.requireNonNull; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.FormatOptions; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.SqlBaseType; +import java.util.List; import java.util.Objects; +import java.util.stream.IntStream; @Immutable public final class SqlArray extends SqlType { @@ -45,6 +49,31 @@ public boolean supportsCast() { return false; } + @Override + public void validateValue(final Object value) { + if (value == null) { + return; + } + + if (!(value instanceof List)) { + final SqlBaseType sqlBaseType = SchemaConverters.javaToSqlConverter() + .toSqlType(value.getClass()); + + throw new DataException("Expected ARRAY, got " + sqlBaseType); + } + + final List array = (List) value; + + IntStream.range(0, array.size()).forEach(idx -> { + try { + final Object element = array.get(idx); + itemType.validateValue(element); + } catch (final DataException e) { + throw new DataException("ARRAY element " + (idx + 1) + ": " + e.getMessage(), e); + } + }); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java index 10ff739efc69..66c4b2b35246 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java @@ -16,9 +16,12 @@ package io.confluent.ksql.schema.ksql.types; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.FormatOptions; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.util.DecimalUtil; +import java.math.BigDecimal; import java.util.Objects; @Immutable @@ -52,6 +55,29 @@ public boolean supportsCast() { return true; } + @Override + public void validateValue(final Object value) { + if (value == null) { + return; + } + + if (!(value instanceof BigDecimal)) { + final SqlBaseType sqlBaseType = SchemaConverters.javaToSqlConverter() + .toSqlType(value.getClass()); + + throw new DataException("Expected DECIMAL, got " + sqlBaseType); + } + + final BigDecimal decimal = (BigDecimal) value; + if (decimal.precision() != precision) { + throw new DataException("Expected " + this + ", got precision " + decimal.precision()); + } + + if (decimal.scale() != scale) { + throw new DataException("Expected " + this + ", got scale " + decimal.scale()); + } + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java index 5d7b5167a15b..69b63f7ecaff 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java @@ -18,13 +18,18 @@ import static java.util.Objects.requireNonNull; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.FormatOptions; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.SqlBaseType; +import java.util.Map; import java.util.Objects; @Immutable public final class SqlMap extends SqlType { + private static final SqlType KEY_TYPE = SqlTypes.STRING; + private final SqlType valueType; public static SqlMap of(final SqlType valueType) { @@ -45,6 +50,35 @@ public boolean supportsCast() { return false; } + @Override + public void validateValue(final Object value) { + if (value == null) { + return; + } + + if (!(value instanceof Map)) { + final SqlBaseType sqlBaseType = SchemaConverters.javaToSqlConverter() + .toSqlType(value.getClass()); + + throw new DataException("Expected MAP, got " + sqlBaseType); + } + + final Map map = (Map) value; + map.forEach((k, v) -> { + try { + KEY_TYPE.validateValue(k); + } catch (final DataException e) { + throw new DataException("MAP key: " + e.getMessage(), e); + } + + try { + valueType.validateValue(v); + } catch (final DataException e) { + throw new DataException("MAP value for key '" + k + "': " + e.getMessage(), e); + } + }); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java index 8f45b2be6b95..2e1d3ebff254 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java @@ -18,7 +18,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.FormatOptions; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.util.KsqlException; import java.util.Objects; @@ -82,6 +84,20 @@ public boolean supportsCast() { return true; } + @Override + public void validateValue(final Object value) { + if (value == null) { + return; + } + + final SqlBaseType actualType = SchemaConverters.javaToSqlConverter() + .toSqlType(value.getClass()); + + if (!baseType().equals(actualType)) { + throw new DataException("Expected " + baseType() + ", got " + actualType); + } + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java index 027b4e8faf02..ee48c8fe9d3f 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java @@ -19,9 +19,12 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.Field; import io.confluent.ksql.schema.ksql.FormatOptions; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.SqlBaseType; +import io.confluent.ksql.types.KsqlStruct; import io.confluent.ksql.util.KsqlException; import java.util.ArrayList; import java.util.List; @@ -51,6 +54,25 @@ public boolean supportsCast() { return false; } + @Override + public void validateValue(final Object value) { + if (value == null) { + return; + } + + if (!(value instanceof KsqlStruct)) { + final SqlBaseType sqlBaseType = SchemaConverters.javaToSqlConverter() + .toSqlType(value.getClass()); + + throw new DataException("Expected STRUCT, got " + sqlBaseType); + } + + final KsqlStruct struct = (KsqlStruct)value; + if (!struct.schema().equals(this)) { + throw new DataException("Expected " + this + ", got " + struct.schema()); + } + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java index 63e9f67c5eea..c8e939c64c6f 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java @@ -38,5 +38,7 @@ public SqlBaseType baseType() { public abstract boolean supportsCast(); + public abstract void validateValue(Object value); + public abstract String toString(FormatOptions formatOptions); } diff --git a/ksql-common/src/main/java/io/confluent/ksql/types/KsqlStruct.java b/ksql-common/src/main/java/io/confluent/ksql/types/KsqlStruct.java new file mode 100644 index 000000000000..ecbe5bc984ec --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/types/KsqlStruct.java @@ -0,0 +1,134 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.types; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.schema.ksql.Field; +import io.confluent.ksql.schema.ksql.FieldName; +import io.confluent.ksql.schema.ksql.types.SqlStruct; +import io.confluent.ksql.util.KsqlException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Instance of {@link io.confluent.ksql.schema.ksql.types.SqlStruct}. + */ +@Immutable +public final class KsqlStruct { + + private final SqlStruct schema; + private final ImmutableList> values; + + public static Builder builder(final SqlStruct schema) { + return new Builder(schema); + } + + private KsqlStruct( + final SqlStruct schema, + final List> values + ) { + this.schema = Objects.requireNonNull(schema, "schema"); + this.values = ImmutableList.copyOf(Objects.requireNonNull(values, "values")); + } + + public SqlStruct schema() { + return this.schema; + } + + public List> values() { + return values; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KsqlStruct that = (KsqlStruct) o; + return Objects.equals(schema, that.schema) + && Objects.equals(values, that.values); + } + + @Override + public int hashCode() { + return Objects.hash(schema, values); + } + + @Override + public String toString() { + return "KsqlStruct{" + + "values=" + values + + ", schema=" + schema + + '}'; + } + + private static FieldInfo getField(final FieldName name, final SqlStruct schema) { + final List fields = schema.getFields(); + + for (int idx = 0; idx < fields.size(); idx++) { + final Field field = fields.get(idx); + if (field.fieldName().equals(name)) { + return new FieldInfo(idx, field); + } + } + + throw new KsqlException("Unknown field: " + name); + } + + public static final class Builder { + + private final SqlStruct schema; + private final List> values; + + public Builder(final SqlStruct schema) { + this.schema = Objects.requireNonNull(schema, "schema"); + this.values = new ArrayList<>(schema.getFields().size()); + schema.getFields().forEach(f -> values.add(Optional.empty())); + } + + public Builder set(final FieldName field, final Optional value) { + final FieldInfo info = getField(field, schema); + info.field.type().validateValue(value.orElse(null)); + values.set(info.index, value); + return this; + } + + public Builder set(final String field, final Object value) { + return set(FieldName.of(field), Optional.ofNullable(value)); + } + + public KsqlStruct build() { + return new KsqlStruct(schema, values); + } + } + + private static final class FieldInfo { + + final int index; + final Field field; + + private FieldInfo(final int index, final Field field) { + this.index = index; + this.field = Objects.requireNonNull(field, "field"); + } + } +} diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/FieldNameTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/FieldNameTest.java new file mode 100644 index 000000000000..9de7c06803e1 --- /dev/null +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/FieldNameTest.java @@ -0,0 +1,145 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.schema.ksql; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.google.common.testing.EqualsTester; +import com.google.common.testing.NullPointerTester; +import java.util.Optional; +import org.junit.Before; +import org.junit.Test; + +public class FieldNameTest { + + private FieldName fieldName; + private FieldName fieldNameNoSource; + + @Before + public void setUp() { + fieldName = FieldName.of(Optional.of("someSource"), "someName"); + fieldNameNoSource = FieldName.of(Optional.empty(), "someName"); + } + + @Test + public void shouldThrowNPE() { + new NullPointerTester() + .testAllPublicStaticMethods(FieldName.class); + } + + @Test + public void shouldImplementEqualsProperly() { + new EqualsTester() + .addEqualityGroup( + FieldName.of(Optional.of("someSource"), "someName"), + FieldName.of(Optional.of("someSource"), "someName") + ) + .addEqualityGroup( + FieldName.of(Optional.empty(), "someName".toUpperCase()) + ) + .testEquals(); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnLeadingWhiteSpaceOfSource() { + FieldName.of(Optional.of(" source_with_leading_ws"), "name"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnLeadingWhiteSpaceOfNameWhenNoSource() { + FieldName.of(Optional.empty(), " name_with_leading_ws"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnLeadingWhiteSpaceOfNameWhenSource() { + FieldName.of(Optional.of("source"), " name_with_leading_ws"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnTrailingWhiteSpaceOfSource() { + FieldName.of(Optional.of("source_with_leading_ws "), "name"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnTrailingWhiteSpaceOfNameWhenNoSource() { + FieldName.of(Optional.empty(), "name_with_leading_ws "); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnTrailingWhiteSpaceOfNameWhenSource() { + FieldName.of(Optional.of("source"), "name_with_leading_ws "); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnEmptySource() { + FieldName.of(Optional.of(""), "name"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnEmptyName() { + FieldName.of(Optional.empty(), ""); + } + + @Test + public void shouldReturnSource() { + assertThat(fieldName.source(), is(Optional.of("someSource"))); + } + + @Test + public void shouldReturnName() { + assertThat(fieldName.name(), is("someName")); + } + + @Test + public void shouldReturnFullName() { + assertThat(fieldName.fullName(), is("someSource.someName")); + } + + @Test + public void shouldReturnFullNameNoSource() { + assertThat(fieldNameNoSource.fullName(), is("someName")); + } + + @Test + public void shouldToString() { + assertThat(fieldName.toString(), is("`someSource`.`someName`")); + assertThat(fieldNameNoSource.toString(), is("`someName`")); + } + + @Test + public void shouldToStringWithReservedWords() { + // Given: + final FormatOptions options = FormatOptions.of( + identifier -> identifier.equals("reserved") + || identifier.equals("word") + || identifier.equals("reserved.name") + ); + + // Then: + assertThat(FieldName.of("not-reserved").toString(options), + is("not-reserved")); + + assertThat(FieldName.of("reserved").toString(options), + is("`reserved`")); + + assertThat(FieldName.of("reserved", "word").toString(options), + is("`reserved`.`word`")); + + assertThat(FieldName.of("source", "word").toString(options), + is("source.`word`")); + } +} \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/FieldTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/FieldTest.java index 5f4f8c263bf9..3907e0afb804 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/FieldTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/FieldTest.java @@ -20,7 +20,6 @@ import com.google.common.testing.EqualsTester; import com.google.common.testing.NullPointerTester; -import io.confluent.ksql.schema.ksql.types.SqlStruct; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.util.Optional; @@ -32,24 +31,11 @@ public class FieldTest { public void shouldThrowNPE() { new NullPointerTester() .setDefault(SqlType.class, SqlTypes.BIGINT) + .setDefault(String.class, "field0") + .setDefault(FieldName.class, FieldName.of(Optional.empty(), "fred")) .testAllPublicStaticMethods(Field.class); } - @Test(expected = IllegalArgumentException.class) - public void shouldThrowOnLeadingWhiteSpace() { - Field.of(" name_with_leading_white_space", SqlTypes.STRING); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowOnTrailingWhiteSpace() { - Field.of("name_with_leading_white_space ", SqlTypes.STRING); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowOnEmptyName() { - Field.of("", SqlTypes.STRING); - } - @Test public void shouldImplementEqualsProperly() { new EqualsTester() @@ -73,6 +59,15 @@ public void shouldImplementEqualsProperly() { .testEquals(); } + @Test + public void shouldReturnFieldName() { + assertThat(Field.of("SomeName", SqlTypes.BOOLEAN).fieldName(), + is(FieldName.of(Optional.empty(), "SomeName"))); + + assertThat(Field.of("SomeSource", "SomeName", SqlTypes.BOOLEAN).fieldName(), + is(FieldName.of(Optional.of("SomeSource"), "SomeName"))); + } + @Test public void shouldReturnName() { assertThat(Field.of("SomeName", SqlTypes.BOOLEAN).name(), @@ -91,15 +86,6 @@ public void shouldReturnFullName() { is("SomeSource.SomeName")); } - @Test - public void shouldReturnSource() { - assertThat(Field.of("SomeName", SqlTypes.BOOLEAN).source(), - is(Optional.empty())); - - assertThat(Field.of("SomeSource", "SomeName", SqlTypes.BOOLEAN).source(), - is(Optional.of("SomeSource"))); - } - @Test public void shouldReturnType() { assertThat(Field.of("SomeName", SqlTypes.BOOLEAN).type(), is(SqlTypes.BOOLEAN)); @@ -135,13 +121,5 @@ public void shouldToStringWithReservedWords() { assertThat(Field.of("source", "word", SqlTypes.STRING).toString(options), is("source.`word` STRING")); - - final SqlStruct struct = SqlTypes.struct() - .field("reserved", SqlTypes.BIGINT) - .field("other", SqlTypes.BIGINT) - .build(); - - assertThat(Field.of("reserved", "name", struct).toString(options), - is("`reserved`.name STRUCT<`reserved` BIGINT, other BIGINT>")); } } \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlArrayTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlArrayTest.java index 2c3362ae614d..99d7ca0dfa92 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlArrayTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlArrayTest.java @@ -18,14 +18,22 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import com.google.common.collect.ImmutableList; import com.google.common.testing.EqualsTester; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.SqlBaseType; +import java.util.Arrays; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class SqlArrayTest { private static final SqlType SOME_TYPE = SqlPrimitiveType.of(SqlBaseType.STRING); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + @Test public void shouldImplementHashCodeAndEqualsProperly() { new EqualsTester() @@ -53,4 +61,48 @@ public void shouldImplmentToString() { + ">" )); } + + @Test + public void shouldThrowIfNotValueList() { + // Given: + final SqlArray schema = SqlTypes.array(SqlTypes.BIGINT); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected ARRAY, got BIGINT"); + + // When: + schema.validateValue(10L); + } + + @Test + public void shouldThrowIfAnyElementInValueNotElementType() { + // Given: + final SqlArray schema = SqlTypes.array(SqlTypes.BIGINT); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("ARRAY element 2: Expected BIGINT, got INT"); + + // When: + schema.validateValue(ImmutableList.of(11L, 9)); + } + + @Test + public void shouldNotThrowWhenValidatingNullValue() { + // Given: + final SqlArray schema = SqlTypes.array(SqlTypes.BIGINT); + + // When: + schema.validateValue(null); + } + + @Test + public void shouldValidateValue() { + // Given: + final SqlArray schema = SqlTypes.array(SqlTypes.BIGINT); + + // When: + schema.validateValue(Arrays.asList(19L, null)); + } } \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlDecimalTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlDecimalTest.java index 8ca3e9ffa6d7..12f7c0f55c55 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlDecimalTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlDecimalTest.java @@ -19,14 +19,21 @@ import static org.hamcrest.Matchers.is; import com.google.common.testing.EqualsTester; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.util.KsqlException; +import java.math.BigDecimal; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class SqlDecimalTest { + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + @Test public void shouldImplementHashCodeAndEqualsProperly() { new EqualsTester() @@ -70,4 +77,61 @@ public void shouldThrowIfScaleGreaterThanPrecision() { public void shouldImplementToString() { assertThat(SqlDecimal.of(10, 2).toString(), is("DECIMAL(10, 2)")); } + + @Test + public void shouldThrowIfValueNotDecimal() { + // Given: + final SqlDecimal schema = SqlTypes.decimal(4, 1); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected DECIMAL, got BIGINT"); + + // When: + schema.validateValue(10L); + } + + @Test + public void shouldThrowIfValueHasWrongPrecision() { + // Given: + final SqlDecimal schema = SqlTypes.decimal(4, 1); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected DECIMAL(4, 1), got precision 5"); + + // When: + schema.validateValue(new BigDecimal("1234.5")); + } + + @Test + public void shouldThrowIfValueHasWrongScale() { + // Given: + final SqlDecimal schema = SqlTypes.decimal(4, 1); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected DECIMAL(4, 1), got scale 2"); + + // When: + schema.validateValue(new BigDecimal("12.50")); + } + + @Test + public void shouldNotThrowWhenValidatingNullValue() { + // Given: + final SqlDecimal schema = SqlTypes.decimal(4, 1); + + // When: + schema.validateValue(null); + } + + @Test + public void shouldValidateValue() { + // Given: + final SqlDecimal schema = SqlTypes.decimal(4, 1); + + // When: + schema.validateValue(new BigDecimal("123.0")); + } } \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlMapTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlMapTest.java index 41f9df584fac..e3bb722c182a 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlMapTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlMapTest.java @@ -18,14 +18,23 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import com.google.common.collect.ImmutableMap; import com.google.common.testing.EqualsTester; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.SqlBaseType; +import java.util.HashMap; +import java.util.Map; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class SqlMapTest { private static final SqlType SOME_TYPE = SqlPrimitiveType.of(SqlBaseType.DOUBLE); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + @Test public void shouldImplementHashCodeAndEqualsProperly() { new EqualsTester() @@ -53,4 +62,65 @@ public void shouldImplementToString() { + ">" )); } + + @Test + public void shouldThrowIfValueNotMap() { + // Given: + final SqlMap schema = SqlTypes.map(SqlTypes.BIGINT); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected MAP, got BIGINT"); + + // When: + schema.validateValue(10L); + } + + @Test + public void shouldThrowIfAnyKeyInValueNotKeyType() { + // Given: + final SqlMap schema = SqlTypes.map(SqlTypes.BIGINT); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("MAP key: Expected STRING, got INT"); + + // When: + schema.validateValue(ImmutableMap.of("first", 9L, 2, 9L)); + } + + @Test + public void shouldThrowIfAnyValueInValueNotValueType() { + // Given: + final SqlMap schema = SqlTypes.map(SqlTypes.BIGINT); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("MAP value for key '2': Expected BIGINT, got INT"); + + // When: + schema.validateValue(ImmutableMap.of("1", 11L, "2", 9)); + } + + @Test + public void shouldNotThrowWhenValidatingNullValue() { + // Given: + final SqlMap schema = SqlTypes.map(SqlTypes.BIGINT); + + // When: + schema.validateValue(null); + } + + @Test + public void shouldValidateValue() { + // Given: + final SqlMap schema = SqlTypes.map(SqlTypes.BIGINT); + final Map mapWithNull = new HashMap<>(); + mapWithNull.put("valid", 44L); + mapWithNull.put(null, 44L); + mapWithNull.put("v", null); + + // When: + schema.validateValue(mapWithNull); + } } \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveTypeTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveTypeTest.java index 35f7165332d6..cdd73cbc1c8a 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveTypeTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveTypeTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.testing.EqualsTester; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.util.KsqlException; import java.util.List; @@ -174,4 +175,37 @@ public void shouldImplementToString() { assertThat(SqlPrimitiveType.of(type).toString(), is(type.toString())); }); } + + @Test + public void shoudlValidatePrimitiveTypes() { + SqlPrimitiveType.of(SqlBaseType.BOOLEAN).validateValue(true); + SqlPrimitiveType.of(SqlBaseType.INTEGER).validateValue(19); + SqlPrimitiveType.of(SqlBaseType.BIGINT).validateValue(33L); + SqlPrimitiveType.of(SqlBaseType.DOUBLE).validateValue(45.0D); + SqlPrimitiveType.of(SqlBaseType.STRING).validateValue(""); + } + + @SuppressWarnings("UnnecessaryBoxing") + @Test + public void shouldValidateBoxedTypes() { + SqlPrimitiveType.of(SqlBaseType.BOOLEAN).validateValue(Boolean.FALSE); + SqlPrimitiveType.of(SqlBaseType.INTEGER).validateValue(Integer.valueOf(19)); + SqlPrimitiveType.of(SqlBaseType.BIGINT).validateValue(Long.valueOf(33L)); + SqlPrimitiveType.of(SqlBaseType.DOUBLE).validateValue(Double.valueOf(45.0D)); + } + + @Test + public void shouldValidateNullValue() { + SqlPrimitiveType.of(SqlBaseType.BOOLEAN).validateValue(null); + } + + @Test + public void shouldFailValidationForWrongType() { + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected BOOLEAN, got INT"); + + // When: + SqlPrimitiveType.of(SqlBaseType.BOOLEAN).validateValue(10); + } } \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlStructTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlStructTest.java index 2545e7854313..0a3617f8d01f 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlStructTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/types/SqlStructTest.java @@ -20,9 +20,11 @@ import static org.hamcrest.Matchers.is; import com.google.common.testing.EqualsTester; +import io.confluent.ksql.schema.ksql.DataException; import io.confluent.ksql.schema.ksql.Field; import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.schema.ksql.SqlBaseType; +import io.confluent.ksql.types.KsqlStruct; import io.confluent.ksql.util.KsqlException; import org.junit.Rule; import org.junit.Test; @@ -140,4 +142,68 @@ public void shouldImplementToStringWithReservedWordHandling() { + ">" )); } + + @Test + public void shouldThrowIfValueNotStruct() { + // Given: + final SqlStruct schema = SqlTypes.struct() + .field("f0", SqlTypes.BIGINT) + .build(); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected STRUCT, got BIGINT"); + + // When: + schema.validateValue(10L); + } + + @Test + public void shouldThrowIfValueHasSchema() { + // Given: + final SqlStruct schema = SqlTypes.struct() + .field("f0", SqlTypes.BIGINT) + .build(); + + final SqlStruct mismatching = SqlTypes.struct() + .field("f0", SqlTypes.DOUBLE) + .build(); + + final KsqlStruct value = KsqlStruct.builder(mismatching) + .set("f0", 10.0D) + .build(); + + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected STRUCT<`f0` BIGINT>, got STRUCT<`f0` DOUBLE>"); + + // When: + schema.validateValue(value); + } + + @Test + public void shouldNotThrowWhenValidatingNullValue() { + // Given: + final SqlStruct schema = SqlTypes.struct() + .field("f0", SqlTypes.BIGINT) + .build(); + + // When: + schema.validateValue(null); + } + + @Test + public void shouldValidateValue() { + // Given: + final SqlStruct schema = SqlTypes.struct() + .field("f0", SqlTypes.BIGINT) + .build(); + + final KsqlStruct value = KsqlStruct.builder(schema) + .set("f0", 10L) + .build(); + + // When: + schema.validateValue(value); + } } \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/types/KsqlStructTest.java b/ksql-common/src/test/java/io/confluent/ksql/types/KsqlStructTest.java new file mode 100644 index 000000000000..6e8f639e5774 --- /dev/null +++ b/ksql-common/src/test/java/io/confluent/ksql/types/KsqlStructTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.types; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; + +import io.confluent.ksql.schema.ksql.DataException; +import io.confluent.ksql.schema.ksql.Field; +import io.confluent.ksql.schema.ksql.FieldName; +import io.confluent.ksql.schema.ksql.types.SqlStruct; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.util.KsqlException; +import java.util.Optional; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class KsqlStructTest { + + private static final SqlStruct SCHEMA = SqlTypes.struct() + .field("f0", SqlTypes.BIGINT) + .field(Field.of(FieldName.of("s1", "v1"), SqlTypes.BOOLEAN)) + .build(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldHandleExplicitNulls() { + // When: + final KsqlStruct struct = KsqlStruct.builder(SCHEMA) + .set(FieldName.of("f0"), Optional.empty()) + .set(FieldName.of("s1", "v1"), Optional.empty()) + .build(); + + // Then: + assertThat(struct.values(), contains(Optional.empty(), Optional.empty())); + } + + @Test + public void shouldHandleImplicitNulls() { + // When: + final KsqlStruct struct = KsqlStruct.builder(SCHEMA) + .build(); + + // Then: + assertThat(struct.values(), contains(Optional.empty(), Optional.empty())); + } + + @Test + public void shouldThrowFieldNotKnown() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Unknown field: `??`"); + + // When: + KsqlStruct.builder(SCHEMA) + .set("??", Optional.empty()); + } + + @Test + public void shouldThrowIfValueWrongType() { + // Then: + expectedException.expect(DataException.class); + expectedException.expectMessage("Expected BIGINT, got STRING"); + + // When: + KsqlStruct.builder(SCHEMA) + .set("f0", "field is BIGINT, so won't like this"); + } + + @Test + public void shouldBuildStruct() { + // When: + final KsqlStruct struct = KsqlStruct.builder(SCHEMA) + .set(FieldName.of("f0"), Optional.of(10L)) + .set(FieldName.of("s1", "v1"), Optional.of(true)) + .build(); + + // Then: + assertThat(struct.values(), contains(Optional.of(10L), Optional.of(true))); + } +} \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 899b226f4e3e..6daa0648040e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -46,6 +46,7 @@ import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KeyField.LegacyField; import io.confluent.ksql.schema.ksql.Field; +import io.confluent.ksql.schema.ksql.FieldName; import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -422,8 +423,9 @@ private Optional findNewKeyField(final List selectExpr final String fullFieldName = getKeyField().name().get(); final Optional fieldAlias = SchemaUtil.getFieldNameAlias(fullFieldName); final String fieldNameWithNoAlias = SchemaUtil.getFieldNameWithNoAlias(fullFieldName); + final FieldName fieldName = FieldName.of(fieldAlias, fieldNameWithNoAlias); - final Field keyField = Field.of(fieldAlias, fieldNameWithNoAlias, SqlTypes.STRING); + final Field keyField = Field.of(fieldName, SqlTypes.STRING); return doFindKeyField(selectExpressions, keyField) .map(Field::fullName);