Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardBang committed Apr 9, 2021
1 parent 1b0d278 commit 2803f76
Showing 1 changed file with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;

import java.time.ZoneId;
import java.util.TimeZone;

import static org.apache.flink.table.runtime.operators.window.TimeWindow.getWindowStartWithOffset;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
Expand Down Expand Up @@ -66,6 +67,9 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
*/
private final ZoneId shiftTimezone;

/** The shift timezone is using DayLightSaving time or not. */
private final boolean useDayLightSaving;

// ------------------------------------------------------------------------

/** This is used for emitting elements with a given timestamp. */
Expand Down Expand Up @@ -110,6 +114,7 @@ public LocalSlicingWindowAggOperator(
this.windowBufferFactory = windowBufferFactory;
this.combinerFactory = combinerFactory;
this.shiftTimezone = shiftTimezone;
this.useDayLightSaving = TimeZone.getTimeZone(shiftTimezone).useDaylightTime();
}

@Override
Expand Down Expand Up @@ -148,7 +153,8 @@ public void processWatermark(Watermark mark) throws Exception {
// we only need to call advanceProgress() when current watermark may trigger window
windowBuffer.advanceProgress(currentWatermark);
nextTriggerWatermark =
getNextTriggerWatermark(currentWatermark, windowInterval, shiftTimezone);
getNextTriggerWatermark(
currentWatermark, windowInterval, shiftTimezone, useDayLightSaving);
}
}
super.processWatermark(mark);
Expand Down Expand Up @@ -195,11 +201,22 @@ private long computeMemorySize() {
// ------------------------------------------------------------------------
/** Method to get the next watermark to trigger window. */
private static long getNextTriggerWatermark(
long currentWatermark, long interval, ZoneId shiftTimezone) {
long utcWindowStart =
getWindowStartWithOffset(
toUtcTimestampMills(currentWatermark, shiftTimezone), 0L, interval);
long triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone);
long currentWatermark, long interval, ZoneId shiftTimezone, boolean useDayLightSaving) {
if (currentWatermark == Long.MAX_VALUE) {
return currentWatermark;
}

long triggerWatermark;
// consider the DST timezone
if (useDayLightSaving) {
long utcWindowStart =
getWindowStartWithOffset(
toUtcTimestampMills(currentWatermark, shiftTimezone), 0L, interval);
triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone);
} else {
long start = getWindowStartWithOffset(currentWatermark, 0L, interval);
triggerWatermark = start + interval - 1;
}

if (triggerWatermark > currentWatermark) {
return triggerWatermark;
Expand Down

0 comments on commit 2803f76

Please sign in to comment.