Skip to content

Commit

Permalink
[FLINK-20387][table] Support TIMESTAMP_LTZ as rowtime attribute
Browse files Browse the repository at this point in the history
This closes #15485
  • Loading branch information
leonardBang authored and wuchong committed Apr 9, 2021
1 parent 6faaa37 commit 276e847
Show file tree
Hide file tree
Showing 118 changed files with 3,172 additions and 1,456 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@

import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END;
import static org.apache.flink.fnexecution.v1.FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

/** The Python Group Window AggregateFunction operator for the blink planner. */
@Internal
Expand Down Expand Up @@ -243,15 +245,17 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
setCurrentKey(((RowDataSerializer) getKeySerializer()).toBinaryRow(key));

if (timerOperandType == REGISTER_EVENT_TIMER) {
internalTimerService.registerEventTimeTimer(window, timestamp);
internalTimerService.registerEventTimeTimer(
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == REGISTER_PROCESSING_TIMER) {
internalTimerService.registerProcessingTimeTimer(
window, TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == DELETE_EVENT_TIMER) {
internalTimerService.deleteEventTimeTimer(window, timestamp);
internalTimerService.deleteEventTimeTimer(
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else if (timerOperandType == DELETE_PROCESSING_TIMER) {
internalTimerService.deleteProcessingTimeTimer(
window, TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone));
window, toEpochMillsForTimer(timestamp, shiftTimeZone));
} else {
throw new RuntimeException(
String.format("Unsupported timerOperandType %s.", timerOperandType));
Expand Down Expand Up @@ -407,8 +411,7 @@ private void emitTriggerTimerData(InternalTimer<K, W> timer, byte processingTime
reuseTimerData.setField(2, baos.toByteArray());
baos.reset();

reuseTimerRowData.setLong(
2, TimeWindowUtil.toUtcTimestampMills(timer.getTimestamp(), shiftTimeZone));
reuseTimerRowData.setLong(2, toUtcTimestampMills(timer.getTimestamp(), shiftTimeZone));

udfInputTypeSerializer.serialize(reuseTimerRowData, baosWrapper);
pythonFunctionRunner.process(baos.toByteArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

Expand All @@ -64,6 +63,10 @@
import java.util.LinkedList;
import java.util.List;

import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

/** The Stream Arrow Python {@link AggregateFunction} Operator for Group Window Aggregation. */
@Internal
public class StreamArrowPythonGroupWindowAggregateFunctionOperator<K, W extends Window>
Expand Down Expand Up @@ -201,7 +204,7 @@ public void bufferInput(RowData input) throws Exception {
} else {
timestamp = internalTimerService.currentProcessingTime();
}
timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone);
timestamp = toUtcTimestampMills(timestamp, shiftTimeZone);

// Given the timestamp and element, returns the set of windows into which it
// should be placed.
Expand Down Expand Up @@ -308,7 +311,8 @@ private void buildWindow(PlannerNamedWindowProperty[] namedProperties) {
*/
private boolean isWindowLate(W window) {
return windowAssigner.isEventTime()
&& (cleanupTime(window) <= internalTimerService.currentWatermark());
&& (toEpochMillsForTimer(cleanupTime(window), shiftTimeZone)
<= internalTimerService.currentWatermark());
}

/**
Expand Down Expand Up @@ -382,7 +386,7 @@ private boolean hasRetractData(
* @param window the window whose state to discard
*/
private void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
long cleanupTime = toEpochMillsForTimer(cleanupTime(window), shiftTimeZone);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
Expand Down Expand Up @@ -421,11 +425,11 @@ private void setWindowProperty(W currentWindow) {
}

private long getShiftEpochMills(long utcTimestampMills) {
return TimeWindowUtil.toEpochMills(utcTimestampMills, shiftTimeZone);
return toEpochMills(utcTimestampMills, shiftTimeZone);
}

private void cleanWindowIfNeeded(W window, long currentTime) throws Exception {
if (currentTime == cleanupTime(window)) {
if (currentTime == toEpochMillsForTimer(cleanupTime(window), shiftTimeZone)) {
windowAccumulateData.setCurrentNamespace(window);
windowAccumulateData.clear();
windowRetractData.setCurrentNamespace(window);
Expand Down Expand Up @@ -453,8 +457,7 @@ boolean onElement(RowData row, long timestamp) throws Exception {
}

boolean onProcessingTime(long time) throws Exception {
return trigger.onProcessingTime(
TimeWindowUtil.toUtcTimestampMills(time, shiftTimeZone), window);
return trigger.onProcessingTime(time, window);
}

boolean onEventTime(long time) throws Exception {
Expand Down Expand Up @@ -482,8 +485,7 @@ public MetricGroup getMetricGroup() {

@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(
window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone));
internalTimerService.registerProcessingTimeTimer(window, time);
}

@Override
Expand All @@ -493,15 +495,19 @@ public void registerEventTimeTimer(long time) {

@Override
public void deleteProcessingTimeTimer(long time) {
internalTimerService.deleteProcessingTimeTimer(
window, TimeWindowUtil.toEpochMillsForTimer(time, shiftTimeZone));
internalTimerService.deleteProcessingTimeTimer(window, time);
}

@Override
public void deleteEventTimeTimer(long time) {
internalTimerService.deleteEventTimeTimer(window, time);
}

@Override
public ZoneId getShiftTimeZone() {
return shiftTimeZone;
}

@Override
public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
try {
Expand Down Expand Up @@ -536,6 +542,11 @@ public long currentWatermark() {
throw new RuntimeException("The method currentWatermark should not be called.");
}

@Override
public ZoneId getShiftTimeZone() {
return shiftTimeZone;
}

@Override
public RowData getWindowAccumulators(W window) {
throw new RuntimeException("The method getWindowAccumulators should not be called.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;

/** PassThroughPythonStreamGroupWindowAggregateOperator. */
public class PassThroughPythonStreamGroupWindowAggregateOperator<K>
extends PythonStreamGroupWindowAggregateOperator<K, TimeWindow> {
Expand Down Expand Up @@ -296,11 +298,12 @@ private boolean isWindowLate(TimeWindow window) {
}

private long cleanupTime(TimeWindow window) {
long windowMaxTs = toEpochMillsForTimer(window.maxTimestamp(), shiftTimeZone);
if (windowAssigner.isEventTime()) {
long cleanupTime = Math.max(0, window.maxTimestamp() + allowedLateness);
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
long cleanupTime = Math.max(0, windowMaxTs + allowedLateness);
return cleanupTime >= windowMaxTs ? cleanupTime : Long.MAX_VALUE;
} else {
return Math.max(0, window.maxTimestamp());
return Math.max(0, windowMaxTs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
Expand All @@ -52,9 +53,9 @@
import java.util.stream.Stream;

import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.supportedWatermarkType;
import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType;

/** Default implementation of {@link SchemaResolver}. */
Expand Down Expand Up @@ -182,7 +183,7 @@ private List<WatermarkSpec> resolveWatermarkSpecs(

// validate time attribute
final String timeColumn = watermarkSpec.getColumnName();
validateTimeColumn(timeColumn, inputColumns);
final Column validatedTimeColumn = validateTimeColumn(timeColumn, inputColumns);

// resolve watermark expression
final ResolvedExpression watermarkExpression;
Expand All @@ -197,11 +198,20 @@ private List<WatermarkSpec> resolveWatermarkSpecs(
}
validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());

if (!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot()
== validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
throw new ValidationException(
String.format(
"The watermark output type %s is different from input time filed type %s.",
watermarkExpression.getOutputDataType(),
validatedTimeColumn.getDataType()));
}

return Collections.singletonList(
WatermarkSpec.of(watermarkSpec.getColumnName(), watermarkExpression));
}

private void validateTimeColumn(String columnName, List<Column> columns) {
private Column validateTimeColumn(String columnName, List<Column> columns) {
final Optional<Column> timeColumn =
columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
if (!timeColumn.isPresent()) {
Expand All @@ -212,7 +222,7 @@ private void validateTimeColumn(String columnName, List<Column> columns) {
columns.stream().map(Column::getName).collect(Collectors.toList())));
}
final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType();
if (!supportedWatermarkType(timeFieldType) || getPrecision(timeFieldType) != 3) {
if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) != 3) {
throw new ValidationException(
"Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
Expand All @@ -221,10 +231,11 @@ private void validateTimeColumn(String columnName, List<Column> columns) {
throw new ValidationException(
"A watermark can not be defined for a processing-time attribute.");
}
return timeColumn.get();
}

private void validateWatermarkExpression(LogicalType watermarkType) {
if (!supportedWatermarkType(watermarkType) || getPrecision(watermarkType) != 3) {
if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) != 3) {
throw new ValidationException(
"Invalid data type of expression for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
Expand All @@ -245,13 +256,29 @@ private Column adjustRowtimeAttribute(List<WatermarkSpec> watermarkSpecs, Column
final boolean hasWatermarkSpec =
watermarkSpecs.stream().anyMatch(s -> s.getRowtimeAttribute().equals(name));
if (hasWatermarkSpec && isStreamingMode) {
final TimestampType originalType = (TimestampType) dataType.getLogicalType();
final LogicalType rowtimeType =
new TimestampType(
originalType.isNullable(),
TimestampKind.ROWTIME,
originalType.getPrecision());
return column.copy(replaceLogicalType(dataType, rowtimeType));
switch (dataType.getLogicalType().getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
final TimestampType originalType = (TimestampType) dataType.getLogicalType();
final LogicalType rowtimeType =
new TimestampType(
originalType.isNullable(),
TimestampKind.ROWTIME,
originalType.getPrecision());
return column.copy(replaceLogicalType(dataType, rowtimeType));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final LocalZonedTimestampType timestampLtzType =
(LocalZonedTimestampType) dataType.getLogicalType();
final LogicalType rowtimeLtzType =
new LocalZonedTimestampType(
timestampLtzType.isNullable(),
TimestampKind.ROWTIME,
timestampLtzType.getPrecision());
return column.copy(replaceLogicalType(dataType, rowtimeLtzType));
default:
throw new ValidationException(
"Invalid data type of expression for rowtime definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
}
}
return column;
}
Expand Down
Loading

0 comments on commit 276e847

Please sign in to comment.