Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: standardize KSQL up-casting #3516

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,84 @@

package io.confluent.ksql.schema;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlException;
import java.util.function.BinaryOperator;

public enum Operator {
ADD("+"),
SUBTRACT("-"),
MULTIPLY("*"),
DIVIDE("/"),
MODULUS("%");
ADD("+", SqlDecimal::add) {
@Override
public SqlType resultType(final SqlType left, final SqlType right) {
if (left.baseType() == SqlBaseType.STRING && right.baseType() == SqlBaseType.STRING) {
return SqlTypes.STRING;
}

return super.resultType(left, right);
}
},
SUBTRACT("-", SqlDecimal::subtract),
MULTIPLY("*", SqlDecimal::multiply),
DIVIDE("/", SqlDecimal::divide),
MODULUS("%", SqlDecimal::modulus);

private final String symbol;
private final BinaryOperator<SqlDecimal> binaryResolver;

Operator(final String symbol) {
this.symbol = symbol;
Operator(final String symbol, final BinaryOperator<SqlDecimal> binaryResolver) {
this.symbol = requireNonNull(symbol, "symbol");
this.binaryResolver = requireNonNull(binaryResolver, "binaryResolver");
}

public String getSymbol() {
return symbol;
}

/**
* Determine the result type for the given parameters.
*
* @param left the left side of the operation.
* @param right the right side of the operation.
* @return the result schema.
*/
public SqlType resultType(final SqlType left, final SqlType right) {
if (left.baseType().isNumber() && right.baseType().isNumber()) {
if (left.baseType().canUpCast(right.baseType())) {
if (right.baseType() != SqlBaseType.DECIMAL) {
return right;
}

return binaryResolver.apply(toDecimal(left), (SqlDecimal) right);
}

if (right.baseType().canUpCast(left.baseType())) {
if (left.baseType() != SqlBaseType.DECIMAL) {
return left;
}

return binaryResolver.apply((SqlDecimal) left, toDecimal(right));
}
}

throw new KsqlException(
"Unsupported arithmetic types. " + left.baseType() + " " + right.baseType());
}

private static SqlDecimal toDecimal(final SqlType type) {
switch (type.baseType()) {
case DECIMAL:
return (SqlDecimal) type;
case INTEGER:
return SqlTypes.INT_UPCAST_TO_DECIMAL;
case BIGINT:
return SqlTypes.BIGINT_UPCAST_TO_DECIMAL;
default:
throw new KsqlException(
"Cannot convert " + type.baseType() + " to " + SqlBaseType.DECIMAL + ".");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,28 @@
* The SQL types supported by KSQL.
*/
public enum SqlBaseType {
BOOLEAN, INTEGER, BIGINT, DOUBLE, DECIMAL, STRING, ARRAY, MAP, STRUCT;
BOOLEAN, INTEGER, BIGINT, DECIMAL, DOUBLE, STRING, ARRAY, MAP, STRUCT;

/**
* @return {@code true} if numeric type.
*/
public boolean isNumber() {
// for now, conversions between DECIMAL and other numeric types is not supported
return this == INTEGER || this == BIGINT || this == DOUBLE;
return this == INTEGER || this == BIGINT || this == DECIMAL || this == DOUBLE;
}

/**
* Test to see if this type can be up-cast to another.
*
* <p>This defines if KSQL supports <i>implicitly</i> converting one numeric type to another.
*
* <p>Types can always be upcast to themselves. Only numeric types can be upcast to different
* numeric types. Note: STRING to DECIMAL handling is not seen as up-casting, it's parsing.
*
* @param to the target type.
* @return true if this type can be upcast to the supplied type.
*/
public boolean canUpCast(final SqlBaseType to) {
return isNumber() && to.isNumber() && this.ordinal() <= to.ordinal();
return this.equals(to)
|| (isNumber() && to.isNumber() && this.ordinal() <= to.ordinal());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,75 @@ public String toString() {
public String toString(final FormatOptions formatOptions) {
return toString();
}

/**
* Determine the decimal type should two decimals be added together.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal add(final SqlDecimal left, final SqlDecimal right) {
final int precision = Math.max(left.scale, right.scale)
+ Math.max(left.precision - left.scale, right.precision - right.scale)
+ 1;

final int scale = Math.max(left.scale, right.scale);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why we don't leave it up to BigDecimal to figure out the scale and precision?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below your review.

return SqlDecimal.of(precision, scale);
}

/**
* Determine the decimal type should one decimal be subtracted from another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal subtract(final SqlDecimal left, final SqlDecimal right) {
return add(left, right);
}

/**
* Determine the decimal type should one decimal be multiplied by another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal multiply(final SqlDecimal left, final SqlDecimal right) {
final int precision = left.precision + right.precision + 1;
final int scale = left.scale + right.scale;
return SqlDecimal.of(precision, scale);
}

/**
* Determine the decimal type should one decimal be divided by another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal divide(final SqlDecimal left, final SqlDecimal right) {
final int precision = left.precision - left.scale + right.scale
+ Math.max(6, left.scale + right.precision + 1);

final int scale = Math.max(6, left.scale + right.precision + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we limit to 6 decimal places?
If we were dividing 1 by 3 and the result was a recurring fraction, wouldn't it be better to have as many dps as possible to give the best overall precision? 0.33333333333333333333333

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My gut feeling would be to leave all the scale and precision calculations up to BigDecimal and only readjust the scale/precision at the end of the calculation if too many trailing zeros aren't desired.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand this code is trying to calculate the required scale and precision of a decimal for a particular operation given the scale and precision of the operands.
While that is possible for some operations (add, subtract and multiply) I don't think it's possible in general for all opeations, e.g. for divide as the required scale and precision for the most exact number depends on the actual numbers and not just the types of the numbers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below your review.

return SqlDecimal.of(precision, scale);
}

/**
* Determine the decimal result type when calculating the remainder of dividing one decimal by
* another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal modulus(final SqlDecimal left, final SqlDecimal right) {
final int precision = Math.min(left.precision - left.scale, right.precision - right.scale)
+ Math.max(left.scale, right.scale);

final int scale = Math.max(left.scale, right.scale);
return SqlDecimal.of(precision, scale);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,14 @@ public static SqlMap map(final SqlType valueType) {
public static SqlStruct.Builder struct() {
return SqlStruct.builder();
}

/**
* Schema of an INT up-cast to a DECIMAL
*/
public static final SqlDecimal INT_UPCAST_TO_DECIMAL = SqlDecimal.of(10, 0);

/**
* Schema of an BIGINT up-cast to a DECIMAL
*/
public static final SqlDecimal BIGINT_UPCAST_TO_DECIMAL = SqlDecimal.of(19, 0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util;

import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
Expand Down Expand Up @@ -154,28 +155,6 @@ public static BigDecimal ensureFit(final BigDecimal value, final Schema schema)
}
}

/**
* Converts a schema to a decimal schema with set precision/scale without losing
* scale or precision.
*
* @param schema the schema
* @return the decimal schema
* @throws KsqlException if the schema cannot safely be converted to decimal
*/
public static Schema toDecimal(final Schema schema) {
switch (schema.type()) {
case BYTES:
requireDecimal(schema);
return schema;
case INT32:
return builder(10, 0).build();
case INT64:
return builder(19, 0).build();
default:
throw new KsqlException("Cannot convert schema of type " + schema.type() + " to decimal.");
}
}

/**
* Converts a schema to a sql decimal with set precision/scale without losing
* scale or precision.
Expand All @@ -190,9 +169,9 @@ public static SqlDecimal toSqlDecimal(final Schema schema) {
requireDecimal(schema);
return SqlDecimal.of(precision(schema), scale(schema));
case INT32:
return SqlDecimal.of(10, 0);
return SqlTypes.INT_UPCAST_TO_DECIMAL;
case INT64:
return SqlDecimal.of(19, 0);
return SqlTypes.BIGINT_UPCAST_TO_DECIMAL;
default:
throw new KsqlException("Cannot convert schema of type " + schema.type() + " to decimal.");
}
Expand Down
Loading