Skip to content

Commit

Permalink
refactor: start of our own internal Struct class (#3308)
Browse files Browse the repository at this point in the history
* refactor: start of our own internal Struct class
  • Loading branch information
big-andy-coates authored Sep 8, 2019
1 parent 9cb991f commit 7a997a4
Show file tree
Hide file tree
Showing 20 changed files with 979 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.schema.ksql;

/**
* Issue with data.
*/
public class DataException extends RuntimeException {

public DataException(final String msg) {
super(msg);
}

public DataException(final String msg, final Throwable cause) {
super(msg, cause);
}
}
54 changes: 14 additions & 40 deletions ksql-common/src/main/java/io/confluent/ksql/schema/ksql/Field.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -27,9 +26,7 @@
@Immutable
public final class Field {

private final Optional<String> source;
private final String fullName;
private final String name;
private final FieldName name;
private final SqlType type;

/**
Expand All @@ -38,7 +35,7 @@ public final class Field {
* @return the immutable field.
*/
public static Field of(final String name, final SqlType type) {
return new Field(Optional.empty(), name, type);
return new Field(FieldName.of(Optional.empty(), name), type);
}

/**
Expand All @@ -48,55 +45,39 @@ public static Field of(final String name, final SqlType type) {
* @return the immutable field.
*/
public static Field of(final String source, final String name, final SqlType type) {
return new Field(Optional.of(source), name, type);
return new Field(FieldName.of(Optional.of(source), name), type);
}

/**
* @param source the name of the source of the field.
* @param name the name of the field.
* @param type the type of the field.
* @return the immutable field.
*/
public static Field of(final Optional<String> source, final String name, final SqlType type) {
return new Field(source, name, type);
public static Field of(final FieldName name, final SqlType type) {
return new Field(name, type);
}

private Field(final Optional<String> source, final String name, final SqlType type) {
this.source = Objects.requireNonNull(source, "source");
private Field(final FieldName name, final SqlType type) {
this.name = Objects.requireNonNull(name, "name");
this.type = Objects.requireNonNull(type, "type");
this.fullName = source
.map(s -> SchemaUtil.buildAliasedFieldName(s, name))
.orElse(name);

if (!name.trim().equals(name)) {
throw new IllegalArgumentException("name is not trimmed: '" + name + "'");
}

if (name.isEmpty()) {
throw new IllegalArgumentException("name is empty");
}
}

/**
* @return the name of the source of the field, where known.
*/
public Optional<String> source() {
return source;
public FieldName fieldName() {
return name;
}

/**
* @return the fully qualified field name.
*/
public String fullName() {
return fullName;
return name.fullName();
}

/**
* @return the name of the field, without any source / alias.
*/
public String name() {
return name;
return name.name();
}

/**
Expand All @@ -113,7 +94,7 @@ public SqlType type() {
* @return the new field.
*/
public Field withSource(final String source) {
return new Field(Optional.of(source), name, type);
return new Field(name.withSource(source), type);
}

@Override
Expand All @@ -125,13 +106,13 @@ public boolean equals(final Object o) {
return false;
}
final Field field = (Field) o;
return Objects.equals(fullName, field.fullName)
return Objects.equals(name, field.name)
&& Objects.equals(type, field.type);
}

@Override
public int hashCode() {
return Objects.hash(fullName, type);
return Objects.hash(name, type);
}

@Override
Expand All @@ -140,13 +121,6 @@ public String toString() {
}

public String toString(final FormatOptions formatOptions) {
final Optional<String> base = source.map(val -> escape(val, formatOptions));
final String escaped = escape(name, formatOptions);
final String field = base.isPresent() ? base.get() + "." + escaped : escaped;
return field + " " + type.toString(formatOptions);
}

private static String escape(final String string, final FormatOptions formatOptions) {
return formatOptions.isReservedWord(string) ? "`" + string + "`" : string;
return name.toString(formatOptions) + " " + type.toString(formatOptions);
}
}
128 changes: 128 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FieldName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.schema.ksql;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Objects;
import java.util.Optional;

/**
* Immutable POJO for storing a {@link Field}'s name
*/
@Immutable
public final class FieldName {

private final Optional<String> source;
private final String name;
private final String fullName;

public static FieldName of(final String name) {
return of(Optional.empty(), name);
}

public static FieldName of(final String source, final String name) {
return of(Optional.of(source), name);
}

public static FieldName of(final Optional<String> source, final String name) {
return new FieldName(source, name);
}

private FieldName(final Optional<String> source, final String fullName) {
this.source = Objects.requireNonNull(source, "source");
this.name = Objects.requireNonNull(fullName, "name");
this.fullName = source
.map(s -> SchemaUtil.buildAliasedFieldName(s, name))
.orElse(name);

this.source.ifPresent(src -> {
if (!src.trim().equals(src)) {
throw new IllegalArgumentException("source is not trimmed: '" + src + "'");
}

if (src.isEmpty()) {
throw new IllegalArgumentException("source is empty");
}

});

if (!name.trim().equals(name)) {
throw new IllegalArgumentException("name is not trimmed: '" + name + "'");
}

if (name.isEmpty()) {
throw new IllegalArgumentException("name is empty");
}
}

/**
* @return the name of the source of the field, where known.
*/
public Optional<String> source() {
return source;
}

/**
* @return the name of the field.
*/
public String name() {
return name;
}

/**
* @return the fully qualified field name.
*/
public String fullName() {
return fullName;
}

public FieldName withSource(final String source) {
return new FieldName(Optional.of(source), name);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final FieldName fieldName = (FieldName) o;
return Objects.equals(fullName, fieldName.fullName);
}

@Override
public int hashCode() {
return Objects.hash(fullName);
}

@Override
public String toString() {
return toString(FormatOptions.none());
}

public String toString(final FormatOptions formatOptions) {
final Optional<String> base = source.map(val -> escape(val, formatOptions));
final String escaped = escape(name, formatOptions);
return base.map(s -> s + "." + escaped).orElse(escaped);
}

private static String escape(final String string, final FormatOptions formatOptions) {
return formatOptions.isReservedWord(string) ? "`" + string + "`" : string;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public LogicalSchema withoutAlias() {
* @return {@code true} is aliased, {@code false} otherwise.
*/
public boolean isAliased() {
return metaFields.get(0).source().isPresent();
return metaFields.get(0).fieldName().source().isPresent();
}

/**
Expand Down Expand Up @@ -411,7 +411,7 @@ private static List<Field> fromConnectSchema(final Schema schema) {
final String fieldName = SchemaUtil.getFieldNameWithNoAlias(field.name());
final SqlType fieldType = converter.toSqlType(field.schema());

builder.add(Field.of(source, fieldName, fieldType));
builder.add(Field.of(FieldName.of(source, fieldName), fieldType));
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import static java.util.Objects.requireNonNull;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.schema.ksql.DataException;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;

@Immutable
public final class SqlArray extends SqlType {
Expand All @@ -45,6 +49,31 @@ public boolean supportsCast() {
return false;
}

@Override
public void validateValue(final Object value) {
if (value == null) {
return;
}

if (!(value instanceof List)) {
final SqlBaseType sqlBaseType = SchemaConverters.javaToSqlConverter()
.toSqlType(value.getClass());

throw new DataException("Expected ARRAY, got " + sqlBaseType);
}

final List<?> array = (List<?>) value;

IntStream.range(0, array.size()).forEach(idx -> {
try {
final Object element = array.get(idx);
itemType.validateValue(element);
} catch (final DataException e) {
throw new DataException("ARRAY element " + (idx + 1) + ": " + e.getMessage(), e);
}
});
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
package io.confluent.ksql.schema.ksql.types;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.schema.ksql.DataException;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.util.DecimalUtil;
import java.math.BigDecimal;
import java.util.Objects;

@Immutable
Expand Down Expand Up @@ -52,6 +55,29 @@ public boolean supportsCast() {
return true;
}

@Override
public void validateValue(final Object value) {
if (value == null) {
return;
}

if (!(value instanceof BigDecimal)) {
final SqlBaseType sqlBaseType = SchemaConverters.javaToSqlConverter()
.toSqlType(value.getClass());

throw new DataException("Expected DECIMAL, got " + sqlBaseType);
}

final BigDecimal decimal = (BigDecimal) value;
if (decimal.precision() != precision) {
throw new DataException("Expected " + this + ", got precision " + decimal.precision());
}

if (decimal.scale() != scale) {
throw new DataException("Expected " + this + ", got scale " + decimal.scale());
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit 7a997a4

Please sign in to comment.