diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java index 84f27c462926d6..5aa6015ff62efe 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java @@ -198,14 +198,11 @@ private List resolveWatermarkSpecs( } validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType()); - if (!watermarkExpression - .getOutputDataType() - .getLogicalType() - .getTypeRoot() - .equals(validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) { + if (!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot() + == validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) { throw new ValidationException( String.format( - "The watermark output type %s is different with input time filed type %s.", + "The watermark output type %s is different from input time filed type %s.", watermarkExpression.getOutputDataType(), validatedTimeColumn.getDataType())); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 205fdf3f10399a..0dfb68017fbd3c 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -216,7 +216,7 @@ public void testSchemaResolutionErrors() { .column("ts", DataTypes.TIMESTAMP(3)) .watermark("ts", callSql(INVALID_WATERMARK_SQL)) .build(), - "The watermark output type TIMESTAMP_LTZ(3) is different with input time filed type TIMESTAMP(3)."); + "The watermark output type TIMESTAMP_LTZ(3) is different from input time filed type TIMESTAMP(3)."); testError( Schema.newBuilder() diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java index 0c6506dec9ce44..4a81cee1a946db 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java @@ -109,9 +109,7 @@ public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily famil } public static boolean isTimeAttribute(LogicalType logicalType) { - return (hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) - || hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) - && logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != TimestampKind.REGULAR; + return isRowtimeAttribute(logicalType) || isProctimeAttribute(logicalType); } public static boolean isRowtimeAttribute(LogicalType logicalType) { @@ -126,8 +124,9 @@ public static boolean isProctimeAttribute(LogicalType logicalType) { } public static boolean canBeTimeAttributeType(LogicalType logicalType) { - if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE - || logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { + LogicalTypeRoot typeRoot = logicalType.getTypeRoot(); + if (typeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE + || typeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { return true; } return false; diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala index 54cdfd67d855cd..e0b0648107ac97 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala @@ -338,10 +338,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val head = inputTypes.head.getFieldList.map(_.getType) - inputTypes.forall { t => + inputTypes.foreach { t => val fieldTypes = t.getFieldList.map(_.getType) - fieldTypes.zip(head).forall { case (l, r) => + fieldTypes.zip(head).foreach{ case (l, r) => validateUnionPair(l, r) } } @@ -349,18 +349,18 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { setOp.copy(setOp.getTraitSet, inputs, setOp.all) } - private def validateUnionPair(l: RelDataType, r: RelDataType): Boolean = { + private def validateUnionPair(l: RelDataType, r: RelDataType): Unit = { val exceptionMsg = s"Union fields with time attributes requires same types, but the types are %s and %s." // check if time indicators match val isValid = if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { - val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime - val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime - if (leftTime && leftTime) { + val leftIsEventTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime + val rightIsEventTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime + if (leftIsEventTime && rightIsEventTime) { //rowtime must have same type isTimestampLtzIndicatorType(l) == isTimestampLtzIndicatorType(r) } else { - leftTime == rightTime + leftIsEventTime == rightIsEventTime } } // one side is not an indicator @@ -376,7 +376,6 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { throw new ValidationException( String.format(exceptionMsg, l.toString, r.toString)) } - isValid } private def gatherIndicesToMaterialize(aggregate: Aggregate, input: RelNode): Set[Int] = { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java index 23969b3c5a5300..deeebfd65751be 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java @@ -40,7 +40,6 @@ import java.time.ZoneId; import static org.apache.flink.table.runtime.operators.window.TimeWindow.getWindowStartWithOffset; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; /** * The operator used for local window aggregation. @@ -73,10 +72,10 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator element) throws Exception { @Override public void processWatermark(Watermark mark) throws Exception { - long timestamp = toUtcTimestampMills(mark.getTimestamp(), shiftTimezone); - if (timestamp > currentShiftWatermark) { - currentShiftWatermark = timestamp; - if (currentShiftWatermark >= nextTriggerWatermark) { + if (mark.getTimestamp() > currentWatermark) { + currentWatermark = mark.getTimestamp(); + if (currentWatermark >= nextTriggerWatermark) { // we only need to call advanceProgress() when current watermark may trigger window - windowBuffer.advanceProgress(currentShiftWatermark); - nextTriggerWatermark = - getNextTriggerWatermark(currentShiftWatermark, windowInterval); + windowBuffer.advanceProgress(currentWatermark); + nextTriggerWatermark = getNextTriggerWatermark(currentWatermark, windowInterval); } } super.processWatermark(mark); @@ -192,11 +190,11 @@ private long computeMemorySize() { // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - /** Method to get the next shifted watermark to trigger window. */ - private static long getNextTriggerWatermark(long currentShiftWatermark, long interval) { - long start = getWindowStartWithOffset(currentShiftWatermark, 0L, interval); + /** Method to get the next watermark to trigger window. */ + private static long getNextTriggerWatermark(long currentWatermark, long interval) { + long start = getWindowStartWithOffset(currentWatermark, 0L, interval); long triggerWatermark = start + interval - 1; - if (triggerWatermark > currentShiftWatermark) { + if (triggerWatermark > currentWatermark) { return triggerWatermark; } else { return triggerWatermark + interval; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java index 116a570d8fefc3..44bdd2c9b3fc9d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java @@ -156,7 +156,8 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { combinerFactory, (SliceSharedAssigner) assigner, accSerializer, - indexOfCountStart); + indexOfCountStart, + shiftTimeZone); } else if (assigner instanceof SliceUnsharedAssigner) { windowProcessor = new SliceUnsharedWindowAggProcessor( @@ -164,11 +165,12 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { bufferFactory, combinerFactory, (SliceUnsharedAssigner) assigner, - accSerializer); + accSerializer, + shiftTimeZone); } else { throw new IllegalArgumentException( "assigner must be instance of SliceUnsharedAssigner or SliceSharedAssigner."); } - return new SlicingWindowOperator<>(windowProcessor, shiftTimeZone); + return new SlicingWindowOperator<>(windowProcessor); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java index 3cdeb9c5726b82..45e9634390afb0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java @@ -25,11 +25,13 @@ import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.WindowKeySerializer; import org.apache.flink.table.runtime.util.KeyValueIterator; +import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.runtime.util.WindowKey; import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo; import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap; import java.io.EOFException; +import java.time.ZoneId; import java.util.Iterator; /** @@ -42,8 +44,9 @@ public final class RecordsWindowBuffer implements WindowBuffer { private final WindowBytesMultiMap recordsBuffer; private final WindowKey reuseWindowKey; private final AbstractRowDataSerializer recordSerializer; + private final ZoneId shiftTimeZone; - private long minTriggerTime = Long.MAX_VALUE; + private long minSliceEnd = Long.MAX_VALUE; public RecordsWindowBuffer( Object operatorOwner, @@ -51,13 +54,15 @@ public RecordsWindowBuffer( long memorySize, WindowCombineFunction combineFunction, PagedTypeSerializer keySer, - AbstractRowDataSerializer inputSer) { + AbstractRowDataSerializer inputSer, + ZoneId shiftTimeZone) { this.combineFunction = combineFunction; this.recordsBuffer = new WindowBytesMultiMap( operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity()); this.recordSerializer = inputSer; this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance(); + this.shiftTimeZone = shiftTimeZone; } @Override @@ -65,7 +70,7 @@ public void addElement(RowData key, long sliceEnd, RowData element) throws Excep // track the lowest trigger time, if watermark exceeds the trigger time, // it means there are some elements in the buffer belong to a window going to be fired, // and we need to flush the buffer into state for firing. - minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime); + minSliceEnd = Math.min(sliceEnd - 1, minSliceEnd); reuseWindowKey.replace(sliceEnd, key); LookupInfo> lookup = recordsBuffer.lookup(reuseWindowKey); @@ -81,7 +86,7 @@ public void addElement(RowData key, long sliceEnd, RowData element) throws Excep @Override public void advanceProgress(long progress) throws Exception { - if (progress >= minTriggerTime) { + if (progress >= TimeWindowUtil.toEpochMillsForTimer(minSliceEnd, shiftTimeZone)) { // there should be some window to be fired, flush buffer to state first flush(); } @@ -97,7 +102,7 @@ public void flush() throws Exception { } recordsBuffer.reset(); // reset trigger time - minTriggerTime = Long.MAX_VALUE; + minSliceEnd = Long.MAX_VALUE; } } @@ -129,9 +134,16 @@ public WindowBuffer create( Object operatorOwner, MemoryManager memoryManager, long memorySize, - WindowCombineFunction combineFunction) { + WindowCombineFunction combineFunction, + ZoneId shiftTimeZone) { return new RecordsWindowBuffer( - operatorOwner, memoryManager, memorySize, combineFunction, keySer, inputSer); + operatorOwner, + memoryManager, + memorySize, + combineFunction, + keySer, + inputSer, + shiftTimeZone); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java index c1175e80581fd2..7b954f83c20de0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.Serializable; +import java.time.ZoneId; /** * A buffer that buffers data in memory and flushes many values to state together at a time to avoid @@ -83,13 +84,15 @@ interface Factory extends Serializable { * @param memoryManager the manager that governs memory by Flink framework * @param memorySize the managed memory size can be used by this operator * @param combineFunction the combine function used to combine buffered data into state + * @param shiftTimeZone the shit timezone of the window * @throws IOException thrown if the buffer can't be opened */ WindowBuffer create( Object operatorOwner, MemoryManager memoryManager, long memorySize, - WindowCombineFunction combineFunction) + WindowCombineFunction combineFunction, + ZoneId shiftTimeZone) throws IOException; } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java index 1243d6f01209d6..2ab8c6309be6f4 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java @@ -129,7 +129,7 @@ record = recordSerializer.copy(record); // step 5: register timer for current window if (isEventTime) { - timerService.registerEventTimeWindowTimer(window, window - 1); + timerService.registerEventTimeWindowTimer(window); } // we don't need register processing-time timer, because we already register them // per-record in AbstractWindowAggProcessor.processElement() diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java index 33a1f24deb28a3..f12c9a631cdd66 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java @@ -116,7 +116,7 @@ public void combine(WindowKey windowKey, Iterator localAccs) throws Exc accState.update(window, stateAcc); // step 3: register timer for current window - timerService.registerEventTimeWindowTimer(window, window - 1); + timerService.registerEventTimeWindowTimer(window); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java index e04c171782deb2..1ac26aa14c681f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.JoinedRowData; import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; @@ -34,9 +35,12 @@ import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl; import org.apache.flink.table.runtime.operators.window.state.WindowValueState; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import java.time.ZoneId; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; /** A base implementation of {@link SlicingWindowProcessor} for window aggregate. */ public abstract class AbstractWindowAggProcessor implements SlicingWindowProcessor { @@ -48,6 +52,7 @@ public abstract class AbstractWindowAggProcessor implements SlicingWindowProcess protected final SliceAssigner sliceAssigner; protected final TypeSerializer accSerializer; protected final boolean isEventTime; + private final ZoneId shiftTimeZone; // ---------------------------------------------------------------------------------------- @@ -57,7 +62,9 @@ public abstract class AbstractWindowAggProcessor implements SlicingWindowProcess protected transient ClockService clockService; - protected transient WindowTimerService timerService; + protected transient InternalTimerService timerService; + + protected transient WindowTimerService windowTimerService; protected transient NamespaceAggsHandleFunction aggregator; @@ -73,13 +80,15 @@ public AbstractWindowAggProcessor( WindowBuffer.Factory bufferFactory, WindowCombineFunction.Factory combinerFactory, SliceAssigner sliceAssigner, - TypeSerializer accSerializer) { + TypeSerializer accSerializer, + ZoneId shiftTimeZone) { this.genAggsHandler = genAggsHandler; this.windowBufferFactory = bufferFactory; this.combineFactory = combinerFactory; this.sliceAssigner = sliceAssigner; this.accSerializer = accSerializer; this.isEventTime = sliceAssigner.isEventTime(); + this.shiftTimeZone = shiftTimeZone; } @Override @@ -93,8 +102,9 @@ public void open(Context context) throws Exception { new ValueStateDescriptor<>("window-aggs", accSerializer)); this.windowState = new WindowValueState<>((InternalValueState) state); - this.clockService = ClockService.of(ctx.getTimerService().getInternalTimerService()); + this.clockService = ClockService.of(ctx.getTimerService()); this.timerService = ctx.getTimerService(); + this.windowTimerService = new WindowTimerServiceImpl(timerService, shiftTimeZone); this.aggregator = genAggsHandler.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader()); this.aggregator.open( @@ -103,7 +113,7 @@ public void open(Context context) throws Exception { final WindowCombineFunction combineFunction = combineFactory.create( ctx.getRuntimeContext(), - ctx.getTimerService(), + windowTimerService, ctx.getKeyedStateBackend(), windowState, isEventTime); @@ -112,7 +122,8 @@ public void open(Context context) throws Exception { ctx.getOperatorOwner(), ctx.getMemoryManager(), ctx.getMemorySize(), - combineFunction); + combineFunction, + shiftTimeZone); this.reuseOutput = new JoinedRowData(); this.currentProgress = Long.MIN_VALUE; @@ -123,9 +134,9 @@ public boolean processElement(RowData key, RowData element) throws Exception { long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService); if (!isEventTime) { // always register processing time for every element when processing time mode - timerService.registerProcessingTimeWindowTimer(sliceEnd, sliceEnd - 1); + windowTimerService.registerProcessingTimeWindowTimer(sliceEnd); } - if (isEventTime && sliceEnd - 1 <= currentProgress) { + if (isEventTime && toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone) <= currentProgress) { // element is late and should be dropped return true; } @@ -135,9 +146,8 @@ public boolean processElement(RowData key, RowData element) throws Exception { @Override public void advanceProgress(long progress) throws Exception { - long timestamp = toUtcTimestampMills(progress, timerService.getShiftTimeZone()); - if (timestamp > currentProgress) { - currentProgress = timestamp; + if (progress > currentProgress) { + currentProgress = progress; windowBuffer.advanceProgress(currentProgress); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java index cbf073d653244f..6f59fbc5a6cd96 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.io.Serializable; +import java.time.ZoneId; import java.util.Optional; import java.util.function.Supplier; @@ -52,8 +53,15 @@ public SliceSharedWindowAggProcessor( WindowCombineFunction.Factory combinerFactory, SliceSharedAssigner sliceAssigner, TypeSerializer accSerializer, - int indexOfCountStar) { - super(genAggsHandler, bufferFactory, combinerFactory, sliceAssigner, accSerializer); + int indexOfCountStar, + ZoneId shiftTimeZone) { + super( + genAggsHandler, + bufferFactory, + combinerFactory, + sliceAssigner, + accSerializer, + shiftTimeZone); this.sliceSharedAssigner = sliceAssigner; this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner); } @@ -76,9 +84,9 @@ public void fireWindow(Long windowEnd) throws Exception { if (nextWindowEndOptional.isPresent()) { long nextWindowEnd = nextWindowEndOptional.get(); if (sliceSharedAssigner.isEventTime()) { - timerService.registerEventTimeWindowTimer(nextWindowEnd, nextWindowEnd - 1); + windowTimerService.registerEventTimeWindowTimer(nextWindowEnd); } else { - timerService.registerProcessingTimeWindowTimer(nextWindowEnd, nextWindowEnd - 1); + windowTimerService.registerProcessingTimeWindowTimer(nextWindowEnd); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java index 69bc2e631daae4..32d0f43b585f44 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java @@ -25,6 +25,8 @@ import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.SliceUnsharedAssigner; +import java.time.ZoneId; + /** * An window aggregate processor implementation which works for {@link SliceUnsharedAssigner}, e.g. * tumbling windows. @@ -37,8 +39,15 @@ public SliceUnsharedWindowAggProcessor( WindowBuffer.Factory windowBufferFactory, WindowCombineFunction.Factory combineFactory, SliceUnsharedAssigner sliceAssigner, - TypeSerializer accSerializer) { - super(genAggsHandler, windowBufferFactory, combineFactory, sliceAssigner, accSerializer); + TypeSerializer accSerializer, + ZoneId shiftTimeZone) { + super( + genAggsHandler, + windowBufferFactory, + combineFactory, + sliceAssigner, + accSerializer, + shiftTimeZone); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java index 1374852d5f77d6..da89489eac50e0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java @@ -155,7 +155,8 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) { rankStart, rankEnd, outputRankNumber, - windowEndIndex); - return new SlicingWindowOperator<>(windowProcessor, shiftTimeZone); + windowEndIndex, + shiftTimeZone); + return new SlicingWindowOperator<>(windowProcessor); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java index 79884dad15df18..f33757e554bf0e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java @@ -144,7 +144,7 @@ public void combine(WindowKey windowKey, Iterator records) throws Excep } // step 3: register timer for current window if (isEventTime) { - timerService.registerEventTimeWindowTimer(window, window - 1); + timerService.registerEventTimeWindowTimer(window); } // we don't need register processing-time timer, because we already register them // per-record in AbstractWindowAggProcessor.processElement() diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java index 790b6fcfc541e8..555c95be1fda94 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.JoinedRowData; @@ -33,9 +34,11 @@ import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl; import org.apache.flink.table.runtime.operators.window.state.WindowMapState; import org.apache.flink.types.RowKind; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -43,7 +46,7 @@ import java.util.List; import java.util.Map; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; /** An window rank processor. */ public final class WindowRankProcessor implements SlicingWindowProcessor { @@ -63,6 +66,7 @@ public final class WindowRankProcessor implements SlicingWindowProcessor { private final long rankEnd; private final boolean outputRankNumber; private final int windowEndIndex; + private final ZoneId shiftTimeZone; // ---------------------------------------------------------------------------------------- @@ -70,7 +74,8 @@ public final class WindowRankProcessor implements SlicingWindowProcessor { private transient Context ctx; - private transient WindowTimerService timerService; + private transient InternalTimerService timerService; + private transient WindowTimerService windowTimerService; private transient WindowBuffer windowBuffer; @@ -89,7 +94,8 @@ public WindowRankProcessor( long rankStart, long rankEnd, boolean outputRankNumber, - int windowEndIndex) { + int windowEndIndex, + ZoneId shiftTimeZone) { this.inputSerializer = inputSerializer; this.generatedSortKeyComparator = genSortKeyComparator; this.sortKeySerializer = sortKeySerializer; @@ -99,6 +105,7 @@ public WindowRankProcessor( this.rankEnd = rankEnd; this.outputRankNumber = outputRankNumber; this.windowEndIndex = windowEndIndex; + this.shiftTimeZone = shiftTimeZone; } @Override @@ -119,13 +126,14 @@ public void open(Context context) throws Exception { .getOrCreateKeyedState(namespaceSerializer, mapStateDescriptor); this.timerService = ctx.getTimerService(); + this.windowTimerService = new WindowTimerServiceImpl(timerService, shiftTimeZone); this.windowState = new WindowMapState<>( (InternalMapState>) state); final WindowCombineFunction combineFunction = combineFactory.create( ctx.getRuntimeContext(), - ctx.getTimerService(), + windowTimerService, ctx.getKeyedStateBackend(), windowState, true); @@ -134,7 +142,8 @@ public void open(Context context) throws Exception { ctx.getOperatorOwner(), ctx.getMemoryManager(), ctx.getMemorySize(), - combineFunction); + combineFunction, + shiftTimeZone); this.reuseOutput = new JoinedRowData(); this.reuseRankRow = new GenericRowData(1); @@ -144,7 +153,7 @@ public void open(Context context) throws Exception { @Override public boolean processElement(RowData key, RowData element) throws Exception { long sliceEnd = element.getLong(windowEndIndex); - if (sliceEnd - 1 <= currentProgress) { + if (toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone) <= currentProgress) { // element is late and should be dropped return true; } @@ -154,9 +163,8 @@ public boolean processElement(RowData key, RowData element) throws Exception { @Override public void advanceProgress(long progress) throws Exception { - long timestamp = toUtcTimestampMills(progress, timerService.getShiftTimeZone()); - if (timestamp > currentProgress) { - currentProgress = timestamp; + if (progress > currentProgress) { + currentProgress = progress; windowBuffer.advanceProgress(currentProgress); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java index 084e4a55e84003..b1dab8b0bdcf77 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java @@ -109,7 +109,7 @@ public void close() throws Exception {} /** Returns {@code true} if the given time is the cleanup time for the given window. */ protected final boolean isCleanupTime(W window, long time) { - return time == cleanupTime(window); + return time == toEpochMillsForTimer(cleanupTime(window), ctx.getShiftTimeZone()); } /** diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java index ada96f7a8a74be..516f09b199c555 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -40,8 +41,6 @@ import org.apache.flink.table.runtime.operators.TableStreamOperator; import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor; -import java.time.ZoneId; - import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -106,9 +105,6 @@ public final class SlicingWindowOperator extends TableStreamOperator windowProcessor; - /** The shift timezone of the window. */ - private final ZoneId shiftTimeZone; - // ------------------------------------------------------------------------ /** This is used for emitting elements with a given timestamp. */ @@ -118,7 +114,7 @@ public final class SlicingWindowOperator extends TableStreamOperator windowTimerService; + private transient InternalTimerService internalTimerService; /** The tracked processing time triggered last time. */ private transient long lastTriggeredProcessingTime; @@ -131,9 +127,8 @@ public final class SlicingWindowOperator extends TableStreamOperator watermarkLatency; - public SlicingWindowOperator(SlicingWindowProcessor windowProcessor, ZoneId shiftTimeZone) { + public SlicingWindowOperator(SlicingWindowProcessor windowProcessor) { this.windowProcessor = windowProcessor; - this.shiftTimeZone = shiftTimeZone; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -146,18 +141,16 @@ public void open() throws Exception { collector = new TimestampedCollector<>(output); collector.eraseTimestamp(); - windowTimerService = - new WindowTimerServiceImpl<>( - getInternalTimerService( - "window-timers", windowProcessor.createWindowSerializer(), this), - shiftTimeZone); + internalTimerService = + getInternalTimerService( + "window-timers", windowProcessor.createWindowSerializer(), this); windowProcessor.open( new WindowProcessorContext<>( getContainingTask(), getContainingTask().getEnvironment().getMemoryManager(), computeMemorySize(), - windowTimerService, + internalTimerService, getKeyedStateBackend(), collector, getRuntimeContext())); @@ -172,11 +165,11 @@ public void open() throws Exception { metrics.gauge( WATERMARK_LATENCY_METRIC_NAME, () -> { - long watermark = windowTimerService.currentWatermark(); + long watermark = internalTimerService.currentWatermark(); if (watermark < 0) { return 0L; } else { - return windowTimerService.currentProcessingTime() - watermark; + return internalTimerService.currentProcessingTime() - watermark; } }); } @@ -255,7 +248,7 @@ private static final class WindowProcessorContext private final Object operatorOwner; private final MemoryManager memoryManager; private final long memorySize; - private final WindowTimerService timerService; + private final InternalTimerService timerService; private final KeyedStateBackend keyedStateBackend; private final Output collector; private final RuntimeContext runtimeContext; @@ -264,7 +257,7 @@ private WindowProcessorContext( Object operatorOwner, MemoryManager memoryManager, long memorySize, - WindowTimerService timerService, + InternalTimerService timerService, KeyedStateBackend keyedStateBackend, Output collector, RuntimeContext runtimeContext) { @@ -298,7 +291,7 @@ public KeyedStateBackend getKeyedStateBackend() { } @Override - public WindowTimerService getTimerService() { + public InternalTimerService getTimerService() { return timerService; } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java index 94b4d0c95d8ca1..4b034842f1ac9d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java @@ -107,7 +107,7 @@ interface Context { KeyedStateBackend getKeyedStateBackend(); /** Returns the current {@link InternalTimerService}. */ - WindowTimerService getTimerService(); + InternalTimerService getTimerService(); /** Returns the current {@link RuntimeContext}. */ RuntimeContext getRuntimeContext(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java index b5aa1c9e6ade64..6280b121b9ef34 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.operators.window.slicing; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.InternalTimerService; import java.time.ZoneId; @@ -44,24 +43,15 @@ public interface WindowTimerService { /** Returns the current event-time watermark. */ long currentWatermark(); - /** Returns the current {@link InternalTimerService}. */ - InternalTimerService getInternalTimerService(); - /** * Registers a window timer to be fired when processing time passes the window. The window you * pass here will be provided when the timer fires. */ - void registerProcessingTimeWindowTimer(W window, long windowEnd); - - /** Deletes the timer for the given key and window. */ - void deleteProcessingTimeWindowTimer(W window, long windowEnd); + void registerProcessingTimeWindowTimer(W window); /** * Registers a window timer to be fired when event time watermark passes the window. The window * you pass here will be provided when the timer fires. */ - void registerEventTimeWindowTimer(W window, long windowEnd); - - /** Deletes the timer for the given key and window. */ - void deleteEventTimeWindowTimer(W window, long windowEnd); + void registerEventTimeWindowTimer(W window); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerServiceImpl.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerServiceImpl.java index 72cf4b81223298..836ab3df63670e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerServiceImpl.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerServiceImpl.java @@ -24,13 +24,13 @@ import java.time.ZoneId; /** Simple Implements of {@link WindowTimerService}. */ -public class WindowTimerServiceImpl implements WindowTimerService { +public class WindowTimerServiceImpl implements WindowTimerService { - private final InternalTimerService internalTimerService; + private final InternalTimerService internalTimerService; private final ZoneId shiftTimeZone; public WindowTimerServiceImpl( - InternalTimerService internalTimerService, ZoneId shiftTimeZone) { + InternalTimerService internalTimerService, ZoneId shiftTimeZone) { this.internalTimerService = internalTimerService; this.shiftTimeZone = shiftTimeZone; } @@ -51,31 +51,14 @@ public long currentWatermark() { } @Override - public InternalTimerService getInternalTimerService() { - return internalTimerService; - } - - @Override - public void registerProcessingTimeWindowTimer(W window, long windowEnd) { + public void registerProcessingTimeWindowTimer(Long window) { internalTimerService.registerProcessingTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); + window, TimeWindowUtil.toEpochMillsForTimer(window - 1, shiftTimeZone)); } @Override - public void deleteProcessingTimeWindowTimer(W window, long windowEnd) { - internalTimerService.deleteProcessingTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); - } - - @Override - public void registerEventTimeWindowTimer(W window, long windowEnd) { + public void registerEventTimeWindowTimer(Long window) { internalTimerService.registerEventTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); - } - - @Override - public void deleteEventTimeWindowTimer(W window, long windowEnd) { - internalTimerService.deleteEventTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); + window, TimeWindowUtil.toEpochMillsForTimer(window - 1, shiftTimeZone)); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java index 103710afd75c6b..8d2eb2240b37da 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java @@ -95,17 +95,19 @@ public void open(TriggerContext ctx) throws Exception { @Override public boolean onElement(Object element, long timestamp, W window) throws Exception { - if (toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()) - <= ctx.getCurrentWatermark()) { + if (triggerTime(window) <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return true; } else { - ctx.registerEventTimeTimer( - toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); + ctx.registerEventTimeTimer(triggerTime(window)); return false; } } + private long triggerTime(W window) { + return toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()); + } + @Override public boolean onProcessingTime(long time, W window) throws Exception { return false; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java index c3523e7c552bb2..2bded459438a4d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java @@ -67,14 +67,34 @@ public static long toUtcTimestampMills(long epochMills, ZoneId shiftTimeZone) { */ public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTimeZone) { // Long.MAX_VALUE is a flag of max watermark, directly return it - if (UTC_ZONE_ID.equals(shiftTimeZone.equals(shiftTimeZone)) - || Long.MAX_VALUE == utcTimestampMills) { + if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == utcTimestampMills) { return utcTimestampMills; } if (TimeZone.getTimeZone(shiftTimeZone).useDaylightTime()) { /* - * return the larger epoch mills if the time is leaving the DST. + * return the first skipped epoch mills as timer time if the time is coming the DST. + * eg. Los_Angele has no timestamp 2021-03-14 02:00:00 when coming DST. + *
+             *  2021-03-14 00:00:00 -> epoch1 = 1615708800000L;
+             *  2021-03-14 01:00:00 -> epoch2 = 1615712400000L;
+             *  2021-03-14 03:00:00 -> epoch3 = 1615716000000L;  skip one hour (2021-03-14 02:00:00)
+             *  2021-03-14 04:00:00 -> epoch4 = 1615719600000L;
+             *
+             * we should use the epoch3 to register timer for window that end with
+             *  [2021-03-14 02:00:00, 2021-03-14 03:00:00] to ensure the window can be fired
+             *  immediately once the window passed.
+             *
+             * 
+             *  2021-03-14 00:00:00 -> epoch0 = 1615708800000L;
+             *  2021-03-14 01:00:00 -> epoch1 = 1615712400000L;
+             *  2021-03-14 02:00:00 -> epoch3 = 1615716000000L; register 1615716000000L(epoch3)
+             *  2021-03-14 02:59:59 -> epoch3 = 1615719599000L; register 1615716000000L(epoch3)
+             *  2021-03-14 03:00:00 -> epoch3 = 1615716000000L;
+             */
+
+            /*
+             * return the larger epoch mills as timer time if the time is leaving the DST.
              *  eg. Los_Angeles has two timestamp 2021-11-07 01:00:00 when leaving DST.
              * 
              *  2021-11-07 00:00:00 -> epoch0 = 1636268400000L;  2021-11-07 00:00:00
@@ -99,8 +119,13 @@ public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTime
                             .atZone(shiftTimeZone)
                             .toInstant()
                             .toEpochMilli();
+
+            boolean hasNoEpoch = t1 == t2;
             boolean hasTwoEpochs = t2 - t1 > MILLS_PER_HOUR;
-            if (hasTwoEpochs) {
+
+            if (hasNoEpoch) {
+                return t1 - t1 % MILLS_PER_HOUR;
+            } else if (hasTwoEpochs) {
                 return t1 + MILLS_PER_HOUR;
             } else {
                 return t1;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java
index 77e87ffa4deacb..49210bddd692ec 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java
@@ -23,6 +23,11 @@ public void testShiftedTimeZone() {
         assertEquals(-1L, toEpochMillsForTimer(utcMills("1970-01-01T07:59:59.999"), zoneId));
         assertEquals(1000L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:01"), zoneId));
         assertEquals(1L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:00.001"), zoneId));
+
+        assertEquals(-28799000L, toEpochMills(utcMills("1970-01-01T00:00:01"), zoneId));
+        assertEquals(-1L, toEpochMills(utcMills("1970-01-01T07:59:59.999"), zoneId));
+        assertEquals(1000L, toEpochMills(utcMills("1970-01-01T08:00:01"), zoneId));
+        assertEquals(1L, toEpochMills(utcMills("1970-01-01T08:00:00.001"), zoneId));
     }
 
     @Test
@@ -39,29 +44,51 @@ public void testDaylightSaving() {
         assertEquals(1615708800000L, toEpochMillsForTimer(utcMills("2021-03-14T00:00:00"), zoneId));
         assertEquals(1615712400000L, toEpochMillsForTimer(utcMills("2021-03-14T01:00:00"), zoneId));
         assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T02:00:00"), zoneId));
+        assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T02:30:00"), zoneId));
+        assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T02:59:59"), zoneId));
         assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T03:00:00"), zoneId));
-        assertEquals(1615717800000L, toEpochMillsForTimer(utcMills("2021-03-14T02:30:00"), zoneId));
-        assertEquals(1615719599000L, toEpochMillsForTimer(utcMills("2021-03-14T02:59:59"), zoneId));
         assertEquals(1615717800000L, toEpochMillsForTimer(utcMills("2021-03-14T03:30:00"), zoneId));
+        assertEquals(1615719599000L, toEpochMillsForTimer(utcMills("2021-03-14T03:59:59"), zoneId));
+
+        assertEquals(1615708800000L, toEpochMills(utcMills("2021-03-14T00:00:00"), zoneId));
+        assertEquals(1615712400000L, toEpochMills(utcMills("2021-03-14T01:00:00"), zoneId));
+        assertEquals(1615716000000L, toEpochMills(utcMills("2021-03-14T02:00:00"), zoneId));
+        assertEquals(1615717800000L, toEpochMills(utcMills("2021-03-14T02:30:00"), zoneId));
+        assertEquals(1615719599000L, toEpochMills(utcMills("2021-03-14T02:59:59"), zoneId));
+        assertEquals(1615717800000L, toEpochMills(utcMills("2021-03-14T03:30:00"), zoneId));
+        assertEquals(1615716000000L, toEpochMills(utcMills("2021-03-14T03:00:00"), zoneId));
 
         /*
-         * The DaylightTime in Los_Angele start at time 2021-11-07 01:00:00
+         * The DaylightTime in Los_Angele end at time 2021-11-07 01:00:00
          * 
          *  2021-11-07 00:00:00 -> epoch0 = 1636268400000L;  2021-11-07 00:00:00
          *  2021-11-07 01:00:00 -> epoch1 = 1636272000000L;  the first local timestamp 2021-11-07 01:00:00
-         *  2021-11-07 01:00:00 -> epoch2 = 1636275600000L;  back to local timestamp  2021-11-07 01:00:00
+         *  2021-11-07 01:00:00 -> epoch2 = 1636275600000L;  back to local timestamp 2021-11-07 01:00:00
          *  2021-11-07 02:00:00 -> epoch3 = 1636279200000L;  2021-11-07 02:00:00
          */
+
+        assertEquals(utcMills("2021-11-07T01:00:00"), toUtcTimestampMills(1636272000000L, zoneId));
+        assertEquals(utcMills("2021-11-07T01:00:00"), toUtcTimestampMills(1636275600000L, zoneId));
+        assertEquals(utcMills("2021-11-07T01:00:01"), toUtcTimestampMills(1636272001000L, zoneId));
+        assertEquals(utcMills("2021-11-07T01:59:59"), toUtcTimestampMills(1636275599000L, zoneId));
+
         assertEquals(1636268400000L, toEpochMillsForTimer(utcMills("2021-11-07T00:00:00"), zoneId));
         assertEquals(1636275600000L, toEpochMillsForTimer(utcMills("2021-11-07T01:00:00"), zoneId));
         assertEquals(1636279200000L, toEpochMillsForTimer(utcMills("2021-11-07T02:00:00"), zoneId));
         assertEquals(1636268401000L, toEpochMillsForTimer(utcMills("2021-11-07T00:00:01"), zoneId));
         assertEquals(1636279199000L, toEpochMillsForTimer(utcMills("2021-11-07T01:59:59"), zoneId));
         assertEquals(1636279201000L, toEpochMillsForTimer(utcMills("2021-11-07T02:00:01"), zoneId));
+
+        assertEquals(1636268400000L, toEpochMills(utcMills("2021-11-07T00:00:00"), zoneId));
+        assertEquals(1636272000000L, toEpochMills(utcMills("2021-11-07T01:00:00"), zoneId));
+        assertEquals(1636279200000L, toEpochMills(utcMills("2021-11-07T02:00:00"), zoneId));
+        assertEquals(1636268401000L, toEpochMills(utcMills("2021-11-07T00:00:01"), zoneId));
+        assertEquals(1636275599000L, toEpochMills(utcMills("2021-11-07T01:59:59"), zoneId));
+        assertEquals(1636279201000L, toEpochMills(utcMills("2021-11-07T02:00:01"), zoneId));
     }
 
     @Test
-    public void testMaxWaterMark() {
+    public void testMaxWatermark() {
         ZoneId zoneId = ZoneId.of("Asia/Shanghai");
         assertEquals(Long.MAX_VALUE, toUtcTimestampMills(Long.MAX_VALUE, zoneId));
         assertEquals(Long.MAX_VALUE, toEpochMillsForTimer(Long.MAX_VALUE, zoneId));