
Commit

address jark's comments
leonardBang committed Apr 6, 2021
1 parent 6344548 commit 99f1c41
Showing 22 changed files with 238 additions and 172 deletions.
@@ -53,9 +53,9 @@
import java.util.stream.Stream;

import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.supportedWatermarkType;
import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType;

/** Default implementation of {@link SchemaResolver}. */
@@ -183,7 +183,7 @@ private List<WatermarkSpec> resolveWatermarkSpecs(

// validate time attribute
final String timeColumn = watermarkSpec.getColumnName();
validateTimeColumn(timeColumn, inputColumns);
final Column validatedTimeColumn = validateTimeColumn(timeColumn, inputColumns);

// resolve watermark expression
final ResolvedExpression watermarkExpression;
@@ -198,11 +198,23 @@ private List<WatermarkSpec> resolveWatermarkSpecs(
}
validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());

if (!watermarkExpression
.getOutputDataType()
.getLogicalType()
.getTypeRoot()
.equals(validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
throw new ValidationException(
String.format(
"The watermark output type %s is different with input time filed type %s.",
watermarkExpression.getOutputDataType(),
validatedTimeColumn.getDataType()));
}

return Collections.singletonList(
WatermarkSpec.of(watermarkSpec.getColumnName(), watermarkExpression));
}

private void validateTimeColumn(String columnName, List<Column> columns) {
private Column validateTimeColumn(String columnName, List<Column> columns) {
final Optional<Column> timeColumn =
columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
if (!timeColumn.isPresent()) {
@@ -213,7 +225,7 @@ private void validateTimeColumn(String columnName, List<Column> columns) {
columns.stream().map(Column::getName).collect(Collectors.toList())));
}
final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType();
if (!supportedWatermarkType(timeFieldType) || getPrecision(timeFieldType) != 3) {
if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) != 3) {
throw new ValidationException(
"Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
@@ -222,10 +234,11 @@ private void validateTimeColumn(String columnName, List<Column> columns) {
throw new ValidationException(
"A watermark can not be defined for a processing-time attribute.");
}
return timeColumn.get();
}

private void validateWatermarkExpression(LogicalType watermarkType) {
if (!supportedWatermarkType(watermarkType) || getPrecision(watermarkType) != 3) {
if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) != 3) {
throw new ValidationException(
"Invalid data type of expression for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
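For readers of this diff: a minimal Table API sketch of the schema that the new resolver check rejects. It mirrors the INVALID_WATERMARK_SQL test case added below; the class name and the description of where the failure surfaces are illustrative, not part of this commit.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class WatermarkTypeMismatchExample {
    public static void main(String[] args) {
        // The rowtime column is TIMESTAMP(3), but the watermark expression below
        // evaluates to TIMESTAMP_LTZ(3).
        Schema schema =
                Schema.newBuilder()
                        .column("ts", DataTypes.TIMESTAMP(3))
                        .watermark("ts", "CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND")
                        .build();
        // Building the Schema succeeds; the mismatch is detected when the schema is
        // resolved (e.g. when the table is registered), now failing with:
        //   "The watermark output type TIMESTAMP_LTZ(3) is different from the
        //    input time field type TIMESTAMP(3)."
        System.out.println(schema);
    }
}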
@@ -60,6 +60,11 @@ public class SchemaResolutionTest {
private static final ResolvedExpression WATERMARK_RESOLVED =
new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> WATERMARK_SQL);

private static final String INVALID_WATERMARK_SQL =
"CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND";
private static final ResolvedExpression INVALID_WATERMARK_RESOLVED =
new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> INVALID_WATERMARK_SQL);

private static final String PROCTIME_SQL = "PROCTIME()";

private static final ResolvedExpression PROCTIME_RESOLVED =
@@ -206,6 +211,13 @@ public void testSchemaResolutionErrors() {
.build(),
"Invalid expression for watermark 'WATERMARK FOR `ts` AS [INVALID]'.");

testError(
Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP(3))
.watermark("ts", callSql(INVALID_WATERMARK_SQL))
.build(),
"The watermark output type TIMESTAMP_LTZ(3) is different with input time filed type TIMESTAMP(3).");

testError(
Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP(3))
@@ -451,6 +463,8 @@ private static ResolvedExpression resolveSqlExpression(
return WATERMARK_RESOLVED_WITH_TS_LTZ;
case PROCTIME_SQL:
return PROCTIME_RESOLVED;
case INVALID_WATERMARK_SQL:
return INVALID_WATERMARK_RESOLVED;
default:
throw new UnsupportedOperationException("Unknown SQL expression.");
}
@@ -55,8 +55,8 @@
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.Field;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.supportedWatermarkType;
import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;

@@ -529,7 +529,7 @@ private static void validateColumnsAndWatermarkSpecs(
}
LogicalType watermarkOutputType =
watermark.getWatermarkExprOutputType().getLogicalType();
if (!supportedWatermarkType(watermarkOutputType)) {
if (!canBeTimeAttributeType(watermarkOutputType)) {
throw new ValidationException(
String.format(
"Watermark strategy %s must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type '%s'.",
@@ -54,8 +54,6 @@
import java.util.Optional;
import java.util.function.Predicate;

import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIMESTAMP;

/**
* Utilities for checking {@link LogicalType} and avoiding a lot of type casting and repetitive
* work.
@@ -111,7 +109,8 @@ public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily famil
}

public static boolean isTimeAttribute(LogicalType logicalType) {
return hasFamily(logicalType, TIMESTAMP)
return (hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
|| hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE))
&& logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != TimestampKind.REGULAR;
}

@@ -127,24 +126,13 @@ public static boolean isProctimeAttribute(LogicalType logicalType) {
}

public static boolean canBeTimeAttributeType(LogicalType logicalType) {
if (isProctimeAttribute(logicalType)
&& logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
return true;
}
if (isRowtimeAttribute(logicalType)
&& (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
|| logicalType.getTypeRoot()
== LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
|| logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
return true;
}
return false;
}

public static boolean supportedWatermarkType(LogicalType logicalType) {
return logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
|| logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
}

/**
* Checks if the given type is a composite type.
*
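The removed supportedWatermarkType is folded into canBeTimeAttributeType above: any TIMESTAMP / TIMESTAMP_LTZ root now qualifies, regardless of its TimestampKind, while callers keep the precision check separate. A small sketch of the resulting behaviour (the class name is illustrative, not part of this commit):

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;

public class TimeAttributeTypeCheckExample {
    public static void main(String[] args) {
        // Both timestamp roots qualify, independent of their TimestampKind.
        System.out.println(canBeTimeAttributeType(new TimestampType(3)));                 // true
        System.out.println(canBeTimeAttributeType(
                new LocalZonedTimestampType(true, TimestampKind.ROWTIME, 3)));            // true
        // Non-timestamp types do not.
        System.out.println(canBeTimeAttributeType(new BigIntType()));                     // false

        // Callers such as the schema resolver pair the root check with a precision check.
        TimestampType ts6 = new TimestampType(6);
        System.out.println(canBeTimeAttributeType(ts6) && getPrecision(ts6) == 3);        // false
    }
}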
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite._
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
import org.apache.flink.table.planner.plan.utils.WindowUtil.groupingContainsWindowStartEnd
import org.apache.flink.table.types.logical.{LocalZonedTimestampType, TimestampType}
import org.apache.flink.table.types.logical.{LocalZonedTimestampType, TimestampKind, TimestampType}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.hint.RelHint
@@ -89,9 +89,17 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
val isNoLongerTimeIndicator : String => Boolean = fieldName =>
measures.get(fieldName).exists(r => !FlinkTypeFactory.isTimeIndicatorType(r.getType))

// decide whether the MATCH_ROWTIME() return type is TIMESTAMP or TIMESTAMP_LTZ; if it is
// TIMESTAMP_LTZ, we need to materialize the output type of the LogicalMatch node to TIMESTAMP_LTZ too.
val isLtzRowtimeIndicator = measures.exists {
case (name, node) => FlinkTypeFactory.isTimestampLtzIndicatorType(node.getType) &&
FlinkTypeFactory.isRowtimeIndicatorType(node.getType)
}

// materialize all output types
val outputType = materializerUtils.getRowTypeWithoutIndicators(
matchRel.getRowType,
isLtzRowtimeIndicator,
isNoLongerTimeIndicator)

LogicalMatch.create(
@@ -330,33 +338,45 @@

val head = inputTypes.head.getFieldList.map(_.getType)

val isValid = inputTypes.forall { t =>
inputTypes.forall { t =>
val fieldTypes = t.getFieldList.map(_.getType)

fieldTypes.zip(head).forall { case (l, r) =>
// check if time indicators match
if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) {
val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime
val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime
leftTime == rightTime
}
// one side is not an indicator
else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) {
false
}
// uninteresting types
else {
true
}
validateUnionPair(l, r)
}
}

setOp.copy(setOp.getTraitSet, inputs, setOp.all)
}

private def validateUnionPair(l: RelDataType, r: RelDataType): Boolean = {
val exceptionMsg =
s"Union fields with time attributes requires same types, but the types are %s and %s."
// check if time indicators match
val isValid = if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) {
val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime
val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime
if (leftTime && rightTime) {
// rowtime attributes must have the same type
isTimestampLtzIndicatorType(l) == isTimestampLtzIndicatorType(r)
} else {
leftTime == rightTime
}
}
// one side is not an indicator
else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) {
false
}
// uninteresting types
else {
true
}

if (!isValid) {
throw new ValidationException(
"Union fields with time attributes have different types.")
String.format(exceptionMsg, l.toString, r.toString))
}

setOp.copy(setOp.getTraitSet, inputs, setOp.all)
isValid
}

private def gatherIndicesToMaterialize(aggregate: Aggregate, input: RelNode): Set[Int] = {
@@ -636,7 +656,7 @@ class RexTimeIndicatorMaterializer(
val resolvedRefType = input(inputRef.getIndex)
// input is a valid time indicator
if (isTimeIndicatorType(resolvedRefType)) {
inputRef
rexBuilder.makeInputRef(resolvedRefType, inputRef.getIndex)
}
// input has been materialized
else {
@@ -669,17 +689,7 @@ class RexTimeIndicatorMaterializer(
case FlinkSqlOperatorTable.SESSION_OLD |
FlinkSqlOperatorTable.HOP_OLD |
FlinkSqlOperatorTable.TUMBLE_OLD =>
updatedCall.getOperands.toList

// The type of FINAL(MATCH_ROWTIME) is inferred by first operand's type,
// but the initial type of MATCH_ROWTIME is TIMESTAMP(3) *ROWTIME*
// rewrite the return type of according to the final type of MATCH_ROWTIME
case FINAL if updatedCall.getOperands.size() == 1
&& isMatchTimeIndicator(updatedCall.getOperands.get(0)) =>
val rowtimeType = updatedCall.getOperands.get(0).getType
updatedCall.clone(
rowtimeType.asInstanceOf[TimeIndicatorRelDataType].originalType,
updatedCall.getOperands).getOperands.toList
updatedCall.getOperands.toList

case _ =>
updatedCall.getOperands.map { o =>
@@ -711,21 +721,23 @@

// All calls in MEASURES and DEFINE are wrapped with FINAL/RUNNING, therefore
// we should treat FINAL(MATCH_ROWTIME) and FINAL(MATCH_PROCTIME) as a time attribute
// extraction
// extraction. The type of FINAL(MATCH_ROWTIME) is inferred from its first operand's type;
// the initial type of MATCH_ROWTIME is TIMESTAMP(3) *ROWTIME*, but it may have been
// rewritten, so we rebuild the call with the rewritten rowtime type.
case FINAL if updatedCall.getOperands.size() == 1
&& isMatchTimeIndicator(updatedCall.getOperands.get(0)) =>
val rowtimeType = updatedCall.getOperands.get(0).getType
updatedCall.clone(
rowtimeType,
materializedOperands)
rexBuilder.makeCall(rowtimeType, updatedCall.getOperator, updatedCall.getOperands)

// MATCH_ROWTIME() is a no-args function; it can have two kinds of return types based
// on the rowtime attribute type of its input, so we rewrite the return type here
case FlinkSqlOperatorTable.MATCH_ROWTIME if isTimeIndicatorType(updatedCall.getType) =>
val rowtimeType = input.filter(isTimeIndicatorType).head

updatedCall.clone(
rexBuilder.makeCall(
rowtime(
updatedCall.getType.isNullable,
isTimestampLtzIndicatorType(rowtimeType)),
updatedCall.getOperator,
materializedOperands)

// do not modify window time attributes
@@ -796,17 +808,36 @@ class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) {

def getRowTypeWithoutIndicators(
relType: RelDataType,
isLtzRowtimeIndicator: Boolean,
shouldMaterialize: String => Boolean): RelDataType = {
val outputTypeBuilder = rexBuilder
.getTypeFactory
.asInstanceOf[FlinkTypeFactory]
.builder()

relType.getFieldList.asScala.zipWithIndex.foreach { case (field, idx) =>
if (isTimeIndicatorType(field.getType) && shouldMaterialize(field.getName)) {
outputTypeBuilder.add(
field.getName,
timestamp(field.getType.isNullable, isTimestampLtzIndicatorType(field.getType)))
if (isTimeIndicatorType(field.getType)) {
val convertedTimeIndicatorFieldType = if (isLtzRowtimeIndicator) {
rexBuilder
.getTypeFactory
.asInstanceOf[FlinkTypeFactory]
.createFieldTypeFromLogicalType(
new LocalZonedTimestampType(field.getType.isNullable, TimestampKind.ROWTIME, 3))
} else {
field.getType
}

if (shouldMaterialize(field.getName)) {
outputTypeBuilder.add(
field.getName,
timestamp(
convertedTimeIndicatorFieldType.isNullable,
isTimestampLtzIndicatorType(convertedTimeIndicatorFieldType)))
} else {
outputTypeBuilder.add(
field.getName,
convertedTimeIndicatorFieldType)
}
} else {
outputTypeBuilder.add(field.getName, field.getType)
}
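The union handling above now reports the two mismatching rowtime types. As an end-to-end illustration only (table DDLs, the connector, and the exact point at which the planner raises the error are assumptions, not part of this commit), a UNION ALL of a TIMESTAMP(3) rowtime and a TIMESTAMP_LTZ(3) rowtime is expected to be rejected with the new message:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnionRowtimeTypeExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Rowtime attribute of type TIMESTAMP(3).
        env.executeSql(
                "CREATE TABLE orders_ts ("
                        + " id BIGINT,"
                        + " ts TIMESTAMP(3),"
                        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ('connector' = 'datagen')");

        // Rowtime attribute of type TIMESTAMP_LTZ(3), derived from an epoch column.
        env.executeSql(
                "CREATE TABLE orders_ltz ("
                        + " id BIGINT,"
                        + " epoch_millis BIGINT,"
                        + " ts AS TO_TIMESTAMP_LTZ(epoch_millis, 3),"
                        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ('connector' = 'datagen')");

        // Expected to fail with the message added above:
        //   "Union fields with time attributes require the same types, but the types are %s and %s."
        // where the placeholders show the two mismatching rowtime types.
        env.executeSql(
                "SELECT id, ts FROM orders_ts UNION ALL SELECT id, ts FROM orders_ltz").print();
    }
}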
@@ -48,7 +48,7 @@ object WatermarkGeneratorCodeGenerator {
contextTerm: Option[String] = None): GeneratedWatermarkGenerator = {
// validation
val watermarkOutputType = FlinkTypeFactory.toLogicalType(watermarkExpr.getType)
if (! (watermarkOutputType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE ||
if (!(watermarkOutputType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE ||
watermarkOutputType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
throw new CodeGenException(
"WatermarkGenerator only accepts output data type of TIMESTAMP or TIMESTAMP_LTZ," +
@@ -289,8 +289,10 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(

if (snapshotTimeInputRef.getType.getSqlTypeName
!= rightTimeInputRef.get.getType.getSqlTypeName) {
throw new ValidationException("Event-Time Temporal Table Join requires same rowtime" +
" type in left table and versioned table.")
throw new ValidationException(
String.format("Event-Time Temporal Table Join requires same rowtime" +
" type in left table and versioned table, but the rowtime types are %s and %s.",
snapshotTimeInputRef.getType.toString, rightTimeInputRef.get.getType.toString))
}
// Deal primary key in TemporalJoinRewriteUniqueKeyRule
TemporalJoinUtil.makeInitialRowTimeTemporalTableJoinCondCall(
@@ -57,7 +57,9 @@ class StreamPhysicalIntervalJoinRule
.get(windowBounds.get.getRightTimeIdx).getType
if (leftTimeAttributeType.getSqlTypeName != rightTimeAttributeType.getSqlTypeName) {
throw new ValidationException(
"Interval join with rowtime attribute has different rowtime types")
String.format("Interval join with rowtime attribute requires same rowtime types," +
" but the types are %s and %s.",
leftTimeAttributeType.toString, rightTimeAttributeType.toString))
}
true
} else {