Skip to content

Commit

Permalink
Introduce WindowTimerService and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardBang committed Apr 7, 2021
1 parent d00e2d8 commit 2b01969
Show file tree
Hide file tree
Showing 26 changed files with 405 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END;
import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

/** The Python Group Window AggregateFunction operator for the blink planner. */
Expand Down Expand Up @@ -245,16 +246,16 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {

if (timerOperandType == REGISTER_EVENT_TIMER) {
internalTimerService.registerEventTimeTimer(
window, toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == REGISTER_PROCESSING_TIMER) {
internalTimerService.registerProcessingTimeTimer(
window, toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == DELETE_EVENT_TIMER) {
internalTimerService.deleteEventTimeTimer(
window, toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == DELETE_PROCESSING_TIMER) {
internalTimerService.deleteProcessingTimeTimer(
window, toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else {
throw new RuntimeException(
String.format("Unsupported timerOperandType %s.", timerOperandType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

Expand All @@ -64,6 +63,7 @@
import java.util.LinkedList;
import java.util.List;

import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

Expand Down Expand Up @@ -311,7 +311,8 @@ private void buildWindow(PlannerNamedWindowProperty[] namedProperties) {
*/
private boolean isWindowLate(W window) {
return windowAssigner.isEventTime()
&& (cleanupTime(window) <= internalTimerService.currentWatermark());
&& (toEpochMillsForTimer(cleanupTime(window), shiftTimeZone)
<= internalTimerService.currentWatermark());
}

/**
Expand All @@ -322,12 +323,11 @@ private boolean isWindowLate(W window) {
* @param window the window whose cleanup time we are computing.
*/
private long cleanupTime(W window) {
long windowMatTs = toEpochMillsForTimer(window.maxTimestamp(), shiftTimeZone);
if (windowAssigner.isEventTime()) {
long cleanupTime = windowMatTs + allowedLateness;
return cleanupTime >= windowMatTs ? cleanupTime : Long.MAX_VALUE;
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return windowMatTs;
return window.maxTimestamp();
}
}

Expand Down Expand Up @@ -386,7 +386,7 @@ private boolean hasRetractData(
* @param window the window whose state to discard
*/
private void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
long cleanupTime = toEpochMillsForTimer(cleanupTime(window), shiftTimeZone);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
Expand Down Expand Up @@ -425,11 +425,11 @@ private void setWindowProperty(W currentWindow) {
}

private long getShiftEpochMills(long utcTimestampMills) {
return TimeWindowUtil.toEpochMills(utcTimestampMills, shiftTimeZone);
return toEpochMills(utcTimestampMills, shiftTimeZone);
}

private void cleanWindowIfNeeded(W window, long currentTime) throws Exception {
if (currentTime == cleanupTime(window)) {
if (currentTime == toEpochMillsForTimer(cleanupTime(window), shiftTimeZone)) {
windowAccumulateData.setCurrentNamespace(window);
windowAccumulateData.clear();
windowRetractData.setCurrentNamespace(window);
Expand Down Expand Up @@ -457,7 +457,7 @@ boolean onElement(RowData row, long timestamp) throws Exception {
}

boolean onProcessingTime(long time) throws Exception {
return trigger.onProcessingTime(toUtcTimestampMills(time, shiftTimeZone), window);
return trigger.onProcessingTime(time, window);
}

boolean onEventTime(long time) throws Exception {
Expand Down Expand Up @@ -485,8 +485,7 @@ public MetricGroup getMetricGroup() {

@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(
window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone));
internalTimerService.registerProcessingTimeTimer(window, time);
}

@Override
Expand All @@ -496,8 +495,7 @@ public void registerEventTimeTimer(long time) {

@Override
public void deleteProcessingTimeTimer(long time) {
internalTimerService.deleteProcessingTimeTimer(
window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone));
internalTimerService.deleteProcessingTimeTimer(window, time);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import org.apache.flink.table.data.{RowData, TimestampData}
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.planner.runtime.utils.TestData
import org.apache.flink.table.runtime.util.{RowDataHarnessAssertor, TimeWindowUtil}
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord
import org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills

import org.apache.flink.types.Row
import org.apache.flink.types.RowKind.INSERT

Expand Down Expand Up @@ -310,7 +312,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
private def localMills(dateTime: String): TimestampData = {
val windowDateTime = LocalDateTime.parse(dateTime).atZone(UTC_ZONE_ID)
TimestampData.fromEpochMillis(
TimeWindowUtil.toUtcTimestampMills(windowDateTime.toInstant.toEpochMilli, shiftTimeZone))
toUtcTimestampMills(windowDateTime.toInstant.toEpochMilli, shiftTimeZone))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class WindowAggregateUseDaylightTimeHarnessTest(backend: StateBackendMode, timeZ
expected.add(record("a", 2L, 5.0D, 1L, ts("2021-03-14T03:00:00"), ts("2021-03-14T05:00:00")))
expected.add(record("a", 3L, 5.0D, 1L, ts("2021-03-14T03:00:00"), ts("2021-03-14T06:00:00")))

// window [2021-11-07T00:00:00, 2021-11-07T02:00:00] windows contains 3 hours data
// [2021-11-07T00:00:00, 2021-11-07T02:00:00] window contains 3 hours data
expected.add(record("a", 1L, 3.0D, 1L, ts("2021-11-07T00:00:00"), ts("2021-11-07T01:00:00")))
expected.add(record("a", 3L, 3.0D, 2L, ts("2021-11-07T00:00:00"), ts("2021-11-07T02:00:00")))
expected.add(record("a", 4L, 3.0D, 2L, ts("2021-11-07T00:00:00"), ts("2021-11-07T03:00:00")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

import java.time.ZoneId;

import static org.apache.flink.table.runtime.operators.window.TimeWindow.getWindowStartWithOffset;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

/**
* The operator used for local window aggregation.
*
Expand Down Expand Up @@ -73,7 +74,7 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
private transient boolean functionsClosed = false;

/** current watermark of this operator, the value has considered shifted timezone. */
private transient long currentWatermark;
private transient long currentShiftWatermark;

/** The next watermark to trigger windows, the value has considered shifted timezone. */
private transient long nextTriggerWatermark;
Expand Down Expand Up @@ -139,13 +140,14 @@ public void processElement(StreamRecord<RowData> element) throws Exception {

@Override
public void processWatermark(Watermark mark) throws Exception {
long timestamp = TimeWindowUtil.toUtcTimestampMills(mark.getTimestamp(), shiftTimezone);
if (timestamp > currentWatermark) {
currentWatermark = timestamp;
if (currentWatermark >= nextTriggerWatermark) {
// we only need to call advanceProgress() when currentWatermark may trigger window
windowBuffer.advanceProgress(currentWatermark);
nextTriggerWatermark = getNextTriggerWatermark(currentWatermark, windowInterval);
long timestamp = toUtcTimestampMills(mark.getTimestamp(), shiftTimezone);
if (timestamp > currentShiftWatermark) {
currentShiftWatermark = timestamp;
if (currentShiftWatermark >= nextTriggerWatermark) {
// we only need to call advanceProgress() when current watermark may trigger window
windowBuffer.advanceProgress(currentShiftWatermark);
nextTriggerWatermark =
getNextTriggerWatermark(currentShiftWatermark, windowInterval);
}
}
super.processWatermark(mark);
Expand Down Expand Up @@ -190,11 +192,11 @@ private long computeMemorySize() {
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
/** Method to get the next watermark to trigger window. */
private static long getNextTriggerWatermark(long currentWatermark, long interval) {
long start = TimeWindow.getWindowStartWithOffset(currentWatermark, 0L, interval);
/** Method to get the next shifted watermark to trigger window. */
private static long getNextTriggerWatermark(long currentShiftWatermark, long interval) {
long start = getWindowStartWithOffset(currentShiftWatermark, 0L, interval);
long triggerWatermark = start + interval - 1;
if (triggerWatermark > currentWatermark) {
if (triggerWatermark > currentShiftWatermark) {
return triggerWatermark;
} else {
return triggerWatermark + interval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,19 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
combinerFactory,
(SliceSharedAssigner) assigner,
accSerializer,
indexOfCountStart,
shiftTimeZone);
indexOfCountStart);
} else if (assigner instanceof SliceUnsharedAssigner) {
windowProcessor =
new SliceUnsharedWindowAggProcessor(
generatedAggregateFunction,
bufferFactory,
combinerFactory,
(SliceUnsharedAssigner) assigner,
accSerializer,
shiftTimeZone);
accSerializer);
} else {
throw new IllegalArgumentException(
"assigner must be instance of SliceUnsharedAssigner or SliceSharedAssigner.");
}
return new SlicingWindowOperator<>(windowProcessor);
return new SlicingWindowOperator<>(windowProcessor, shiftTimeZone);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,21 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
import org.apache.flink.table.runtime.operators.window.state.WindowState;
import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.util.Preconditions;

import java.time.ZoneId;
import java.util.Iterator;

import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;

/**
* An implementation of {@link WindowCombineFunction} that accumulates input records into the window
Expand All @@ -48,7 +45,7 @@
public final class AggRecordsCombiner implements WindowCombineFunction {

/** The service to register event-time or processing-time timers. */
private final InternalTimerService<Long> timerService;
private final WindowTimerService<Long> timerService;

/** Context to switch current key for states. */
private final StateKeyContext keyContext;
Expand All @@ -71,19 +68,15 @@ public final class AggRecordsCombiner implements WindowCombineFunction {
/** Whether the operator works in event-time mode, used to indicate registering which timer. */
private final boolean isEventTime;

/** The shifted timezone of the window. */
private final ZoneId shiftTimezone;

public AggRecordsCombiner(
InternalTimerService<Long> timerService,
WindowTimerService<Long> timerService,
StateKeyContext keyContext,
WindowValueState<Long> accState,
NamespaceAggsHandleFunction<Long> aggregator,
boolean requiresCopy,
TypeSerializer<RowData> keySerializer,
TypeSerializer<RowData> recordSerializer,
boolean isEventTime,
ZoneId shiftTimezone) {
boolean isEventTime) {
this.timerService = timerService;
this.keyContext = keyContext;
this.accState = accState;
Expand All @@ -92,7 +85,6 @@ public AggRecordsCombiner(
this.keySerializer = keySerializer;
this.recordSerializer = recordSerializer;
this.isEventTime = isEventTime;
this.shiftTimezone = shiftTimezone;
}

@Override
Expand Down Expand Up @@ -137,8 +129,7 @@ record = recordSerializer.copy(record);

// step 5: register timer for current window
if (isEventTime) {
timerService.registerEventTimeTimer(
window, toEpochMillsForTimer(window - 1, shiftTimezone));
timerService.registerEventTimeWindowTimer(window, window - 1);
}
// we don't need register processing-time timer, because we already register them
// per-record in AbstractWindowAggProcessor.processElement()
Expand Down Expand Up @@ -174,13 +165,11 @@ public Factory(
@Override
public WindowCombineFunction create(
RuntimeContext runtimeContext,
InternalTimerService<Long> timerService,
WindowTimerService<Long> timerService,
KeyedStateBackend<RowData> stateBackend,
WindowState<Long> windowState,
boolean isEventTime,
ZoneId shiftTimeZone)
boolean isEventTime)
throws Exception {
Preconditions.checkNotNull(shiftTimeZone);
final NamespaceAggsHandleFunction<Long> aggregator =
genAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
aggregator.open(
Expand All @@ -196,8 +185,7 @@ public WindowCombineFunction create(
requiresCopy,
keySerializer,
recordSerializer,
isEventTime,
shiftTimeZone);
isEventTime);
}
}
}
Loading

0 comments on commit 2b01969

Please sign in to comment.