diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java index bccf357ac8fdfa..193dd6f762f153 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java @@ -61,6 +61,7 @@ import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END; import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; /** The Python Group Window AggregateFunction operator for the blink planner. */ @@ -245,16 +246,16 @@ public void emitResult(Tuple2 resultTuple) throws Exception { if (timerOperandType == REGISTER_EVENT_TIMER) { internalTimerService.registerEventTimeTimer( - window, toUtcTimestampMills(timestamp, shiftTimeZone)); + window, toEpochMillsForTimer(timestamp, shiftTimeZone)); } else if (timerOperandType == REGISTER_PROCESSING_TIMER) { internalTimerService.registerProcessingTimeTimer( - window, toUtcTimestampMills(timestamp, shiftTimeZone)); + window, toEpochMillsForTimer(timestamp, shiftTimeZone)); } else if (timerOperandType == DELETE_EVENT_TIMER) { internalTimerService.deleteEventTimeTimer( - window, toUtcTimestampMills(timestamp, shiftTimeZone)); + window, toEpochMillsForTimer(timestamp, shiftTimeZone)); } else if (timerOperandType == DELETE_PROCESSING_TIMER) { internalTimerService.deleteProcessingTimeTimer( - window, toUtcTimestampMills(timestamp, shiftTimeZone)); + window, toEpochMillsForTimer(timestamp, shiftTimeZone)); } else { throw new RuntimeException( String.format("Unsupported timerOperandType %s.", timerOperandType)); diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java index a7b9b06abb0be8..2471d82ba5dcba 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java @@ -54,7 +54,6 @@ import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction; import org.apache.flink.table.runtime.operators.window.triggers.Trigger; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; -import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; @@ -64,6 +63,7 @@ import java.util.LinkedList; import java.util.List; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; @@ -311,7 +311,8 @@ private void buildWindow(PlannerNamedWindowProperty[] namedProperties) { */ private boolean isWindowLate(W window) { return windowAssigner.isEventTime() - && (cleanupTime(window) <= internalTimerService.currentWatermark()); + && (toEpochMillsForTimer(cleanupTime(window), shiftTimeZone) + <= internalTimerService.currentWatermark()); } /** @@ -322,12 +323,11 @@ private boolean isWindowLate(W window) { * @param window the window whose cleanup time we are computing. */ private long cleanupTime(W window) { - long windowMatTs = toEpochMillsForTimer(window.maxTimestamp(), shiftTimeZone); if (windowAssigner.isEventTime()) { - long cleanupTime = windowMatTs + allowedLateness; - return cleanupTime >= windowMatTs ? cleanupTime : Long.MAX_VALUE; + long cleanupTime = window.maxTimestamp() + allowedLateness; + return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; } else { - return windowMatTs; + return window.maxTimestamp(); } } @@ -386,7 +386,7 @@ private boolean hasRetractData( * @param window the window whose state to discard */ private void registerCleanupTimer(W window) { - long cleanupTime = cleanupTime(window); + long cleanupTime = toEpochMillsForTimer(cleanupTime(window), shiftTimeZone); if (cleanupTime == Long.MAX_VALUE) { // don't set a GC timer for "end of time" return; @@ -425,11 +425,11 @@ private void setWindowProperty(W currentWindow) { } private long getShiftEpochMills(long utcTimestampMills) { - return TimeWindowUtil.toEpochMills(utcTimestampMills, shiftTimeZone); + return toEpochMills(utcTimestampMills, shiftTimeZone); } private void cleanWindowIfNeeded(W window, long currentTime) throws Exception { - if (currentTime == cleanupTime(window)) { + if (currentTime == toEpochMillsForTimer(cleanupTime(window), shiftTimeZone)) { windowAccumulateData.setCurrentNamespace(window); windowAccumulateData.clear(); windowRetractData.setCurrentNamespace(window); @@ -457,7 +457,7 @@ boolean onElement(RowData row, long timestamp) throws Exception { } boolean onProcessingTime(long time) throws Exception { - return trigger.onProcessingTime(toUtcTimestampMills(time, shiftTimeZone), window); + return trigger.onProcessingTime(time, window); } boolean onEventTime(long time) throws Exception { @@ -485,8 +485,7 @@ public MetricGroup getMetricGroup() { @Override public void registerProcessingTimeTimer(long time) { - internalTimerService.registerProcessingTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone)); + internalTimerService.registerProcessingTimeTimer(window, time); } @Override @@ -496,8 +495,7 @@ public void registerEventTimeTimer(long time) { @Override public void deleteProcessingTimeTimer(long time) { - internalTimerService.deleteProcessingTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone)); + internalTimerService.deleteProcessingTimeTimer(window, time); } @Override diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala index 4a9d4e7799dd92..0bb81a78e8790b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala @@ -27,8 +27,10 @@ import org.apache.flink.table.data.{RowData, TimestampData} import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.planner.runtime.utils.TestData -import org.apache.flink.table.runtime.util.{RowDataHarnessAssertor, TimeWindowUtil} +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord +import org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills + import org.apache.flink.types.Row import org.apache.flink.types.RowKind.INSERT @@ -310,7 +312,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI private def localMills(dateTime: String): TimestampData = { val windowDateTime = LocalDateTime.parse(dateTime).atZone(UTC_ZONE_ID) TimestampData.fromEpochMillis( - TimeWindowUtil.toUtcTimestampMills(windowDateTime.toInstant.toEpochMilli, shiftTimeZone)) + toUtcTimestampMills(windowDateTime.toInstant.toEpochMilli, shiftTimeZone)) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateUseDaylightTimeHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateUseDaylightTimeHarnessTest.scala index 830a674099f4c2..ac098d02f196ea 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateUseDaylightTimeHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateUseDaylightTimeHarnessTest.scala @@ -117,7 +117,7 @@ class WindowAggregateUseDaylightTimeHarnessTest(backend: StateBackendMode, timeZ expected.add(record("a", 2L, 5.0D, 1L, ts("2021-03-14T03:00:00"), ts("2021-03-14T05:00:00"))) expected.add(record("a", 3L, 5.0D, 1L, ts("2021-03-14T03:00:00"), ts("2021-03-14T06:00:00"))) - // window [2021-11-07T00:00:00, 2021-11-07T02:00:00] windows contains 3 hours data + // [2021-11-07T00:00:00, 2021-11-07T02:00:00] window contains 3 hours data expected.add(record("a", 1L, 3.0D, 1L, ts("2021-11-07T00:00:00"), ts("2021-11-07T01:00:00"))) expected.add(record("a", 3L, 3.0D, 2L, ts("2021-11-07T00:00:00"), ts("2021-11-07T02:00:00"))) expected.add(record("a", 4L, 3.0D, 2L, ts("2021-11-07T00:00:00"), ts("2021-11-07T03:00:00"))) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java index e8f27908ca6e6e..23969b3c5a5300 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java @@ -31,16 +31,17 @@ import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; import org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner; -import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.ClockService; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; -import org.apache.flink.table.runtime.util.TimeWindowUtil; import java.time.ZoneId; +import static org.apache.flink.table.runtime.operators.window.TimeWindow.getWindowStartWithOffset; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; + /** * The operator used for local window aggregation. * @@ -73,7 +74,7 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator element) throws Exception { @Override public void processWatermark(Watermark mark) throws Exception { - long timestamp = TimeWindowUtil.toUtcTimestampMills(mark.getTimestamp(), shiftTimezone); - if (timestamp > currentWatermark) { - currentWatermark = timestamp; - if (currentWatermark >= nextTriggerWatermark) { - // we only need to call advanceProgress() when currentWatermark may trigger window - windowBuffer.advanceProgress(currentWatermark); - nextTriggerWatermark = getNextTriggerWatermark(currentWatermark, windowInterval); + long timestamp = toUtcTimestampMills(mark.getTimestamp(), shiftTimezone); + if (timestamp > currentShiftWatermark) { + currentShiftWatermark = timestamp; + if (currentShiftWatermark >= nextTriggerWatermark) { + // we only need to call advanceProgress() when current watermark may trigger window + windowBuffer.advanceProgress(currentShiftWatermark); + nextTriggerWatermark = + getNextTriggerWatermark(currentShiftWatermark, windowInterval); } } super.processWatermark(mark); @@ -190,11 +192,11 @@ private long computeMemorySize() { // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - /** Method to get the next watermark to trigger window. */ - private static long getNextTriggerWatermark(long currentWatermark, long interval) { - long start = TimeWindow.getWindowStartWithOffset(currentWatermark, 0L, interval); + /** Method to get the next shifted watermark to trigger window. */ + private static long getNextTriggerWatermark(long currentShiftWatermark, long interval) { + long start = getWindowStartWithOffset(currentShiftWatermark, 0L, interval); long triggerWatermark = start + interval - 1; - if (triggerWatermark > currentWatermark) { + if (triggerWatermark > currentShiftWatermark) { return triggerWatermark; } else { return triggerWatermark + interval; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java index 44bdd2c9b3fc9d..116a570d8fefc3 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java @@ -156,8 +156,7 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { combinerFactory, (SliceSharedAssigner) assigner, accSerializer, - indexOfCountStart, - shiftTimeZone); + indexOfCountStart); } else if (assigner instanceof SliceUnsharedAssigner) { windowProcessor = new SliceUnsharedWindowAggProcessor( @@ -165,12 +164,11 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { bufferFactory, combinerFactory, (SliceUnsharedAssigner) assigner, - accSerializer, - shiftTimeZone); + accSerializer); } else { throw new IllegalArgumentException( "assigner must be instance of SliceUnsharedAssigner or SliceSharedAssigner."); } - return new SlicingWindowOperator<>(windowProcessor); + return new SlicingWindowOperator<>(windowProcessor, shiftTimeZone); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java index d2e29c4fc0c176..1243d6f01209d6 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java @@ -22,24 +22,21 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction; import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.StateKeyContext; import org.apache.flink.table.runtime.operators.window.state.WindowState; import org.apache.flink.table.runtime.operators.window.state.WindowValueState; import org.apache.flink.table.runtime.util.WindowKey; -import org.apache.flink.util.Preconditions; -import java.time.ZoneId; import java.util.Iterator; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; /** * An implementation of {@link WindowCombineFunction} that accumulates input records into the window @@ -48,7 +45,7 @@ public final class AggRecordsCombiner implements WindowCombineFunction { /** The service to register event-time or processing-time timers. */ - private final InternalTimerService timerService; + private final WindowTimerService timerService; /** Context to switch current key for states. */ private final StateKeyContext keyContext; @@ -71,19 +68,15 @@ public final class AggRecordsCombiner implements WindowCombineFunction { /** Whether the operator works in event-time mode, used to indicate registering which timer. */ private final boolean isEventTime; - /** The shifted timezone of the window. */ - private final ZoneId shiftTimezone; - public AggRecordsCombiner( - InternalTimerService timerService, + WindowTimerService timerService, StateKeyContext keyContext, WindowValueState accState, NamespaceAggsHandleFunction aggregator, boolean requiresCopy, TypeSerializer keySerializer, TypeSerializer recordSerializer, - boolean isEventTime, - ZoneId shiftTimezone) { + boolean isEventTime) { this.timerService = timerService; this.keyContext = keyContext; this.accState = accState; @@ -92,7 +85,6 @@ public AggRecordsCombiner( this.keySerializer = keySerializer; this.recordSerializer = recordSerializer; this.isEventTime = isEventTime; - this.shiftTimezone = shiftTimezone; } @Override @@ -137,8 +129,7 @@ record = recordSerializer.copy(record); // step 5: register timer for current window if (isEventTime) { - timerService.registerEventTimeTimer( - window, toEpochMillsForTimer(window - 1, shiftTimezone)); + timerService.registerEventTimeWindowTimer(window, window - 1); } // we don't need register processing-time timer, because we already register them // per-record in AbstractWindowAggProcessor.processElement() @@ -174,13 +165,11 @@ public Factory( @Override public WindowCombineFunction create( RuntimeContext runtimeContext, - InternalTimerService timerService, + WindowTimerService timerService, KeyedStateBackend stateBackend, WindowState windowState, - boolean isEventTime, - ZoneId shiftTimeZone) + boolean isEventTime) throws Exception { - Preconditions.checkNotNull(shiftTimeZone); final NamespaceAggsHandleFunction aggregator = genAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader()); aggregator.open( @@ -196,8 +185,7 @@ public WindowCombineFunction create( requiresCopy, keySerializer, recordSerializer, - isEventTime, - shiftTimeZone); + isEventTime); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java index 9801543a55901a..33a1f24deb28a3 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java @@ -22,23 +22,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction; import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.StateKeyContext; import org.apache.flink.table.runtime.operators.window.state.WindowState; import org.apache.flink.table.runtime.operators.window.state.WindowValueState; import org.apache.flink.table.runtime.util.WindowKey; -import java.time.ZoneId; import java.util.Iterator; import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; -import static org.apache.flink.util.Preconditions.checkNotNull; /** * An implementation of {@link WindowCombineFunction} that accumulates local accumulators records @@ -49,7 +46,7 @@ public final class GlobalAggAccCombiner implements WindowCombineFunction { /** The service to register event-time or processing-time timers. */ - private final InternalTimerService timerService; + private final WindowTimerService timerService; /** Context to switch current key for states. */ private final StateKeyContext keyContext; @@ -69,18 +66,14 @@ public final class GlobalAggAccCombiner implements WindowCombineFunction { /** Serializer to copy key if required. */ private final TypeSerializer keySerializer; - /** The shifted timezone of the window. */ - private final ZoneId shiftTimeZone; - public GlobalAggAccCombiner( - InternalTimerService timerService, + WindowTimerService timerService, StateKeyContext keyContext, WindowValueState accState, NamespaceAggsHandleFunction localAggregator, NamespaceAggsHandleFunction globalAggregator, boolean requiresCopy, - TypeSerializer keySerializer, - ZoneId shiftTimeZone) { + TypeSerializer keySerializer) { this.timerService = timerService; this.keyContext = keyContext; this.accState = accState; @@ -88,7 +81,6 @@ public GlobalAggAccCombiner( this.globalAggregator = globalAggregator; this.requiresCopy = requiresCopy; this.keySerializer = keySerializer; - this.shiftTimeZone = shiftTimeZone; } @Override @@ -124,8 +116,7 @@ public void combine(WindowKey windowKey, Iterator localAccs) throws Exc accState.update(window, stateAcc); // step 3: register timer for current window - timerService.registerEventTimeTimer( - window, toEpochMillsForTimer(window - 1, shiftTimeZone)); + timerService.registerEventTimeWindowTimer(window, window - 1); } @Override @@ -162,13 +153,11 @@ public Factory( @Override public WindowCombineFunction create( RuntimeContext runtimeContext, - InternalTimerService timerService, + WindowTimerService timerService, KeyedStateBackend stateBackend, WindowState windowState, - boolean isEventTime, - ZoneId shiftTimeZone) + boolean isEventTime) throws Exception { - checkNotNull(shiftTimeZone); final NamespaceAggsHandleFunction localAggregator = genLocalAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader()); final NamespaceAggsHandleFunction globalAggregator = @@ -188,8 +177,7 @@ public WindowCombineFunction create( localAggregator, globalAggregator, requiresCopy, - keySerializer, - shiftTimeZone); + keySerializer); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java index 9df05cb197949e..e04c171782deb2 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.internal.InternalValueState; -import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.JoinedRowData; import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; @@ -34,11 +33,9 @@ import org.apache.flink.table.runtime.operators.window.slicing.ClockService; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.WindowValueState; -import java.time.ZoneId; - -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; /** A base implementation of {@link SlicingWindowProcessor} for window aggregate. */ @@ -52,13 +49,6 @@ public abstract class AbstractWindowAggProcessor implements SlicingWindowProcess protected final TypeSerializer accSerializer; protected final boolean isEventTime; - /** - * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift - * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC - * which means never shift when assigning windows. - */ - protected final ZoneId shiftTimeZone; - // ---------------------------------------------------------------------------------------- protected transient long currentProgress; @@ -67,7 +57,7 @@ public abstract class AbstractWindowAggProcessor implements SlicingWindowProcess protected transient ClockService clockService; - protected transient InternalTimerService timerService; + protected transient WindowTimerService timerService; protected transient NamespaceAggsHandleFunction aggregator; @@ -83,15 +73,13 @@ public AbstractWindowAggProcessor( WindowBuffer.Factory bufferFactory, WindowCombineFunction.Factory combinerFactory, SliceAssigner sliceAssigner, - TypeSerializer accSerializer, - ZoneId shiftTimeZone) { + TypeSerializer accSerializer) { this.genAggsHandler = genAggsHandler; this.windowBufferFactory = bufferFactory; this.combineFactory = combinerFactory; this.sliceAssigner = sliceAssigner; this.accSerializer = accSerializer; this.isEventTime = sliceAssigner.isEventTime(); - this.shiftTimeZone = shiftTimeZone; } @Override @@ -105,7 +93,7 @@ public void open(Context context) throws Exception { new ValueStateDescriptor<>("window-aggs", accSerializer)); this.windowState = new WindowValueState<>((InternalValueState) state); - this.clockService = ClockService.of(ctx.getTimerService()); + this.clockService = ClockService.of(ctx.getTimerService().getInternalTimerService()); this.timerService = ctx.getTimerService(); this.aggregator = genAggsHandler.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader()); @@ -118,8 +106,7 @@ public void open(Context context) throws Exception { ctx.getTimerService(), ctx.getKeyedStateBackend(), windowState, - isEventTime, - shiftTimeZone); + isEventTime); this.windowBuffer = windowBufferFactory.create( ctx.getOperatorOwner(), @@ -136,8 +123,7 @@ public boolean processElement(RowData key, RowData element) throws Exception { long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService); if (!isEventTime) { // always register processing time for every element when processing time mode - timerService.registerProcessingTimeTimer( - sliceEnd, toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone)); + timerService.registerProcessingTimeWindowTimer(sliceEnd, sliceEnd - 1); } if (isEventTime && sliceEnd - 1 <= currentProgress) { // element is late and should be dropped @@ -149,7 +135,7 @@ public boolean processElement(RowData key, RowData element) throws Exception { @Override public void advanceProgress(long progress) throws Exception { - long timestamp = toUtcTimestampMills(progress, shiftTimeZone); + long timestamp = toUtcTimestampMills(progress, timerService.getShiftTimeZone()); if (timestamp > currentProgress) { currentProgress = timestamp; windowBuffer.advanceProgress(currentProgress); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java index e600b9cac59cb7..cbf073d653244f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java @@ -26,12 +26,10 @@ import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners; import org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner; -import org.apache.flink.table.runtime.util.TimeWindowUtil; import javax.annotation.Nullable; import java.io.Serializable; -import java.time.ZoneId; import java.util.Optional; import java.util.function.Supplier; @@ -54,15 +52,8 @@ public SliceSharedWindowAggProcessor( WindowCombineFunction.Factory combinerFactory, SliceSharedAssigner sliceAssigner, TypeSerializer accSerializer, - int indexOfCountStar, - ZoneId shiftTimeZone) { - super( - genAggsHandler, - bufferFactory, - combinerFactory, - sliceAssigner, - accSerializer, - shiftTimeZone); + int indexOfCountStar) { + super(genAggsHandler, bufferFactory, combinerFactory, sliceAssigner, accSerializer); this.sliceSharedAssigner = sliceAssigner; this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner); } @@ -85,13 +76,9 @@ public void fireWindow(Long windowEnd) throws Exception { if (nextWindowEndOptional.isPresent()) { long nextWindowEnd = nextWindowEndOptional.get(); if (sliceSharedAssigner.isEventTime()) { - timerService.registerEventTimeTimer( - nextWindowEnd, - TimeWindowUtil.toEpochMillsForTimer(nextWindowEnd - 1, shiftTimeZone)); + timerService.registerEventTimeWindowTimer(nextWindowEnd, nextWindowEnd - 1); } else { - timerService.registerProcessingTimeTimer( - nextWindowEnd, - TimeWindowUtil.toEpochMillsForTimer(nextWindowEnd - 1, shiftTimeZone)); + timerService.registerProcessingTimeWindowTimer(nextWindowEnd, nextWindowEnd - 1); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java index 32d0f43b585f44..69bc2e631daae4 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java @@ -25,8 +25,6 @@ import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.SliceUnsharedAssigner; -import java.time.ZoneId; - /** * An window aggregate processor implementation which works for {@link SliceUnsharedAssigner}, e.g. * tumbling windows. @@ -39,15 +37,8 @@ public SliceUnsharedWindowAggProcessor( WindowBuffer.Factory windowBufferFactory, WindowCombineFunction.Factory combineFactory, SliceUnsharedAssigner sliceAssigner, - TypeSerializer accSerializer, - ZoneId shiftTimeZone) { - super( - genAggsHandler, - windowBufferFactory, - combineFactory, - sliceAssigner, - accSerializer, - shiftTimeZone); + TypeSerializer accSerializer) { + super(genAggsHandler, windowBufferFactory, combineFactory, sliceAssigner, accSerializer); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java index da89489eac50e0..1374852d5f77d6 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java @@ -155,8 +155,7 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) { rankStart, rankEnd, outputRankNumber, - windowEndIndex, - shiftTimeZone); - return new SlicingWindowOperator<>(windowProcessor); + windowEndIndex); + return new SlicingWindowOperator<>(windowProcessor, shiftTimeZone); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java index a5442d3c7b4450..79884dad15df18 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java @@ -22,18 +22,17 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.rank.TopNBuffer; import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.StateKeyContext; import org.apache.flink.table.runtime.operators.window.state.WindowMapState; import org.apache.flink.table.runtime.operators.window.state.WindowState; import org.apache.flink.table.runtime.util.WindowKey; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -43,7 +42,6 @@ import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; /** * An implementation of {@link WindowCombineFunction} that save topN records of incremental input @@ -52,7 +50,7 @@ public final class TopNRecordsCombiner implements WindowCombineFunction { /** The service to register event-time or processing-time timers. */ - private final InternalTimerService timerService; + private final WindowTimerService timerService; /** Context to switch current key for states. */ private final StateKeyContext keyContext; @@ -81,11 +79,8 @@ public final class TopNRecordsCombiner implements WindowCombineFunction { /** Whether the operator works in event-time mode, used to indicate registering which timer. */ private final boolean isEventTime; - /** The shifted timezone of the window. */ - private final ZoneId shiftTimeZone; - public TopNRecordsCombiner( - InternalTimerService timerService, + WindowTimerService timerService, StateKeyContext keyContext, WindowMapState> dataState, Comparator sortKeyComparator, @@ -94,8 +89,7 @@ public TopNRecordsCombiner( boolean requiresCopyKey, TypeSerializer keySerializer, TypeSerializer recordSerializer, - boolean isEventTime, - ZoneId shiftTimeZone) { + boolean isEventTime) { this.timerService = timerService; this.keyContext = keyContext; this.dataState = dataState; @@ -106,7 +100,6 @@ public TopNRecordsCombiner( this.keySerializer = keySerializer; this.recordSerializer = recordSerializer; this.isEventTime = isEventTime; - this.shiftTimeZone = shiftTimeZone; } @Override @@ -151,8 +144,7 @@ public void combine(WindowKey windowKey, Iterator records) throws Excep } // step 3: register timer for current window if (isEventTime) { - timerService.registerEventTimeTimer( - window, toEpochMillsForTimer(window - 1, shiftTimeZone)); + timerService.registerEventTimeWindowTimer(window, window - 1); } // we don't need register processing-time timer, because we already register them // per-record in AbstractWindowAggProcessor.processElement() @@ -193,11 +185,10 @@ public Factory( @Override public WindowCombineFunction create( RuntimeContext runtimeContext, - InternalTimerService timerService, + WindowTimerService timerService, KeyedStateBackend stateBackend, WindowState windowState, - boolean isEventTime, - ZoneId shiftTimeZone) + boolean isEventTime) throws Exception { final Comparator sortKeyComparator = generatedSortKeyComparator.newInstance(runtimeContext.getUserCodeClassLoader()); @@ -214,8 +205,7 @@ public WindowCombineFunction create( requiresCopyKey, keySerializer, recordSerializer, - isEventTime, - shiftTimeZone); + isEventTime); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java index 8521883f405002..790b6fcfc541e8 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java @@ -32,10 +32,10 @@ import org.apache.flink.table.runtime.operators.rank.TopNBuffer; import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.WindowMapState; import org.apache.flink.types.RowKind; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -43,6 +43,8 @@ import java.util.List; import java.util.Map; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; + /** An window rank processor. */ public final class WindowRankProcessor implements SlicingWindowProcessor { private static final long serialVersionUID = 1L; @@ -61,8 +63,6 @@ public final class WindowRankProcessor implements SlicingWindowProcessor { private final long rankEnd; private final boolean outputRankNumber; private final int windowEndIndex; - /** The shifted timezone of the window. */ - private final ZoneId shiftTimeZone; // ---------------------------------------------------------------------------------------- @@ -70,6 +70,8 @@ public final class WindowRankProcessor implements SlicingWindowProcessor { private transient Context ctx; + private transient WindowTimerService timerService; + private transient WindowBuffer windowBuffer; /** state schema: [key, window_end, sort key, records]. */ @@ -87,8 +89,7 @@ public WindowRankProcessor( long rankStart, long rankEnd, boolean outputRankNumber, - int windowEndIndex, - ZoneId shiftTimeZone) { + int windowEndIndex) { this.inputSerializer = inputSerializer; this.generatedSortKeyComparator = genSortKeyComparator; this.sortKeySerializer = sortKeySerializer; @@ -98,7 +99,6 @@ public WindowRankProcessor( this.rankEnd = rankEnd; this.outputRankNumber = outputRankNumber; this.windowEndIndex = windowEndIndex; - this.shiftTimeZone = shiftTimeZone; } @Override @@ -117,6 +117,8 @@ public void open(Context context) throws Exception { MapState> state = ctx.getKeyedStateBackend() .getOrCreateKeyedState(namespaceSerializer, mapStateDescriptor); + + this.timerService = ctx.getTimerService(); this.windowState = new WindowMapState<>( (InternalMapState>) state); @@ -126,8 +128,7 @@ public void open(Context context) throws Exception { ctx.getTimerService(), ctx.getKeyedStateBackend(), windowState, - true, - shiftTimeZone); + true); this.windowBuffer = bufferFactory.create( ctx.getOperatorOwner(), @@ -153,8 +154,9 @@ public boolean processElement(RowData key, RowData element) throws Exception { @Override public void advanceProgress(long progress) throws Exception { - if (progress > currentProgress) { - currentProgress = progress; + long timestamp = toUtcTimestampMills(progress, timerService.getShiftTimeZone()); + if (timestamp > currentProgress) { + currentProgress = timestamp; windowBuffer.advanceProgress(currentProgress); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java index a5bab409bc2c5a..77344b791de26e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java @@ -62,6 +62,7 @@ import java.util.Collection; import static java.util.Objects.requireNonNull; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -432,7 +433,7 @@ public void onProcessingTime(InternalTimer timer) throws Exception { * @param window the window whose state to discard */ private void registerCleanupTimer(W window) { - long cleanupTime = cleanupTime(window); + long cleanupTime = toEpochMillsForTimer(cleanupTime(window), shiftTimeZone); if (cleanupTime == Long.MAX_VALUE) { // don't set a GC timer for "end of time" return; @@ -533,7 +534,7 @@ public void clearTrigger(W window) throws Exception { @Override public void deleteCleanupTimer(W window) throws Exception { - long cleanupTime = cleanupTime(window); + long cleanupTime = toEpochMillsForTimer(cleanupTime(window), shiftTimeZone); if (cleanupTime == Long.MAX_VALUE) { // no need to clean up because we didn't set one return; @@ -572,13 +573,11 @@ boolean onElement(RowData row, long timestamp) throws Exception { } boolean onProcessingTime(long time) throws Exception { - return trigger.onProcessingTime( - TimeWindowUtil.toUtcTimestampMills(time, shiftTimeZone), window); + return trigger.onProcessingTime(time, window); } boolean onEventTime(long time) throws Exception { - return trigger.onEventTime( - TimeWindowUtil.toUtcTimestampMills(time, shiftTimeZone), window); + return trigger.onEventTime(time, window); } void onMerge() throws Exception { @@ -602,26 +601,22 @@ public MetricGroup getMetricGroup() { @Override public void registerProcessingTimeTimer(long time) { - internalTimerService.registerProcessingTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone)); + internalTimerService.registerProcessingTimeTimer(window, time); } @Override public void registerEventTimeTimer(long time) { - internalTimerService.registerEventTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone)); + internalTimerService.registerEventTimeTimer(window, time); } @Override public void deleteProcessingTimeTimer(long time) { - internalTimerService.deleteProcessingTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone)); + internalTimerService.deleteProcessingTimeTimer(window, time); } @Override public void deleteEventTimeTimer(long time) { - internalTimerService.deleteEventTimeTimer( - window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone)); + internalTimerService.deleteEventTimeTimer(window, time); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java index bd569a75b1bad9..c12a2476f9e0b1 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java @@ -21,14 +21,13 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.WindowState; import org.apache.flink.table.runtime.util.WindowKey; import org.apache.flink.util.Collector; import java.io.Serializable; -import java.time.ZoneId; import java.util.Iterator; /** The {@link WindowCombineFunction} is used to combine buffered data into state. */ @@ -65,15 +64,13 @@ interface Factory extends Serializable { * @param windowState the window state to flush buffered data into. * @param isEventTime indicates whether the operator works in event-time or processing-time * mode, used for register corresponding timers. - * @param shiftTimeZone indicates the timer of window should shift or not. */ WindowCombineFunction create( RuntimeContext runtimeContext, - InternalTimerService timerService, + WindowTimerService timerService, KeyedStateBackend stateBackend, WindowState windowState, - boolean isEventTime, - ZoneId shiftTimeZone) + boolean isEventTime) throws Exception; } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java index e5cfc8fdb7e735..084e4a55e84003 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/internal/InternalWindowProcessFunction.java @@ -117,7 +117,9 @@ protected final boolean isCleanupTime(W window, long time) { * the given window. */ protected boolean isWindowLate(W window) { - return (windowAssigner.isEventTime() && (cleanupTime(window) <= ctx.currentWatermark())); + return (windowAssigner.isEventTime() + && (toEpochMillsForTimer(cleanupTime(window), ctx.getShiftTimeZone()) + <= ctx.currentWatermark())); } /** @@ -128,12 +130,11 @@ protected boolean isWindowLate(W window) { * @param window the window whose cleanup time we are computing. */ private long cleanupTime(W window) { - long windowMaxTs = toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()); if (windowAssigner.isEventTime()) { - long cleanupTime = windowMaxTs + allowedLateness; - return cleanupTime >= windowMaxTs ? cleanupTime : Long.MAX_VALUE; + long cleanupTime = window.maxTimestamp() + allowedLateness; + return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; } else { - return windowMaxTs; + return window.maxTimestamp(); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java index 0b8ce56145c4bd..40b99fb617d6ff 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.window.TimeWindow; -import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.util.IterableIterator; import org.apache.flink.util.MathUtils; @@ -36,6 +35,7 @@ import java.util.Optional; import java.util.function.Supplier; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; import static org.apache.flink.util.Preconditions.checkArgument; /** Utilities to create {@link SliceAssigner}s. */ @@ -515,14 +515,10 @@ protected AbstractSliceAssigner(int rowtimeIndex, ZoneId shiftTimeZone) { public final long assignSliceEnd(RowData element, ClockService clock) { final long timestamp; if (rowtimeIndex >= 0) { - timestamp = - TimeWindowUtil.toUtcTimestampMills( - element.getLong(rowtimeIndex), shiftTimeZone); + timestamp = toUtcTimestampMills(element.getLong(rowtimeIndex), shiftTimeZone); } else { // in processing time mode - timestamp = - TimeWindowUtil.toUtcTimestampMills( - clock.currentProcessingTime(), shiftTimeZone); + timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone); } return assignSliceEnd(timestamp); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java index 516f09b199c555..ada96f7a8a74be 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.InternalTimer; -import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -41,6 +40,8 @@ import org.apache.flink.table.runtime.operators.TableStreamOperator; import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor; +import java.time.ZoneId; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -105,6 +106,9 @@ public final class SlicingWindowOperator extends TableStreamOperator windowProcessor; + /** The shift timezone of the window. */ + private final ZoneId shiftTimeZone; + // ------------------------------------------------------------------------ /** This is used for emitting elements with a given timestamp. */ @@ -114,7 +118,7 @@ public final class SlicingWindowOperator extends TableStreamOperator internalTimerService; + private transient WindowTimerService windowTimerService; /** The tracked processing time triggered last time. */ private transient long lastTriggeredProcessingTime; @@ -127,8 +131,9 @@ public final class SlicingWindowOperator extends TableStreamOperator watermarkLatency; - public SlicingWindowOperator(SlicingWindowProcessor windowProcessor) { + public SlicingWindowOperator(SlicingWindowProcessor windowProcessor, ZoneId shiftTimeZone) { this.windowProcessor = windowProcessor; + this.shiftTimeZone = shiftTimeZone; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -141,16 +146,18 @@ public void open() throws Exception { collector = new TimestampedCollector<>(output); collector.eraseTimestamp(); - internalTimerService = - getInternalTimerService( - "window-timers", windowProcessor.createWindowSerializer(), this); + windowTimerService = + new WindowTimerServiceImpl<>( + getInternalTimerService( + "window-timers", windowProcessor.createWindowSerializer(), this), + shiftTimeZone); windowProcessor.open( new WindowProcessorContext<>( getContainingTask(), getContainingTask().getEnvironment().getMemoryManager(), computeMemorySize(), - internalTimerService, + windowTimerService, getKeyedStateBackend(), collector, getRuntimeContext())); @@ -165,11 +172,11 @@ public void open() throws Exception { metrics.gauge( WATERMARK_LATENCY_METRIC_NAME, () -> { - long watermark = internalTimerService.currentWatermark(); + long watermark = windowTimerService.currentWatermark(); if (watermark < 0) { return 0L; } else { - return internalTimerService.currentProcessingTime() - watermark; + return windowTimerService.currentProcessingTime() - watermark; } }); } @@ -248,7 +255,7 @@ private static final class WindowProcessorContext private final Object operatorOwner; private final MemoryManager memoryManager; private final long memorySize; - private final InternalTimerService timerService; + private final WindowTimerService timerService; private final KeyedStateBackend keyedStateBackend; private final Output collector; private final RuntimeContext runtimeContext; @@ -257,7 +264,7 @@ private WindowProcessorContext( Object operatorOwner, MemoryManager memoryManager, long memorySize, - InternalTimerService timerService, + WindowTimerService timerService, KeyedStateBackend keyedStateBackend, Output collector, RuntimeContext runtimeContext) { @@ -291,7 +298,7 @@ public KeyedStateBackend getKeyedStateBackend() { } @Override - public InternalTimerService getTimerService() { + public WindowTimerService getTimerService() { return timerService; } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java index 4b034842f1ac9d..94b4d0c95d8ca1 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowProcessor.java @@ -107,7 +107,7 @@ interface Context { KeyedStateBackend getKeyedStateBackend(); /** Returns the current {@link InternalTimerService}. */ - InternalTimerService getTimerService(); + WindowTimerService getTimerService(); /** Returns the current {@link RuntimeContext}. */ RuntimeContext getRuntimeContext(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java new file mode 100644 index 00000000000000..b5aa1c9e6ade64 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.slicing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.InternalTimerService; + +import java.time.ZoneId; + +/** + * Interface for working with window time and timers which considers timezone for window splitting. + * + * @param Type of the window namespace to which timers are scoped. + */ +@Internal +public interface WindowTimerService { + + /** + * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift + * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC + * which means never shift when assigning windows. + */ + ZoneId getShiftTimeZone(); + + /** Returns the current processing time. */ + long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + long currentWatermark(); + + /** Returns the current {@link InternalTimerService}. */ + InternalTimerService getInternalTimerService(); + + /** + * Registers a window timer to be fired when processing time passes the window. The window you + * pass here will be provided when the timer fires. + */ + void registerProcessingTimeWindowTimer(W window, long windowEnd); + + /** Deletes the timer for the given key and window. */ + void deleteProcessingTimeWindowTimer(W window, long windowEnd); + + /** + * Registers a window timer to be fired when event time watermark passes the window. The window + * you pass here will be provided when the timer fires. + */ + void registerEventTimeWindowTimer(W window, long windowEnd); + + /** Deletes the timer for the given key and window. */ + void deleteEventTimeWindowTimer(W window, long windowEnd); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerServiceImpl.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerServiceImpl.java new file mode 100644 index 00000000000000..72cf4b81223298 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerServiceImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.slicing; + +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.table.runtime.util.TimeWindowUtil; + +import java.time.ZoneId; + +/** Simple Implements of {@link WindowTimerService}. */ +public class WindowTimerServiceImpl implements WindowTimerService { + + private final InternalTimerService internalTimerService; + private final ZoneId shiftTimeZone; + + public WindowTimerServiceImpl( + InternalTimerService internalTimerService, ZoneId shiftTimeZone) { + this.internalTimerService = internalTimerService; + this.shiftTimeZone = shiftTimeZone; + } + + @Override + public ZoneId getShiftTimeZone() { + return shiftTimeZone; + } + + @Override + public long currentProcessingTime() { + return internalTimerService.currentProcessingTime(); + } + + @Override + public long currentWatermark() { + return internalTimerService.currentWatermark(); + } + + @Override + public InternalTimerService getInternalTimerService() { + return internalTimerService; + } + + @Override + public void registerProcessingTimeWindowTimer(W window, long windowEnd) { + internalTimerService.registerProcessingTimeTimer( + window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); + } + + @Override + public void deleteProcessingTimeWindowTimer(W window, long windowEnd) { + internalTimerService.deleteProcessingTimeTimer( + window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); + } + + @Override + public void registerEventTimeWindowTimer(W window, long windowEnd) { + internalTimerService.registerEventTimeTimer( + window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); + } + + @Override + public void deleteEventTimeWindowTimer(W window, long windowEnd) { + internalTimerService.deleteEventTimeTimer( + window, TimeWindowUtil.toEpochMillsForTimer(windowEnd, shiftTimeZone)); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java index f44139c3b83b63..103710afd75c6b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java @@ -100,7 +100,8 @@ public boolean onElement(Object element, long timestamp, W window) throws Except // if the watermark is already past the window fire immediately return true; } else { - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); return false; } } @@ -112,12 +113,13 @@ public boolean onProcessingTime(long time, W window) throws Exception { @Override public boolean onEventTime(long time, W window) throws Exception { - return time == window.maxTimestamp(); + return time == toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()); } @Override public void clear(W window) throws Exception { - ctx.deleteEventTimeTimer(window.maxTimestamp()); + ctx.deleteEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); } @Override @@ -127,7 +129,8 @@ public boolean canMerge() { @Override public void onMerge(W window, OnMergeContext mergeContext) throws Exception { - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); } @Override @@ -186,7 +189,8 @@ public boolean onElement(Object element, long timestamp, W window) throws Except return true; } else { // we are in the early phase - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); return earlyTrigger != null && earlyTrigger.onElement(element, timestamp, window); } @@ -213,7 +217,7 @@ public boolean onEventTime(long time, W window) throws Exception { // late fire return lateTrigger != null && lateTrigger.onEventTime(time, window); } else { - if (time == window.maxTimestamp()) { + if (time == toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())) { // fire on time and update state hasFiredState.update(true); return true; @@ -242,7 +246,8 @@ public void onMerge(W window, OnMergeContext mergeContext) throws Exception { // we assume that the new merged window has not fired yet its on-time timer. ctx.getPartitionedState(hasFiredOnTimeStateDesc).update(false); - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); } @Override @@ -253,7 +258,8 @@ public void clear(W window) throws Exception { if (lateTrigger != null) { lateTrigger.clear(window); } - ctx.deleteEventTimeTimer(window.maxTimestamp()); + ctx.deleteEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); ctx.getPartitionedState(hasFiredOnTimeStateDesc).clear(); } @@ -312,7 +318,8 @@ public boolean onElement(Object element, long timestamp, W window) throws Except return true; } else { // this is an early element so register the timer and let the early trigger decide - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); return earlyTrigger.onElement(element, timestamp, window); } } @@ -324,7 +331,8 @@ public boolean onProcessingTime(long time, W window) throws Exception { @Override public boolean onEventTime(long time, W window) throws Exception { - return time == window.maxTimestamp() || earlyTrigger.onEventTime(time, window); + return time == toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()) + || earlyTrigger.onEventTime(time, window); } @Override @@ -334,13 +342,15 @@ public boolean canMerge() { @Override public void onMerge(W window, OnMergeContext mergeContext) throws Exception { - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); earlyTrigger.onMerge(window, mergeContext); } @Override public void clear(W window) throws Exception { - ctx.deleteEventTimeTimer(window.maxTimestamp()); + ctx.deleteEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); earlyTrigger.clear(window); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java index b09d46307ab0e3..29ce8b21381623 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java @@ -26,7 +26,7 @@ import java.time.Duration; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -100,11 +100,9 @@ public void open(TriggerContext ctx) throws Exception { public boolean onElement(Object element, long timestamp, W window) throws Exception { ReducingState nextFiring = ctx.getPartitionedState(nextFiringStateDesc); if (nextFiring.get() == null) { - long nextShiftedTimer = - toUtcTimestampMills(ctx.getCurrentProcessingTime(), ctx.getShiftTimeZone()) - + interval; - ctx.registerProcessingTimeTimer(nextShiftedTimer); - nextFiring.add(nextShiftedTimer); + long nextTimer = ctx.getCurrentProcessingTime() + interval; + ctx.registerProcessingTimeTimer(nextTimer); + nextFiring.add(nextTimer); } return false; } @@ -195,13 +193,14 @@ public void open(TriggerContext ctx) throws Exception { @Override public boolean onElement(Object element, long timestamp, W window) throws Exception { - ctx.registerProcessingTimeTimer(window.maxTimestamp()); + ctx.registerProcessingTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); return false; } @Override public boolean onProcessingTime(long time, W window) throws Exception { - return time == window.maxTimestamp(); + return time == toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()); } @Override @@ -211,7 +210,8 @@ public boolean onEventTime(long time, W window) throws Exception { @Override public void clear(W window) throws Exception { - ctx.deleteProcessingTimeTimer(window.maxTimestamp()); + ctx.deleteProcessingTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); } @Override @@ -221,7 +221,8 @@ public boolean canMerge() { @Override public void onMerge(W window, OnMergeContext mergeContext) throws Exception { - ctx.registerProcessingTimeTimer(window.maxTimestamp()); + ctx.registerProcessingTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); } @Override @@ -252,13 +253,15 @@ public void open(TriggerContext ctx) throws Exception { @Override public boolean onElement(Object element, long timestamp, W window) throws Exception { - ctx.registerProcessingTimeTimer(window.maxTimestamp()); + ctx.registerProcessingTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); return earlyTrigger.onElement(element, timestamp, window); } @Override public boolean onProcessingTime(long time, W window) throws Exception { - return time == window.maxTimestamp() || earlyTrigger.onProcessingTime(time, window); + return time == toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()) + || earlyTrigger.onProcessingTime(time, window); } @Override @@ -273,13 +276,15 @@ public boolean canMerge() { @Override public void onMerge(W window, OnMergeContext mergeContext) throws Exception { - ctx.registerProcessingTimeTimer(window.maxTimestamp()); + ctx.registerProcessingTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); earlyTrigger.onMerge(window, mergeContext); } @Override public void clear(W window) throws Exception { - ctx.deleteProcessingTimeTimer(window.maxTimestamp()); + ctx.deleteProcessingTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); earlyTrigger.clear(window); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java index 7e8d46c96c8035..c3523e7c552bb2 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java @@ -77,18 +77,18 @@ public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTime * return the larger epoch mills if the time is leaving the DST. * eg. Los_Angeles has two timestamp 2021-11-07 01:00:00 when leaving DST. *
-             *  2021-11-07 00:00:00 -> epoch0  = 1636268400000L;  2021-11-07 00:00:00
-             *  2021-11-07 01:00:00 -> epoch1  = 1636272000000L;  the first local timestamp 2021-11-07 01:00:00
-             *  2021-11-07 01:00:00 -> epoch2  = 1636275600000L;  back to local timestamp  2021-11-07 01:00:00
-             *  2021-11-07 02:00:00 -> epoch3  = 1636279200000L;  2021-11-07 02:00:00
+             *  2021-11-07 00:00:00 -> epoch0 = 1636268400000L;  2021-11-07 00:00:00
+             *  2021-11-07 01:00:00 -> epoch1 = 1636272000000L;  the first local timestamp 2021-11-07 01:00:00
+             *  2021-11-07 01:00:00 -> epoch2 = 1636275600000L;  back to local timestamp  2021-11-07 01:00:00
+             *  2021-11-07 02:00:00 -> epoch3 = 1636279200000L;  2021-11-07 02:00:00
              *
              * we should use the epoch1 + 1 hour to register timer to ensure the two hours' data can
              * be fired properly.
              *
              * 
-             *  2021-11-07 00:00:00 => long epoch0 = 1636268400000L;
-             *  2021-11-07 01:00:00 => long epoch1 = 1636272000000L; register 1636275600000L(epoch2)
-             *  2021-11-07 02:00:00 => long epoch3 = 1636279200000L;
+             *  2021-11-07 00:00:00 -> epoch0 = 1636268400000L;
+             *  2021-11-07 01:00:00 -> epoch1 = 1636272000000L; register 1636275600000L(epoch2)
+             *  2021-11-07 02:00:00 -> epoch3 = 1636279200000L;
              */
             LocalDateTime utcTimestamp =
                     LocalDateTime.ofInstant(Instant.ofEpochMilli(utcTimestampMills), UTC_ZONE_ID);
@@ -120,7 +120,8 @@ public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTime
      * @return the epoch mills.
      */
     public static long toEpochMills(long utcTimestampMills, ZoneId shiftTimeZone) {
-        if (UTC_ZONE_ID.equals(shiftTimeZone)) {
+        // Long.MAX_VALUE is a flag of max watermark, directly return it
+        if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == utcTimestampMills) {
             return utcTimestampMills;
         }
         LocalDateTime utcTimestamp =
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java
new file mode 100644
index 00000000000000..77e87ffa4deacb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java
@@ -0,0 +1,74 @@
+package org.apache.flink.table.runtime.util;
+
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link org.apache.flink.table.runtime.util.TimeWindowUtil}. */
+public class TimeWindowUtilTest {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    @Test
+    public void testShiftedTimeZone() {
+        ZoneId zoneId = ZoneId.of("Asia/Shanghai");
+        assertEquals(-28799000L, toEpochMillsForTimer(utcMills("1970-01-01T00:00:01"), zoneId));
+        assertEquals(-1L, toEpochMillsForTimer(utcMills("1970-01-01T07:59:59.999"), zoneId));
+        assertEquals(1000L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:01"), zoneId));
+        assertEquals(1L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:00.001"), zoneId));
+    }
+
+    @Test
+    public void testDaylightSaving() {
+        ZoneId zoneId = ZoneId.of("America/Los_Angeles");
+        /*
+         * The DaylightTime in Los_Angele start at time 2021-03-14 02:00:00
+         * 
+         *  2021-03-14 00:00:00 -> epoch1 = 1615708800000L;
+         *  2021-03-14 01:00:00 -> epoch2 = 1615712400000L;
+         *  2021-03-14 03:00:00 -> epoch3 = 1615716000000L;  skip one hour (2021-03-14 02:00:00)
+         *  2021-03-14 04:00:00 -> epoch4 = 1615719600000L;
+         */
+        assertEquals(1615708800000L, toEpochMillsForTimer(utcMills("2021-03-14T00:00:00"), zoneId));
+        assertEquals(1615712400000L, toEpochMillsForTimer(utcMills("2021-03-14T01:00:00"), zoneId));
+        assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T02:00:00"), zoneId));
+        assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T03:00:00"), zoneId));
+        assertEquals(1615717800000L, toEpochMillsForTimer(utcMills("2021-03-14T02:30:00"), zoneId));
+        assertEquals(1615719599000L, toEpochMillsForTimer(utcMills("2021-03-14T02:59:59"), zoneId));
+        assertEquals(1615717800000L, toEpochMillsForTimer(utcMills("2021-03-14T03:30:00"), zoneId));
+
+        /*
+         * The DaylightTime in Los_Angele start at time 2021-11-07 01:00:00
+         * 
+         *  2021-11-07 00:00:00 -> epoch0 = 1636268400000L;  2021-11-07 00:00:00
+         *  2021-11-07 01:00:00 -> epoch1 = 1636272000000L;  the first local timestamp 2021-11-07 01:00:00
+         *  2021-11-07 01:00:00 -> epoch2 = 1636275600000L;  back to local timestamp  2021-11-07 01:00:00
+         *  2021-11-07 02:00:00 -> epoch3 = 1636279200000L;  2021-11-07 02:00:00
+         */
+        assertEquals(1636268400000L, toEpochMillsForTimer(utcMills("2021-11-07T00:00:00"), zoneId));
+        assertEquals(1636275600000L, toEpochMillsForTimer(utcMills("2021-11-07T01:00:00"), zoneId));
+        assertEquals(1636279200000L, toEpochMillsForTimer(utcMills("2021-11-07T02:00:00"), zoneId));
+        assertEquals(1636268401000L, toEpochMillsForTimer(utcMills("2021-11-07T00:00:01"), zoneId));
+        assertEquals(1636279199000L, toEpochMillsForTimer(utcMills("2021-11-07T01:59:59"), zoneId));
+        assertEquals(1636279201000L, toEpochMillsForTimer(utcMills("2021-11-07T02:00:01"), zoneId));
+    }
+
+    @Test
+    public void testMaxWaterMark() {
+        ZoneId zoneId = ZoneId.of("Asia/Shanghai");
+        assertEquals(Long.MAX_VALUE, toUtcTimestampMills(Long.MAX_VALUE, zoneId));
+        assertEquals(Long.MAX_VALUE, toEpochMillsForTimer(Long.MAX_VALUE, zoneId));
+        assertEquals(Long.MAX_VALUE, toEpochMills(Long.MAX_VALUE, zoneId));
+    }
+
+    private static long utcMills(String utcDateTime) {
+        return LocalDateTime.parse(utcDateTime).atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
+    }
+}