Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardBang committed Apr 7, 2021
1 parent a70a0d8 commit ee69df7
Show file tree
Hide file tree
Showing 24 changed files with 214 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,11 @@ private List<WatermarkSpec> resolveWatermarkSpecs(
}
validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());

if (!watermarkExpression
.getOutputDataType()
.getLogicalType()
.getTypeRoot()
.equals(validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
if (!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot()
== validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
throw new ValidationException(
String.format(
"The watermark output type %s is different with input time filed type %s.",
"The watermark output type %s is different from input time filed type %s.",
watermarkExpression.getOutputDataType(),
validatedTimeColumn.getDataType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testSchemaResolutionErrors() {
.column("ts", DataTypes.TIMESTAMP(3))
.watermark("ts", callSql(INVALID_WATERMARK_SQL))
.build(),
"The watermark output type TIMESTAMP_LTZ(3) is different with input time filed type TIMESTAMP(3).");
"The watermark output type TIMESTAMP_LTZ(3) is different from input time filed type TIMESTAMP(3).");

testError(
Schema.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily famil
}

public static boolean isTimeAttribute(LogicalType logicalType) {
return (hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
|| hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE))
&& logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != TimestampKind.REGULAR;
return isRowtimeAttribute(logicalType) || isProctimeAttribute(logicalType);
}

public static boolean isRowtimeAttribute(LogicalType logicalType) {
Expand All @@ -126,8 +124,9 @@ public static boolean isProctimeAttribute(LogicalType logicalType) {
}

public static boolean canBeTimeAttributeType(LogicalType logicalType) {
if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
|| logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
if (typeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
|| typeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,29 +338,29 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {

val head = inputTypes.head.getFieldList.map(_.getType)

inputTypes.forall { t =>
inputTypes.foreach { t =>
val fieldTypes = t.getFieldList.map(_.getType)

fieldTypes.zip(head).forall { case (l, r) =>
fieldTypes.zip(head).foreach{ case (l, r) =>
validateUnionPair(l, r)
}
}

setOp.copy(setOp.getTraitSet, inputs, setOp.all)
}

private def validateUnionPair(l: RelDataType, r: RelDataType): Boolean = {
private def validateUnionPair(l: RelDataType, r: RelDataType): Unit = {
val exceptionMsg =
s"Union fields with time attributes requires same types, but the types are %s and %s."
// check if time indicators match
val isValid = if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) {
val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime
val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime
if (leftTime && leftTime) {
val leftIsEventTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime
val rightIsEventTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime
if (leftIsEventTime && rightIsEventTime) {
// rowtime attributes must have the same type
isTimestampLtzIndicatorType(l) == isTimestampLtzIndicatorType(r)
} else {
leftTime == rightTime
leftIsEventTime == rightIsEventTime
}
}
// one side is not an indicator
Expand All @@ -376,7 +376,6 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
throw new ValidationException(
String.format(exceptionMsg, l.toString, r.toString))
}
isValid
}

private def gatherIndicesToMaterialize(aggregate: Aggregate, input: RelNode): Set[Int] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.time.ZoneId;

import static org.apache.flink.table.runtime.operators.window.TimeWindow.getWindowStartWithOffset;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;

/**
* The operator used for local window aggregation.
Expand Down Expand Up @@ -73,10 +72,10 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
/** Flag to prevent duplicate function.close() calls in close() and dispose(). */
private transient boolean functionsClosed = false;

/** current watermark of this operator, the value has considered shifted timezone. */
private transient long currentShiftWatermark;
/** current watermark of this operator. */
private transient long currentWatermark;

/** The next watermark to trigger windows, the value has considered shifted timezone. */
/** The next watermark to trigger windows. */
private transient long nextTriggerWatermark;

/** A buffer that holds window data in memory and may flush it to output. */
Expand Down Expand Up @@ -126,7 +125,8 @@ public void open() throws Exception {
getContainingTask(),
getContainingTask().getEnvironment().getMemoryManager(),
computeMemorySize(),
localCombiner);
localCombiner,
shiftTimezone);
}

@Override
Expand All @@ -140,14 +140,12 @@ public void processElement(StreamRecord<RowData> element) throws Exception {

@Override
public void processWatermark(Watermark mark) throws Exception {
long timestamp = toUtcTimestampMills(mark.getTimestamp(), shiftTimezone);
if (timestamp > currentShiftWatermark) {
currentShiftWatermark = timestamp;
if (currentShiftWatermark >= nextTriggerWatermark) {
if (mark.getTimestamp() > currentWatermark) {
currentWatermark = mark.getTimestamp();
if (currentWatermark >= nextTriggerWatermark) {
// we only need to call advanceProgress() when current watermark may trigger window
windowBuffer.advanceProgress(currentShiftWatermark);
nextTriggerWatermark =
getNextTriggerWatermark(currentShiftWatermark, windowInterval);
windowBuffer.advanceProgress(currentWatermark);
nextTriggerWatermark = getNextTriggerWatermark(currentWatermark, windowInterval);
}
}
super.processWatermark(mark);
Expand Down Expand Up @@ -192,11 +190,11 @@ private long computeMemorySize() {
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
/** Method to get the next shifted watermark to trigger window. */
private static long getNextTriggerWatermark(long currentShiftWatermark, long interval) {
long start = getWindowStartWithOffset(currentShiftWatermark, 0L, interval);
/** Method to get the next watermark to trigger window. */
private static long getNextTriggerWatermark(long currentWatermark, long interval) {
long start = getWindowStartWithOffset(currentWatermark, 0L, interval);
long triggerWatermark = start + interval - 1;
if (triggerWatermark > currentShiftWatermark) {
if (triggerWatermark > currentWatermark) {
return triggerWatermark;
} else {
return triggerWatermark + interval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,21 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
combinerFactory,
(SliceSharedAssigner) assigner,
accSerializer,
indexOfCountStart);
indexOfCountStart,
shiftTimeZone);
} else if (assigner instanceof SliceUnsharedAssigner) {
windowProcessor =
new SliceUnsharedWindowAggProcessor(
generatedAggregateFunction,
bufferFactory,
combinerFactory,
(SliceUnsharedAssigner) assigner,
accSerializer);
accSerializer,
shiftTimeZone);
} else {
throw new IllegalArgumentException(
"assigner must be instance of SliceUnsharedAssigner or SliceSharedAssigner.");
}
return new SlicingWindowOperator<>(windowProcessor, shiftTimeZone);
return new SlicingWindowOperator<>(windowProcessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;

import java.io.EOFException;
import java.time.ZoneId;
import java.util.Iterator;

/**
Expand All @@ -42,30 +44,33 @@ public final class RecordsWindowBuffer implements WindowBuffer {
private final WindowBytesMultiMap recordsBuffer;
private final WindowKey reuseWindowKey;
private final AbstractRowDataSerializer<RowData> recordSerializer;
private final ZoneId shiftTimeZone;

private long minTriggerTime = Long.MAX_VALUE;
private long minSliceEnd = Long.MAX_VALUE;

public RecordsWindowBuffer(
Object operatorOwner,
MemoryManager memoryManager,
long memorySize,
WindowCombineFunction combineFunction,
PagedTypeSerializer<RowData> keySer,
AbstractRowDataSerializer<RowData> inputSer) {
AbstractRowDataSerializer<RowData> inputSer,
ZoneId shiftTimeZone) {
this.combineFunction = combineFunction;
this.recordsBuffer =
new WindowBytesMultiMap(
operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity());
this.recordSerializer = inputSer;
this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
this.shiftTimeZone = shiftTimeZone;
}

@Override
public void addElement(RowData key, long sliceEnd, RowData element) throws Exception {
// track the lowest trigger time; if the watermark exceeds the trigger time,
// it means some elements in the buffer belong to a window that is about to be fired,
// and we need to flush the buffer into state for firing.
minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);
minSliceEnd = Math.min(sliceEnd - 1, minSliceEnd);

reuseWindowKey.replace(sliceEnd, key);
LookupInfo<WindowKey, Iterator<RowData>> lookup = recordsBuffer.lookup(reuseWindowKey);
Expand All @@ -81,7 +86,7 @@ public void addElement(RowData key, long sliceEnd, RowData element) throws Excep

@Override
public void advanceProgress(long progress) throws Exception {
if (progress >= minTriggerTime) {
if (progress >= TimeWindowUtil.toEpochMillsForTimer(minSliceEnd, shiftTimeZone)) {
// there should be some window to be fired, flush buffer to state first
flush();
}
Expand All @@ -97,7 +102,7 @@ public void flush() throws Exception {
}
recordsBuffer.reset();
// reset trigger time
minTriggerTime = Long.MAX_VALUE;
minSliceEnd = Long.MAX_VALUE;
}
}

Expand Down Expand Up @@ -129,9 +134,16 @@ public WindowBuffer create(
Object operatorOwner,
MemoryManager memoryManager,
long memorySize,
WindowCombineFunction combineFunction) {
WindowCombineFunction combineFunction,
ZoneId shiftTimeZone) {
return new RecordsWindowBuffer(
operatorOwner, memoryManager, memorySize, combineFunction, keySer, inputSer);
operatorOwner,
memoryManager,
memorySize,
combineFunction,
keySer,
inputSer,
shiftTimeZone);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.time.ZoneId;

/**
* A buffer that buffers data in memory and flushes many values to state together at a time to avoid
Expand Down Expand Up @@ -83,13 +84,15 @@ interface Factory extends Serializable {
* @param memoryManager the manager that governs memory by Flink framework
* @param memorySize the managed memory size can be used by this operator
* @param combineFunction the combine function used to combine buffered data into state
* @param shiftTimeZone the shift timezone of the window
* @throws IOException thrown if the buffer can't be opened
*/
WindowBuffer create(
Object operatorOwner,
MemoryManager memoryManager,
long memorySize,
WindowCombineFunction combineFunction)
WindowCombineFunction combineFunction,
ZoneId shiftTimeZone)
throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ record = recordSerializer.copy(record);

// step 5: register timer for current window
if (isEventTime) {
timerService.registerEventTimeWindowTimer(window, window - 1);
timerService.registerEventTimeWindowTimer(window);
}
// we don't need to register processing-time timers, because we already register them
// per-record in AbstractWindowAggProcessor.processElement()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void combine(WindowKey windowKey, Iterator<RowData> localAccs) throws Exc
accState.update(window, stateAcc);

// step 3: register timer for current window
timerService.registerEventTimeWindowTimer(window, window - 1);
timerService.registerEventTimeWindowTimer(window);
}

@Override
Expand Down
Loading

0 comments on commit ee69df7

Please sign in to comment.