Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20387][table] Support TIMESTAMP_LTZ as rowtime attribute #15485

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@

import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END;
import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

/** The Python Group Window AggregateFunction operator for the blink planner. */
@Internal
Expand Down Expand Up @@ -243,15 +245,17 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
setCurrentKey(((RowDataSerializer) getKeySerializer()).toBinaryRow(key));

if (timerOperandType == REGISTER_EVENT_TIMER) {
internalTimerService.registerEventTimeTimer(window, timestamp);
internalTimerService.registerEventTimeTimer(
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == REGISTER_PROCESSING_TIMER) {
internalTimerService.registerProcessingTimeTimer(
window, TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == DELETE_EVENT_TIMER) {
internalTimerService.deleteEventTimeTimer(window, timestamp);
internalTimerService.deleteEventTimeTimer(
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == DELETE_PROCESSING_TIMER) {
internalTimerService.deleteProcessingTimeTimer(
window, TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else {
throw new RuntimeException(
String.format("Unsupported timerOperandType %s.", timerOperandType));
Expand Down Expand Up @@ -407,8 +411,7 @@ private void emitTriggerTimerData(InternalTimer<K, W> timer, byte processingTime
reuseTimerData.setField(2, baos.toByteArray());
baos.reset();

reuseTimerRowData.setLong(
2, TimeWindowUtil.toUtcTimestampMills(timer.getTimestamp(), shiftTimeZone));
reuseTimerRowData.setLong(2, toUtcTimestampMills(timer.getTimestamp(), shiftTimeZone));

udfInputTypeSerializer.serialize(reuseTimerRowData, baosWrapper);
pythonFunctionRunner.process(baos.toByteArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

Expand All @@ -64,6 +63,10 @@
import java.util.LinkedList;
import java.util.List;

import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

/** The Stream Arrow Python {@link AggregateFunction} Operator for Group Window Aggregation. */
@Internal
public class StreamArrowPythonGroupWindowAggregateFunctionOperator<K, W extends Window>
Expand Down Expand Up @@ -201,7 +204,7 @@ public void bufferInput(RowData input) throws Exception {
} else {
timestamp = internalTimerService.currentProcessingTime();
}
timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone);
timestamp = toUtcTimestampMills(timestamp, shiftTimeZone);

// Given the timestamp and element, returns the set of windows into which it
// should be placed.
Expand Down Expand Up @@ -308,7 +311,8 @@ private void buildWindow(PlannerNamedWindowProperty[] namedProperties) {
*/
private boolean isWindowLate(W window) {
return windowAssigner.isEventTime()
&& (cleanupTime(window) <= internalTimerService.currentWatermark());
&& (toEpochMillsForTimer(cleanupTime(window), shiftTimeZone)
<= internalTimerService.currentWatermark());
}

/**
Expand Down Expand Up @@ -382,7 +386,7 @@ private boolean hasRetractData(
* @param window the window whose state to discard
*/
private void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
long cleanupTime = toEpochMillsForTimer(cleanupTime(window), shiftTimeZone);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
Expand Down Expand Up @@ -421,11 +425,11 @@ private void setWindowProperty(W currentWindow) {
}

private long getShiftEpochMills(long utcTimestampMills) {
return TimeWindowUtil.toEpochMills(utcTimestampMills, shiftTimeZone);
return toEpochMills(utcTimestampMills, shiftTimeZone);
}

private void cleanWindowIfNeeded(W window, long currentTime) throws Exception {
if (currentTime == cleanupTime(window)) {
if (currentTime == toEpochMillsForTimer(cleanupTime(window), shiftTimeZone)) {
windowAccumulateData.setCurrentNamespace(window);
windowAccumulateData.clear();
windowRetractData.setCurrentNamespace(window);
Expand Down Expand Up @@ -453,8 +457,7 @@ boolean onElement(RowData row, long timestamp) throws Exception {
}

boolean onProcessingTime(long time) throws Exception {
return trigger.onProcessingTime(
TimeWindowUtil.toUtcTimestampMills(time, shiftTimeZone), window);
return trigger.onProcessingTime(time, window);
}

boolean onEventTime(long time) throws Exception {
Expand Down Expand Up @@ -482,8 +485,7 @@ public MetricGroup getMetricGroup() {

@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(
window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone));
internalTimerService.registerProcessingTimeTimer(window, time);
}

@Override
Expand All @@ -493,15 +495,19 @@ public void registerEventTimeTimer(long time) {

@Override
public void deleteProcessingTimeTimer(long time) {
internalTimerService.deleteProcessingTimeTimer(
window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone));
internalTimerService.deleteProcessingTimeTimer(window, time);
}

@Override
public void deleteEventTimeTimer(long time) {
internalTimerService.deleteEventTimeTimer(window, time);
}

@Override
public ZoneId getShiftTimeZone() {
return shiftTimeZone;
}

@Override
public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
try {
Expand Down Expand Up @@ -536,6 +542,11 @@ public long currentWatermark() {
throw new RuntimeException("The method currentWatermark should not be called.");
}

@Override
public ZoneId getShiftTimeZone() {
return shiftTimeZone;
}

@Override
public RowData getWindowAccumulators(W window) {
throw new RuntimeException("The method getWindowAccumulators should not be called.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;

/** PassThroughPythonStreamGroupWindowAggregateOperator. */
public class PassThroughPythonStreamGroupWindowAggregateOperator<K>
extends PythonStreamGroupWindowAggregateOperator<K, TimeWindow> {
Expand Down Expand Up @@ -296,11 +298,12 @@ private boolean isWindowLate(TimeWindow window) {
}

private long cleanupTime(TimeWindow window) {
long windowMaxTs = toEpochMillsForTimer(window.maxTimestamp(), shiftTimeZone);
if (windowAssigner.isEventTime()) {
long cleanupTime = Math.max(0, window.maxTimestamp() + allowedLateness);
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
long cleanupTime = Math.max(0, windowMaxTs + allowedLateness);
return cleanupTime >= windowMaxTs ? cleanupTime : Long.MAX_VALUE;
} else {
return Math.max(0, window.maxTimestamp());
return Math.max(0, windowMaxTs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
Expand All @@ -52,9 +53,9 @@
import java.util.stream.Stream;

import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.supportedWatermarkType;
import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType;

/** Default implementation of {@link SchemaResolver}. */
Expand Down Expand Up @@ -182,7 +183,7 @@ private List<WatermarkSpec> resolveWatermarkSpecs(

// validate time attribute
final String timeColumn = watermarkSpec.getColumnName();
validateTimeColumn(timeColumn, inputColumns);
final Column validatedTimeColumn = validateTimeColumn(timeColumn, inputColumns);

// resolve watermark expression
final ResolvedExpression watermarkExpression;
Expand All @@ -197,11 +198,20 @@ private List<WatermarkSpec> resolveWatermarkSpecs(
}
validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());

if (!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot()
== validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
throw new ValidationException(
String.format(
"The watermark output type %s is different from input time filed type %s.",
watermarkExpression.getOutputDataType(),
validatedTimeColumn.getDataType()));
}

return Collections.singletonList(
WatermarkSpec.of(watermarkSpec.getColumnName(), watermarkExpression));
}

private void validateTimeColumn(String columnName, List<Column> columns) {
private Column validateTimeColumn(String columnName, List<Column> columns) {
final Optional<Column> timeColumn =
columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
if (!timeColumn.isPresent()) {
Expand All @@ -212,7 +222,7 @@ private void validateTimeColumn(String columnName, List<Column> columns) {
columns.stream().map(Column::getName).collect(Collectors.toList())));
}
final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType();
if (!supportedWatermarkType(timeFieldType) || getPrecision(timeFieldType) != 3) {
if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) != 3) {
throw new ValidationException(
"Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
Expand All @@ -221,10 +231,11 @@ private void validateTimeColumn(String columnName, List<Column> columns) {
throw new ValidationException(
"A watermark can not be defined for a processing-time attribute.");
}
return timeColumn.get();
}

private void validateWatermarkExpression(LogicalType watermarkType) {
if (!supportedWatermarkType(watermarkType) || getPrecision(watermarkType) != 3) {
if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) != 3) {
throw new ValidationException(
"Invalid data type of expression for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
Expand All @@ -245,13 +256,29 @@ private Column adjustRowtimeAttribute(List<WatermarkSpec> watermarkSpecs, Column
final boolean hasWatermarkSpec =
watermarkSpecs.stream().anyMatch(s -> s.getRowtimeAttribute().equals(name));
if (hasWatermarkSpec && isStreamingMode) {
final TimestampType originalType = (TimestampType) dataType.getLogicalType();
final LogicalType rowtimeType =
new TimestampType(
originalType.isNullable(),
TimestampKind.ROWTIME,
originalType.getPrecision());
return column.copy(replaceLogicalType(dataType, rowtimeType));
switch (dataType.getLogicalType().getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
final TimestampType originalType = (TimestampType) dataType.getLogicalType();
final LogicalType rowtimeType =
new TimestampType(
originalType.isNullable(),
TimestampKind.ROWTIME,
originalType.getPrecision());
return column.copy(replaceLogicalType(dataType, rowtimeType));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final LocalZonedTimestampType timestampLtzType =
(LocalZonedTimestampType) dataType.getLogicalType();
final LogicalType rowtimeLtzType =
new LocalZonedTimestampType(
timestampLtzType.isNullable(),
TimestampKind.ROWTIME,
timestampLtzType.getPrecision());
return column.copy(replaceLogicalType(dataType, rowtimeLtzType));
default:
throw new ValidationException(
"Invalid data type of expression for rowtime definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
}
}
return column;
}
Expand Down
Loading