From 99f1c41eb70ddeff355829f59ec0649c6da64b0b Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Tue, 6 Apr 2021 20:23:47 +0800 Subject: [PATCH] address jark's comments --- .../table/catalog/DefaultSchemaResolver.java | 23 ++- .../table/catalog/SchemaResolutionTest.java | 14 ++ .../apache/flink/table/api/TableSchema.java | 4 +- .../logical/utils/LogicalTypeChecks.java | 20 +-- .../calcite/RelTimeIndicatorConverter.scala | 113 +++++++++----- .../WatermarkGeneratorCodeGenerator.scala | 2 +- ...CorrelateToJoinFromTemporalTableRule.scala | 6 +- .../StreamPhysicalIntervalJoinRule.scala | 4 +- .../planner/plan/stream/sql/UnionTest.scala | 8 +- .../stream/sql/join/IntervalJoinTest.scala | 3 +- .../sql/join/TemporalFunctionJoinTest.scala | 2 - .../stream/sql/join/TemporalJoinTest.scala | 4 +- .../stream/sql/GroupWindowITCase.scala | 3 +- .../stream/sql/MatchRecognizeITCase.scala | 16 +- .../stream/sql/SourceWatermarkITCase.scala | 6 +- .../stream/sql/TimeAttributeITCase.scala | 146 +++++++++--------- .../window/WindowRankOperatorBuilder.java | 3 +- .../window/combines/TopNRecordsCombiner.java | 14 +- .../processors/WindowRankProcessor.java | 9 +- .../combines/WindowCombineFunction.java | 4 +- .../runtime/typeutils/TypeCheckUtils.java | 4 +- .../table/runtime/util/TimeWindowUtil.java | 2 + 22 files changed, 238 insertions(+), 172 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java index 2b96c1eaa66e79..84f27c462926d6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java @@ -53,9 +53,9 @@ import java.util.stream.Stream; import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute; -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.supportedWatermarkType; import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType; /** Default implementation of {@link SchemaResolver}. 
*/ @@ -183,7 +183,7 @@ private List<WatermarkSpec> resolveWatermarkSpecs( // validate time attribute final String timeColumn = watermarkSpec.getColumnName(); - validateTimeColumn(timeColumn, inputColumns); + final Column validatedTimeColumn = validateTimeColumn(timeColumn, inputColumns); // resolve watermark expression final ResolvedExpression watermarkExpression; @@ -198,11 +198,23 @@ private List<WatermarkSpec> resolveWatermarkSpecs( } validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType()); + if (!watermarkExpression + .getOutputDataType() + .getLogicalType() + .getTypeRoot() + .equals(validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) { + throw new ValidationException( + String.format( + "The watermark output type %s is different from input time field type %s.", + watermarkExpression.getOutputDataType(), + validatedTimeColumn.getDataType())); + } + return Collections.singletonList( WatermarkSpec.of(watermarkSpec.getColumnName(), watermarkExpression)); } - private void validateTimeColumn(String columnName, List<Column> columns) { + private Column validateTimeColumn(String columnName, List<Column> columns) { final Optional<Column> timeColumn = columns.stream().filter(c -> c.getName().equals(columnName)).findFirst(); if (!timeColumn.isPresent()) { @@ -213,7 +225,7 @@ private void validateTimeColumn(String columnName, List<Column> columns) { columns.stream().map(Column::getName).collect(Collectors.toList()))); } final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType(); - if (!supportedWatermarkType(timeFieldType) || getPrecision(timeFieldType) != 3) { + if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) != 3) { throw new ValidationException( "Invalid data type of time field for watermark definition. " + "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3)."); @@ -222,10 +234,11 @@ private void validateTimeColumn(String columnName, List<Column> columns) { throw new ValidationException( "A watermark can not be defined for a processing-time attribute."); } + return timeColumn.get(); } private void validateWatermarkExpression(LogicalType watermarkType) { - if (!supportedWatermarkType(watermarkType) || getPrecision(watermarkType) != 3) { + if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) != 3) { throw new ValidationException( "Invalid data type of expression for watermark definition. 
" + "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3)."); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 0563bb61ead478..205fdf3f10399a 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -60,6 +60,11 @@ public class SchemaResolutionTest { private static final ResolvedExpression WATERMARK_RESOLVED = new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> WATERMARK_SQL); + private static final String INVALID_WATERMARK_SQL = + "CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND"; + private static final ResolvedExpression INVALID_WATERMARK_RESOLVED = + new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> INVALID_WATERMARK_SQL); + private static final String PROCTIME_SQL = "PROCTIME()"; private static final ResolvedExpression PROCTIME_RESOLVED = @@ -206,6 +211,13 @@ public void testSchemaResolutionErrors() { .build(), "Invalid expression for watermark 'WATERMARK FOR `ts` AS [INVALID]'."); + testError( + Schema.newBuilder() + .column("ts", DataTypes.TIMESTAMP(3)) + .watermark("ts", callSql(INVALID_WATERMARK_SQL)) + .build(), + "The watermark output type TIMESTAMP_LTZ(3) is different with input time filed type TIMESTAMP(3)."); + testError( Schema.newBuilder() .column("ts", DataTypes.TIMESTAMP(3)) @@ -451,6 +463,8 @@ private static ResolvedExpression resolveSqlExpression( return WATERMARK_RESOLVED_WITH_TS_LTZ; case PROCTIME_SQL: return PROCTIME_RESOLVED; + case INVALID_WATERMARK_SQL: + return INVALID_WATERMARK_RESOLVED; default: throw new UnsupportedOperationException("Unknown SQL expression."); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java index b1fd24b82b3474..e5bbc3e4d787fb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java @@ -55,8 +55,8 @@ import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.Field; import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType; -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.supportedWatermarkType; import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; @@ -529,7 +529,7 @@ private static void validateColumnsAndWatermarkSpecs( } LogicalType watermarkOutputType = watermark.getWatermarkExprOutputType().getLogicalType(); - if (!supportedWatermarkType(watermarkOutputType)) { + if (!canBeTimeAttributeType(watermarkOutputType)) { throw new ValidationException( String.format( "Watermark strategy %s must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type '%s'.", diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java index c2d3a2b1df1826..0c6506dec9ce44 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java @@ -54,8 +54,6 @@ import java.util.Optional; import java.util.function.Predicate; -import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIMESTAMP; - /** * Utilities for checking {@link LogicalType} and avoiding a lot of type casting and repetitive * work. @@ -111,7 +109,8 @@ public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily famil } public static boolean isTimeAttribute(LogicalType logicalType) { - return hasFamily(logicalType, TIMESTAMP) + return (hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) + || hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) && logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != TimestampKind.REGULAR; } @@ -127,24 +126,13 @@ public static boolean isProctimeAttribute(LogicalType logicalType) { } public static boolean canBeTimeAttributeType(LogicalType logicalType) { - if (isProctimeAttribute(logicalType) - && logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { - return true; - } - if (isRowtimeAttribute(logicalType) - && (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE - || logicalType.getTypeRoot() - == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { + if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE + || logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { return true; } return false; } - public static boolean supportedWatermarkType(LogicalType logicalType) { - return logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE - || logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; - } - /** * Checks if the given type is a composite type. 
* diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala index fad18a7917e60c..54cdfd67d855cd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite._ import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil import org.apache.flink.table.planner.plan.utils.WindowUtil.groupingContainsWindowStartEnd -import org.apache.flink.table.types.logical.{LocalZonedTimestampType, TimestampType} +import org.apache.flink.table.types.logical.{LocalZonedTimestampType, TimestampKind, TimestampType} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core._ import org.apache.calcite.rel.hint.RelHint @@ -89,9 +89,17 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val isNoLongerTimeIndicator : String => Boolean = fieldName => measures.get(fieldName).exists(r => !FlinkTypeFactory.isTimeIndicatorType(r.getType)) + // decide whether the MATCH_ROWTIME() return type is TIMESTAMP or TIMESTAMP_LTZ; if it is TIMESTAMP_LTZ, + // we need to materialize the output type of the LogicalMatch node to TIMESTAMP_LTZ too. + val isLtzRowtimeIndicator = measures.exists { + case (_, node) => FlinkTypeFactory.isTimestampLtzIndicatorType(node.getType) && + FlinkTypeFactory.isRowtimeIndicatorType(node.getType) + } + // materialize all output types val outputType = materializerUtils.getRowTypeWithoutIndicators( matchRel.getRowType, + isLtzRowtimeIndicator, isNoLongerTimeIndicator) LogicalMatch.create( @@ -330,33 +338,45 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val head = inputTypes.head.getFieldList.map(_.getType) - val isValid = inputTypes.forall { t => + inputTypes.forall { t => val fieldTypes = t.getFieldList.map(_.getType) fieldTypes.zip(head).forall { case (l, r) => - // check if time indicators match - if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { - val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime - val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime - leftTime == rightTime - } - // one side is not an indicator - else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) { - false - } - // uninteresting types - else { - true - } + validateUnionPair(l, r) } } + setOp.copy(setOp.getTraitSet, inputs, setOp.all) + } + + private def validateUnionPair(l: RelDataType, r: RelDataType): Boolean = { + val exceptionMsg = + s"Union fields with time attributes requires same types, but the types are %s and %s." 
+ // check if time indicators match + val isValid = if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { + val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime + val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime + if (leftTime && rightTime) { + // rowtime must have the same type + isTimestampLtzIndicatorType(l) == isTimestampLtzIndicatorType(r) + } else { + leftTime == rightTime + } + } + // one side is not an indicator + else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) { + false + } + // uninteresting types + else { + true + } + if (!isValid) { throw new ValidationException( - "Union fields with time attributes have different types.") + String.format(exceptionMsg, l.toString, r.toString)) } - - setOp.copy(setOp.getTraitSet, inputs, setOp.all) + isValid } private def gatherIndicesToMaterialize(aggregate: Aggregate, input: RelNode): Set[Int] = { @@ -636,7 +656,7 @@ class RexTimeIndicatorMaterializer( val resolvedRefType = input(inputRef.getIndex) // input is a valid time indicator if (isTimeIndicatorType(resolvedRefType)) { - inputRef + rexBuilder.makeInputRef(resolvedRefType, inputRef.getIndex) } // input has been materialized else { @@ -669,17 +689,7 @@ class RexTimeIndicatorMaterializer( case FlinkSqlOperatorTable.SESSION_OLD | FlinkSqlOperatorTable.HOP_OLD | FlinkSqlOperatorTable.TUMBLE_OLD => - updatedCall.getOperands.toList - - // The type of FINAL(MATCH_ROWTIME) is inferred by first operand's type, - // but the initial type of MATCH_ROWTIME is TIMESTAMP(3) *ROWTIME* - // rewrite the return type of according to the final type of MATCH_ROWTIME - case FINAL if updatedCall.getOperands.size() == 1 - && isMatchTimeIndicator(updatedCall.getOperands.get(0)) => - val rowtimeType = updatedCall.getOperands.get(0).getType - updatedCall.clone( - rowtimeType.asInstanceOf[TimeIndicatorRelDataType].originalType, - updatedCall.getOperands).getOperands.toList + updatedCall.getOperands.toList case _ => updatedCall.getOperands.map { o => @@ -711,21 +721,23 @@ class RexTimeIndicatorMaterializer( // All calls in MEASURES and DEFINE are wrapped with FINAL/RUNNING, therefore // we should treat FINAL(MATCH_ROWTIME) and FINAL(MATCH_PROCTIME) as a time attribute - // extraction + // extraction. 
The type of FINAL(MATCH_ROWTIME) is inferred from its first operand's type; + // the initial type of MATCH_ROWTIME is TIMESTAMP(3) *ROWTIME*, but it may have been rewritten to + // TIMESTAMP_LTZ(3) *ROWTIME*, so we rewrite the return type according to the operand's type here. case FINAL if updatedCall.getOperands.size() == 1 && isMatchTimeIndicator(updatedCall.getOperands.get(0)) => val rowtimeType = updatedCall.getOperands.get(0).getType - updatedCall.clone( - rowtimeType, - materializedOperands) + rexBuilder.makeCall(rowtimeType, updatedCall.getOperator, updatedCall.getOperands) + // MATCH_ROWTIME() is a no-args function, it can have two kinds of return types based + // on the rowtime attribute type of its input, so we rewrite the return type here case FlinkSqlOperatorTable.MATCH_ROWTIME if isTimeIndicatorType(updatedCall.getType) => val rowtimeType = input.filter(isTimeIndicatorType).head rexBuilder.makeCall( rowtime( updatedCall.getType.isNullable, isTimestampLtzIndicatorType(rowtimeType)), + updatedCall.getOperator, materializedOperands) // do not modify window time attributes @@ -796,6 +808,7 @@ class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) { def getRowTypeWithoutIndicators( relType: RelDataType, + isLtzRowtimeIndicator: Boolean, shouldMaterialize: String => Boolean): RelDataType = { val outputTypeBuilder = rexBuilder .getTypeFactory .asInstanceOf[FlinkTypeFactory] .builder() relType.getFieldList.asScala.zipWithIndex.foreach { case (field, idx) => - if (isTimeIndicatorType(field.getType) && shouldMaterialize(field.getName)) { - outputTypeBuilder.add( - field.getName, - timestamp(field.getType.isNullable, isTimestampLtzIndicatorType(field.getType))) + if (isTimeIndicatorType(field.getType)) { + val convertedTimeIndicatorFieldType = if (isLtzRowtimeIndicator) { + rexBuilder + .getTypeFactory + .asInstanceOf[FlinkTypeFactory] + .createFieldTypeFromLogicalType( + new LocalZonedTimestampType(field.getType.isNullable, TimestampKind.ROWTIME, 3)) + } else { + field.getType + } + + if (shouldMaterialize(field.getName)) { + outputTypeBuilder.add( + field.getName, + timestamp( + convertedTimeIndicatorFieldType.isNullable, + isTimestampLtzIndicatorType(convertedTimeIndicatorFieldType))) + } else { + outputTypeBuilder.add( + field.getName, + convertedTimeIndicatorFieldType) + } } else { outputTypeBuilder.add(field.getName, field.getType) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala index 2580f8ad15b048..3f21e88272640a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala @@ -48,7 +48,7 @@ object WatermarkGeneratorCodeGenerator { contextTerm: Option[String] = None): GeneratedWatermarkGenerator = { // validation val watermarkOutputType = FlinkTypeFactory.toLogicalType(watermarkExpr.getType) - if (!
(watermarkOutputType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE || + if (!(watermarkOutputType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE || watermarkOutputType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { throw new CodeGenException( "WatermarkGenerator only accepts output data type of TIMESTAMP or TIMESTAMP_LTZ," + diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala index 798296976fd5af..413a3d655e4332 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala @@ -289,8 +289,10 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( if (snapshotTimeInputRef.getType.getSqlTypeName != rightTimeInputRef.get.getType.getSqlTypeName) { - throw new ValidationException("Event-Time Temporal Table Join requires same rowtime" + - " type in left table and versioned table.") + throw new ValidationException( + String.format("Event-Time Temporal Table Join requires same rowtime" + + " type in left table and versioned table, but the rowtime types are %s and %s.", + snapshotTimeInputRef.getType.toString, rightTimeInputRef.get.getType.toString)) } // Deal primary key in TemporalJoinRewriteUniqueKeyRule TemporalJoinUtil.makeInitialRowTimeTemporalTableJoinCondCall( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala index 2b3e524fb2c4ab..5fe6f80c6a66f8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala @@ -57,7 +57,9 @@ class StreamPhysicalIntervalJoinRule .get(windowBounds.get.getRightTimeIdx).getType if (leftTimeAttributeType.getSqlTypeName != rightTimeAttributeType.getSqlTypeName) { throw new ValidationException( - "Interval join with rowtime attribute has different rowtime types") + String.format("Interval join with rowtime attribute requires same rowtime types," + + " but the types are %s and %s.", + leftTimeAttributeType.toString, rightTimeAttributeType.toString)) } true } else { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala index cf86655b3d5821..0d4ff1a99708ca 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala @@ -112,13 +112,15 @@ class UnionTest extends TableTestBase { @Test def testUnionDiffRowTime(): 
Unit = { - expectedException.expectMessage("Union fields with time attributes have different types") + expectedException.expectMessage( + "Union fields with time attributes requires same types," + + " but the types are TIMESTAMP_LTZ(3) *ROWTIME* and TIMESTAMP(3) *ROWTIME*.") val sqlQuery = """ |SELECT * FROM ( - | SELECT id, ts, name, timestamp_col, timestamp_ltz_col FROM t1 + | SELECT id, ts, name, timestamp_col FROM t1 | UNION ALL - | SELECT id, ts, name, timestamp_col, timestamp_ltz_col FROM t2) + | SELECT id, ts, name, timestamp_ltz_col FROM t2) """.stripMargin util.verifyRelPlanWithType(sqlQuery) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala index 59857c99dd39dc..fac2a86edc7d46 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala @@ -92,7 +92,8 @@ class IntervalJoinTest extends TableTestBase { @Test def testIntervalJoinOnDiffRowTimeType(): Unit = { expectedException.expectMessage( - "Interval join with rowtime attribute has different rowtime types") + "Interval join with rowtime attribute requires same rowtime types," + + " but the types are TIMESTAMP(3) *ROWTIME* and TIMESTAMP_LTZ(3) *ROWTIME*") val sql = """ |SELECT t2.a FROM MyTable2 t1 JOIN MyTable3 t2 ON diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalFunctionJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalFunctionJoinTest.scala index abd18b11a6d48d..aa1c0825937188 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalFunctionJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalFunctionJoinTest.scala @@ -50,8 +50,6 @@ class TemporalFunctionJoinTest extends TableTestBase { "ProctimeRates", proctimeRatesHistory.createTemporalTableFunction($"proctime", $"currency")) - - @Test def testSimpleJoin(): Unit = { val sqlQuery = "SELECT " + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala index 5e9306b02d3d31..2750ac1a3c24cf 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala @@ -514,8 +514,8 @@ class TemporalJoinTest extends TableTestBase { """.stripMargin expectExceptionThrown( sqlQuery8, - "Event-Time Temporal Table Join requires same rowtime" + - " type in left table and versioned table.", + "Event-Time Temporal Table Join requires same rowtime type in left table and versioned" + + " table, but the rowtime types are TIMESTAMP_LTZ(3) *ROWTIME* and TIMESTAMP(3) *ROWTIME*.", classOf[ValidationException]) } diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala index d49f66322ca2c2..11b706ac263c47 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala @@ -56,6 +56,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) val timestampDataId = TestValuesTableFactory.registerData(TestData.timestampData) val timestampLtzDataId = TestValuesTableFactory.registerData(TestData.timestampLtzData) + tEnv.getConfig.setLocalTimeZone(SHANGHAI_ZONE) tEnv.executeSql( s""" |CREATE TABLE testTable ( @@ -76,8 +77,6 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) | 'failing-source' = 'true' |) |""".stripMargin) - - //tEnv.getConfig.setLocalTimeZone(SHANGHAI_ZONE) } @Test diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala index 5f313478aaddfe..7bb92a40cde938 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction -import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestData, TestingAppendSink, UserDefinedFunctionTestUtils} +import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, UserDefinedFunctionTestUtils} import org.apache.flink.table.planner.utils.TableTestUtil import org.apache.flink.types.Row import org.junit.Assert.assertEquals @@ -371,13 +371,15 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState val data: Seq[Row] = Seq( //first window - rowOf("ACME", Instant.ofEpochMilli(1), 1, 1), - rowOf("ACME", Instant.ofEpochMilli(2), 2, 2), + rowOf("ACME", Instant.ofEpochSecond(1), 1, 1), + rowOf("ACME", Instant.ofEpochSecond(2), 2, 2), //second window - rowOf("ACME", Instant.ofEpochMilli(3), 1, 4), - rowOf("ACME", Instant.ofEpochMilli(4), 1, 3) + rowOf("ACME", Instant.ofEpochSecond(3), 1, 4), + rowOf("ACME", Instant.ofEpochSecond(4), 1, 3) ) + tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) + val dataId = TestValuesTableFactory.registerData(data) tEnv.executeSql( s""" @@ -422,8 +424,8 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState env.execute() val expected = List( - "ACME,3,1970-01-01T00:00:02.999,1970-01-01T00:00", - "ACME,2,1970-01-01T00:00:05.999,1970-01-01T00:00:03") + "ACME,3,1970-01-01T08:00:02.999,1970-01-01T08:00", + "ACME,2,1970-01-01T08:00:05.999,1970-01-01T08:00:03") 
assertEquals(expected.sorted, sink.getAppendResults.sorted) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala index cf04119cf59454..8d3db6d228e37b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala @@ -105,7 +105,7 @@ class SourceWatermarkITCase extends StreamingTestBase { val ddl = s""" - | CREATE Table VirtualTable ( + | CREATE Table VirtualTable1 ( | a INT, | b BIGINT, | c TIMESTAMP_LTZ(3), @@ -130,13 +130,13 @@ class SourceWatermarkITCase extends StreamingTestBase { "2,3,2020-11-21T13:00:05.230Z" //the utc timestamp of local ts 2020-11-21T21:00:05.230 ) - val query = "SELECT a, b, c FROM VirtualTable" + val query = "SELECT a, b, c FROM VirtualTable1" val result = tEnv.sqlQuery(query).toAppendStream[Row] val sink = new TestingAppendSink result.addSink(sink) env.execute() - val actualWatermark = TestValuesTableFactory.getWatermarkOutput("VirtualTable") + val actualWatermark = TestValuesTableFactory.getWatermarkOutput("VirtualTable1") .asScala .map(x => LocalDateTime.ofInstant(Instant.ofEpochMilli(x.getTimestamp), zoneId).toString) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala index 447b58e7dd60fc..f2c195bea2850d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala @@ -130,79 +130,79 @@ class TimeAttributeITCase extends StreamingTestBase { assertEquals(expected.sorted, sink.getAppendResults.sorted) } -// @Test -// def testWindowAggregateOnCustomizedWatermark(): Unit = { -// JavaFunc5.openCalled = false -// JavaFunc5.closeCalled = false -// tEnv.createTemporaryFunction("myFunc", new JavaFunc5) -// val ddl = -// s""" -// |CREATE TABLE src ( -// | log_ts STRING, -// | ts TIMESTAMP(3), -// | a INT, -// | b DOUBLE, -// | WATERMARK FOR ts AS myFunc(ts, a) -// |) WITH ( -// | 'connector' = 'values', -// | 'data-id' = '$dataId' -// |) -// """.stripMargin -// val query = -// """ -// |SELECT TUMBLE_END(ts, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) -// |FROM src -// |GROUP BY TUMBLE(ts, INTERVAL '0.003' SECOND) -// """.stripMargin -// tEnv.executeSql(ddl) -// val sink = new TestingAppendSink() -// tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink) -// env.execute("SQL JOB") -// -// val expected = Seq( -// "1970-01-01T00:00:00.003,2,3.0", -// "1970-01-01T00:00:00.006,2,7.0", -// "1970-01-01T00:00:00.009,2,6.0", -// "1970-01-01T00:00:00.018,1,4.0") -// assertEquals(expected.sorted, sink.getAppendResults.sorted) -// assertTrue(JavaFunc5.openCalled) -// assertTrue(JavaFunc5.closeCalled) -// } -// -// @Test -// def testWindowAggregateOnComputedRowtime(): Unit = { -// val ddl = -// s""" -// |CREATE TABLE src ( -// | log_ts STRING, -// | ts TIMESTAMP(3), -// | a INT, -// | b DOUBLE, -// | rowtime AS 
CAST(log_ts AS TIMESTAMP(3)), -// | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND -// |) WITH ( -// | 'connector' = 'values', -// | 'data-id' = '$dataId' -// |) -// """.stripMargin -// val query = -// """ -// |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) -// |FROM src -// |GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) -// """.stripMargin -// tEnv.executeSql(ddl) -// val sink = new TestingAppendSink() -// tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink) -// env.execute("SQL JOB") -// -// val expected = Seq( -// "1970-01-01T00:00:00.003,2,3.0", -// "1970-01-01T00:00:00.006,2,7.0", -// "1970-01-01T00:00:00.009,2,6.0", -// "1970-01-01T00:00:00.018,1,4.0") -// assertEquals(expected.sorted, sink.getAppendResults.sorted) -// } + @Test + def testWindowAggregateOnCustomizedWatermark(): Unit = { + JavaFunc5.openCalled = false + JavaFunc5.closeCalled = false + tEnv.createTemporaryFunction("myFunc", new JavaFunc5) + val ddl = + s""" + |CREATE TABLE src ( + | log_ts STRING, + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | WATERMARK FOR ts AS myFunc(ts, a) + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$dataId' + |) + """.stripMargin + val query = + """ + |SELECT TUMBLE_END(ts, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) + |FROM src + |GROUP BY TUMBLE(ts, INTERVAL '0.003' SECOND) + """.stripMargin + tEnv.executeSql(ddl) + val sink = new TestingAppendSink() + tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink) + env.execute("SQL JOB") + + val expected = Seq( + "1970-01-01T00:00:00.003,2,3.0", + "1970-01-01T00:00:00.006,2,7.0", + "1970-01-01T00:00:00.009,2,6.0", + "1970-01-01T00:00:00.018,1,4.0") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + assertTrue(JavaFunc5.openCalled) + assertTrue(JavaFunc5.closeCalled) + } + + @Test + def testWindowAggregateOnComputedRowtime(): Unit = { + val ddl = + s""" + |CREATE TABLE src ( + | log_ts STRING, + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | rowtime AS CAST(log_ts AS TIMESTAMP(3)), + | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$dataId' + |) + """.stripMargin + val query = + """ + |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) + |FROM src + |GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) + """.stripMargin + tEnv.executeSql(ddl) + val sink = new TestingAppendSink() + tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink) + env.execute("SQL JOB") + + val expected = Seq( + "1970-01-01T00:00:00.003,2,3.0", + "1970-01-01T00:00:00.006,2,7.0", + "1970-01-01T00:00:00.009,2,6.0", + "1970-01-01T00:00:00.018,1,4.0") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } // ------------------------------------------------------------------------------------------ diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java index 385639df53d5b0..da89489eac50e0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java @@ -155,7 +155,8 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) { rankStart, rankEnd, 
outputRankNumber, - windowEndIndex); + windowEndIndex, + shiftTimeZone); return new SlicingWindowOperator<>(windowProcessor); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java index e2d0c4fd829519..a5442d3c7b4450 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java @@ -43,6 +43,7 @@ import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; /** * An implementation of {@link WindowCombineFunction} that saves topN records of incremental input @@ -80,6 +81,9 @@ public final class TopNRecordsCombiner implements WindowCombineFunction { /** Whether the operator works in event-time mode, used to indicate registering which timer. */ private final boolean isEventTime; + /** The shifted timezone of the window. */ + private final ZoneId shiftTimeZone; + public TopNRecordsCombiner( InternalTimerService timerService, StateKeyContext keyContext, @@ -90,7 +94,8 @@ public TopNRecordsCombiner( boolean requiresCopyKey, TypeSerializer keySerializer, TypeSerializer recordSerializer, - boolean isEventTime) { + boolean isEventTime, + ZoneId shiftTimeZone) { this.timerService = timerService; this.keyContext = keyContext; this.dataState = dataState; @@ -101,6 +106,7 @@ public TopNRecordsCombiner( this.keySerializer = keySerializer; this.recordSerializer = recordSerializer; this.isEventTime = isEventTime; + this.shiftTimeZone = shiftTimeZone; } @Override @@ -145,7 +151,8 @@ public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception } // step 3: register timer for current window if (isEventTime) { - timerService.registerEventTimeTimer(window, window - 1); + timerService.registerEventTimeTimer( + window, toEpochMillsForTimer(window - 1, shiftTimeZone)); } // we don't need to register processing-time timers, because we already register them // per-record in AbstractWindowAggProcessor.processElement() @@ -207,7 +214,8 @@ public WindowCombineFunction create( requiresCopyKey, keySerializer, recordSerializer, - isEventTime); + isEventTime, + shiftTimeZone); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java index f8de23f6b7973f..8521883f405002 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java @@ -35,6 +35,7 @@ import org.apache.flink.table.runtime.operators.window.state.WindowMapState; import org.apache.flink.types.RowKind; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ 
-60,6 +61,8 @@ public final class WindowRankProcessor implements SlicingWindowProcessor { private final long rankEnd; private final boolean outputRankNumber; private final int windowEndIndex; + /** The shifted timezone of the window. */ + private final ZoneId shiftTimeZone; // ---------------------------------------------------------------------------------------- @@ -84,7 +87,8 @@ public WindowRankProcessor( long rankStart, long rankEnd, boolean outputRankNumber, - int windowEndIndex) { + int windowEndIndex, + ZoneId shiftTimeZone) { this.inputSerializer = inputSerializer; this.generatedSortKeyComparator = genSortKeyComparator; this.sortKeySerializer = sortKeySerializer; @@ -94,6 +98,7 @@ public WindowRankProcessor( this.rankEnd = rankEnd; this.outputRankNumber = outputRankNumber; this.windowEndIndex = windowEndIndex; + this.shiftTimeZone = shiftTimeZone; } @Override @@ -122,7 +127,7 @@ public void open(Context context) throws Exception { ctx.getKeyedStateBackend(), windowState, true, - null); + shiftTimeZone); this.windowBuffer = bufferFactory.create( ctx.getOperatorOwner(), diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java index 9b8c78f3e5c343..bd569a75b1bad9 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java @@ -27,8 +27,6 @@ import org.apache.flink.table.runtime.util.WindowKey; import org.apache.flink.util.Collector; -import javax.annotation.Nullable; - import java.io.Serializable; import java.time.ZoneId; import java.util.Iterator; @@ -75,7 +73,7 @@ WindowCombineFunction create( KeyedStateBackend stateBackend, WindowState windowState, boolean isEventTime, - @Nullable ZoneId shiftTimeZone) + ZoneId shiftTimeZone) throws Exception; } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java index c9c9571f8221a8..1f90d48947bbe3 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java @@ -36,7 +36,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW; import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE; -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute; /** Utils for type. 
*/ public class TypeCheckUtils { @@ -55,7 +55,7 @@ public static boolean isTimePoint(LogicalType type) { public static boolean isRowTime(LogicalType type) { return (type instanceof TimestampType || type instanceof LocalZonedTimestampType) - && isTimeAttribute(type); + && isRowtimeAttribute(type); } public static boolean isProcTime(LogicalType type) { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java index 10854fbe976e6a..7e8d46c96c8035 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java @@ -49,6 +49,7 @@ public class TimeWindowUtil { * @return the mills which can describe the local timestamp string in given timezone. */ public static long toUtcTimestampMills(long epochMills, ZoneId shiftTimeZone) { + // Long.MAX_VALUE is a flag for the max watermark; return it directly if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == epochMills) { return epochMills; } @@ -65,6 +66,7 @@ public static long toUtcTimestampMills(long epochMills, ZoneId shiftTimeZone) { * @return the epoch mills. */ public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTimeZone) { + // Long.MAX_VALUE is a flag for the max watermark; return it directly if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == utcTimestampMills) { return utcTimestampMills;
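--
Note: the snippet below is an illustrative sketch, not part of the patch. It shows the user-visible effect of the new type check in DefaultSchemaResolver.resolveWatermarkSpecs, using the unresolved Schema builder API that SchemaResolutionTest exercises; the class name is hypothetical.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class WatermarkTypeCheckSketch {
    public static void main(String[] args) {
        // accepted: the expression type root (TIMESTAMP) matches the TIMESTAMP(3) time column
        Schema valid =
                Schema.newBuilder()
                        .column("ts", DataTypes.TIMESTAMP(3))
                        .watermark("ts", "ts - INTERVAL '5' SECOND")
                        .build();

        // rejected during schema resolution with:
        // "The watermark output type TIMESTAMP_LTZ(3) is different from input time field type TIMESTAMP(3)."
        Schema invalid =
                Schema.newBuilder()
                        .column("ts", DataTypes.TIMESTAMP(3))
                        .watermark("ts", "CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND")
                        .build();
    }
}
```

The check fires at resolution time (e.g. when the schema is used in a CREATE TABLE), not at Schema.build(); both builders above succeed.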
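Note: windowing operators work on "shifted" mills that encode the wall-clock time of the configured timezone, which is why TopNRecordsCombiner now registers its cleanup timer with toEpochMillsForTimer(window - 1, shiftTimeZone) instead of the raw window - 1. The sketch below approximates the conversion under the assumption that daylight-saving corner cases (which the real TimeWindowUtil handles) can be ignored; it is not the actual implementation.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimerShiftSketch {
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    // Maps a window boundary expressed as "UTC mills of the local wall-clock time"
    // back to real epoch mills, so the event-time timer fires at the right instant.
    static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTimeZone) {
        // UTC needs no shifting; Long.MAX_VALUE flags the max watermark and passes through
        if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == utcTimestampMills) {
            return utcTimestampMills;
        }
        // reinterpret the mills as a wall-clock time and localize it in shiftTimeZone
        LocalDateTime localTime =
                LocalDateTime.ofEpochSecond(
                        Math.floorDiv(utcTimestampMills, 1000L),
                        (int) Math.floorMod(utcTimestampMills, 1000L) * 1_000_000,
                        ZoneOffset.UTC);
        return localTime.atZone(shiftTimeZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // wall-clock 1970-01-01T08:00 in Asia/Shanghai (UTC+8) is epoch 0
        long utcMillsOfLocalTime = 8 * 3600 * 1000L;
        System.out.println(
                toEpochMillsForTimer(utcMillsOfLocalTime, ZoneId.of("Asia/Shanghai"))); // 0
    }
}
```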