From 1d43ce7ad5aa1b1a53be0e304b1d5b9ed477c3b7 Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Mon, 26 Apr 2021 12:16:40 +0800 Subject: [PATCH] [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder. --- .../join/window/WindowJoinOperator.java | 558 ++++++++++++++++++ .../window/WindowJoinOperatorBuilder.java | 168 ++++++ .../window/state/WindowListState.java | 83 +++ .../join/window/WindowJoinOperatorTest.java | 433 ++++++++++++++ 4 files changed, 1242 insertions(+) create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java new file mode 100644 index 00000000000000..6f9c8c4b096ce9 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.RowDataUtil; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import 
org.apache.flink.table.runtime.operators.window.state.WindowListState; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; + +import java.util.IdentityHashMap; +import java.util.List; + +/** + * Streaming window join operator. + * + *

Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus + * late elements (elements belong to emitted windows) will be simply dropped. + */ +public abstract class WindowJoinOperator extends TableStreamOperator + implements TwoInputStreamOperator, + Triggerable, + KeyContext { + + private static final long serialVersionUID = 1L; + + private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "leftNumLateRecordsDropped"; + private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "leftLateRecordsDroppedRate"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "rightNumLateRecordsDropped"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "rightLateRecordsDroppedRate"; + private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; + private static final String LEFT_RECORDS_STATE_NAME = "left-records"; + private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; + + protected final InternalTypeInfo leftType; + protected final InternalTypeInfo rightType; + private final GeneratedJoinCondition generatedJoinCondition; + + private final int leftWindowEndIndex; + private final int rightWindowEndIndex; + + private final boolean[] filterNullKeys; + + /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ + private transient boolean functionsClosed = false; + + private transient InternalTimerService internalTimerService; + + // ------------------------------------------------------------------------ + protected transient JoinConditionWithNullFilters joinCondition; + + /** This is used for emitting elements with a given timestamp. 
*/ + protected transient TimestampedCollector collector; + + private transient WindowListState leftWindowState; + private transient WindowListState rightWindowState; + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + private transient Counter leftNumLateRecordsDropped; + private transient Meter leftLateRecordsDroppedRate; + private transient Counter rightNumLateRecordsDropped; + private transient Meter rightLateRecordsDroppedRate; + private transient Gauge watermarkLatency; + + WindowJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys) { + this.leftType = leftType; + this.rightType = rightType; + this.generatedJoinCondition = generatedJoinCondition; + this.leftWindowEndIndex = leftWindowEndIndex; + this.rightWindowEndIndex = rightWindowEndIndex; + this.filterNullKeys = filterNullKeys; + } + + @Override + public void open() throws Exception { + super.open(); + functionsClosed = false; + + this.collector = new TimestampedCollector<>(output); + collector.eraseTimestamp(); + + final LongSerializer windowSerializer = LongSerializer.INSTANCE; + + internalTimerService = getInternalTimerService("window-timers", windowSerializer, this); + + // init join condition + JoinCondition condition = + generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); + this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this); + this.joinCondition.setRuntimeContext(getRuntimeContext()); + this.joinCondition.open(new Configuration()); + + // init state + ListStateDescriptor leftRecordStateDesc = + new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, leftType); + ListState leftListState = + getOrCreateKeyedState(windowSerializer, leftRecordStateDesc); + this.leftWindowState 
= + new WindowListState<>((InternalListState) leftListState); + + ListStateDescriptor rightRecordStateDesc = + new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, rightType); + ListState rightListState = + getOrCreateKeyedState(windowSerializer, rightRecordStateDesc); + this.rightWindowState = + new WindowListState<>((InternalListState) rightListState); + + // metrics + this.leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.leftLateRecordsDroppedRate = + metrics.meter( + LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(leftNumLateRecordsDropped)); + this.rightNumLateRecordsDropped = metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.rightLateRecordsDroppedRate = + metrics.meter( + RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(rightNumLateRecordsDropped)); + this.watermarkLatency = + metrics.gauge( + WATERMARK_LATENCY_METRIC_NAME, + () -> { + long watermark = internalTimerService.currentWatermark(); + if (watermark < 0) { + return 0L; + } else { + return internalTimerService.currentProcessingTime() - watermark; + } + }); + } + + @Override + public void close() throws Exception { + super.close(); + collector = null; + functionsClosed = true; + if (joinCondition != null) { + joinCondition.close(); + } + } + + @Override + public void dispose() throws Exception { + super.dispose(); + collector = null; + if (!functionsClosed) { + functionsClosed = true; + if (joinCondition != null) { + joinCondition.close(); + } + } + } + + @Override + public void processElement1(StreamRecord element) throws Exception { + processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState); + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + processElement(element, rightWindowEndIndex, rightLateRecordsDroppedRate, rightWindowState); + } + + private void processElement( + StreamRecord element, + int windowEndIndex, + Meter lateRecordsDroppedRate, + 
WindowListState recordState)
+            throws Exception {
+        RowData inputRow = element.getValue();
+        long windowEnd = inputRow.getLong(windowEndIndex);
+        long windowMaxTimestamp = windowEnd - 1;
+        if (windowMaxTimestamp <= internalTimerService.currentWatermark()) {
+            // late element: the window's max timestamp is not ahead of the watermark, so drop it
+            lateRecordsDroppedRate.markEvent();
+            return;
+        }
+        if (RowDataUtil.isAccumulateMsg(inputRow)) {
+            recordState.add(windowEnd, inputRow);
+        } else {
+            recordState.delete(windowEnd, inputRow);
+        }
+        // always register the event-time timer (window end - 1) for every element
+        internalTimerService.registerEventTimeTimer(windowEnd, windowMaxTimestamp);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer timer) throws Exception {
+        // Window join only supports event time for now
+        throw new UnsupportedOperationException(
+                "This is a bug and should not happen. Please file an issue.");
+    }
+
+    @Override
+    public void onEventTime(InternalTimer timer) throws Exception {
+        setCurrentKey(timer.getKey());
+        Long window = timer.getNamespace();
+        // join the left and right records buffered for this window
+        List leftData = leftWindowState.get(window);
+        List rightData = rightWindowState.get(window);
+        join(leftData, rightData);
+        // clear both sides' state for this window
+        leftWindowState.clear(window);
+        rightWindowState.clear(window);
+        // remove the timer registered for this window and timestamp
+        internalTimerService.deleteEventTimeTimer(window, timer.getTimestamp());
+    }
+
+    public abstract void join(Iterable leftRecords, Iterable rightRecords);
+
+    static class SemiAntiJoinOperator extends WindowJoinOperator {
+
+        private final boolean isAntiJoin;
+
+        SemiAntiJoinOperator(
+                InternalTypeInfo leftType,
+                InternalTypeInfo rightType,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                boolean isAntiJoin) {
+            super(
+                    leftType,
+                    rightType,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys);
+            this.isAntiJoin = isAntiJoin;
+        }
+
+        
@Override
+        public void join(Iterable leftRecords, Iterable rightRecords) {
+            if (leftRecords == null) {
+                return;
+            }
+            if (rightRecords == null) {
+                if (isAntiJoin) {
+                    for (RowData leftRecord : leftRecords) {
+                        collector.collect(leftRecord);
+                    }
+                }
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                boolean matches = false;
+                for (RowData rightRecord : rightRecords) {
+                    // Evaluate the condition once per pair; the first matching right row
+                    // settles the semi/anti decision, so stop scanning immediately.
+                    if (joinCondition.apply(leftRecord, rightRecord)) {
+                        matches = true;
+                        break;
+                    }
+                }
+                if (matches) {
+                    if (!isAntiJoin) {
+                        // emit left record if there are matched rows on the other side
+                        collector.collect(leftRecord);
+                    }
+                } else {
+                    if (isAntiJoin) {
+                        // emit left record if there is no matched row on the other side
+                        collector.collect(leftRecord);
+                    }
+                }
+            }
+        }
+    }
+
+    static class InnerJoinOperator extends WindowJoinOperator {
+        private transient JoinedRowData outRow;
+
+        InnerJoinOperator(
+                InternalTypeInfo leftType,
+                InternalTypeInfo rightType,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys) {
+            super(
+                    leftType,
+                    rightType,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys);
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            outRow = new JoinedRowData();
+        }
+
+        @Override
+        public void join(Iterable leftRecords, Iterable rightRecords) {
+            if (leftRecords == null || rightRecords == null) {
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                for (RowData rightRecord : rightRecords) {
+                    if (joinCondition.apply(leftRecord, rightRecord)) {
+                        outRow.setRowKind(RowKind.INSERT);
+                        outRow.replace(leftRecord, rightRecord);
+                        collector.collect(outRow);
+                    }
+                }
+            }
+        }
+    }
+
+    private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator {
+
+        private transient RowData leftNullRow;
+        private transient RowData rightNullRow;
+        private transient
JoinedRowData outRow; + + AbstractOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + } + + @Override + public void open() throws Exception { + super.open(); + leftNullRow = new GenericRowData(leftType.toRowSize()); + rightNullRow = new GenericRowData(rightType.toRowSize()); + outRow = new JoinedRowData(); + } + + protected void outputNullPadding(RowData row, boolean isLeft) { + if (isLeft) { + outRow.replace(row, rightNullRow); + } else { + outRow.replace(leftNullRow, row); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + + protected void outputNullPadding(Iterable rows, boolean isLeft) { + for (RowData row : rows) { + outputNullPadding(row, isLeft); + } + } + + protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) { + if (inputIsLeft) { + outRow.replace(inputRow, otherRow); + } else { + outRow.replace(otherRow, inputRow); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + } + + static class LeftOuterJoinOperator extends AbstractOuterJoinOperator { + + LeftOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else { + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if 
(joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + // padding null for left side + outputNullPadding(leftRecord, true); + } + } + } + } + } + + static class RightOuterJoinOperator extends AbstractOuterJoinOperator { + + RightOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (rightRecords == null) { + return; + } + if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + for (RowData rightRecord : rightRecords) { + boolean matches = false; + for (RowData leftRecord : leftRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + outputNullPadding(rightRecord, false); + } + } + } + } + } + + static class FullOuterJoinOperator extends AbstractOuterJoinOperator { + + FullOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null && rightRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + IdentityHashMap emittedRightRecords = new IdentityHashMap<>(); + for (RowData leftRecord : 
leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + emittedRightRecords.put(rightRecord, Boolean.TRUE); + } + } + // padding null for left side + if (!matches) { + outputNullPadding(leftRecord, true); + } + } + // padding null for never emitted right side + for (RowData rightRecord : rightRecords) { + if (!emittedRightRecords.containsKey(rightRecord)) { + outputNullPadding(rightRecord, false); + } + } + } + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java new file mode 100644 index 00000000000000..8b484ac52acd38 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link WindowJoinOperatorBuilder} is used to build a {@link WindowJoinOperator} for window + * join. + * + *

<pre>
+ * WindowJoinOperatorBuilder.builder()
+ *   .leftType(leftType)
+ *   .rightType(rightType)
+ *   .generatedJoinCondition(generatedJoinCondition)
+ *   .leftWindowEndIndex(leftWindowEndIndex)
+ *   .rightWindowEndIndex(rightWindowEndIndex)
+ *   .filterNullKeys(filterNullKeys)
+ *   .joinType(joinType)
+ *   .build();
+ * </pre>
+ */ +public class WindowJoinOperatorBuilder { + + public static WindowJoinOperatorBuilder builder() { + return new WindowJoinOperatorBuilder(); + } + + private InternalTypeInfo leftType; + private InternalTypeInfo rightType; + private GeneratedJoinCondition generatedJoinCondition; + private int leftWindowEndIndex = -1; + private int rightWindowEndIndex = -1; + private boolean[] filterNullKeys; + private FlinkJoinType joinType; + + public WindowJoinOperatorBuilder leftType(InternalTypeInfo leftType) { + this.leftType = leftType; + return this; + } + + public WindowJoinOperatorBuilder rightType(InternalTypeInfo rightType) { + this.rightType = rightType; + return this; + } + + public WindowJoinOperatorBuilder generatedJoinCondition( + GeneratedJoinCondition generatedJoinCondition) { + this.generatedJoinCondition = generatedJoinCondition; + return this; + } + + public WindowJoinOperatorBuilder filterNullKeys(boolean[] filterNullKeys) { + this.filterNullKeys = filterNullKeys; + return this; + } + + public WindowJoinOperatorBuilder joinType(FlinkJoinType joinType) { + this.joinType = joinType; + return this; + } + + public WindowJoinOperatorBuilder leftWindowEndIndex(int leftWindowEndIndex) { + this.leftWindowEndIndex = leftWindowEndIndex; + return this; + } + + public WindowJoinOperatorBuilder rightWindowEndIndex(int rightWindowEndIndex) { + this.rightWindowEndIndex = rightWindowEndIndex; + return this; + } + + public WindowJoinOperator build() { + checkNotNull(leftType); + checkNotNull(rightType); + checkNotNull(generatedJoinCondition); + checkNotNull(filterNullKeys); + checkNotNull(joinType); + + checkArgument( + leftWindowEndIndex >= 0, + String.format( + "Illegal window end index %s, it should not be negative!", + leftWindowEndIndex)); + checkArgument( + rightWindowEndIndex >= 0, + String.format( + "Illegal window end index %s, it should not be negative!", + rightWindowEndIndex)); + + switch (joinType) { + case INNER: + return new 
WindowJoinOperator.InnerJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + case SEMI: + return new WindowJoinOperator.SemiAntiJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + false); + case ANTI: + return new WindowJoinOperator.SemiAntiJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + true); + case LEFT: + return new WindowJoinOperator.LeftOuterJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + case RIGHT: + return new WindowJoinOperator.RightOuterJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + case FULL: + return new WindowJoinOperator.FullOuterJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys); + default: + throw new IllegalArgumentException("Invalid join type: " + joinType); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java new file mode 100644 index 00000000000000..afeb2911ab5c3e --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.table.data.RowData; + +import java.util.List; + +/** A wrapper of {@link ListState} which is easier to update based on window namespace. */ +public final class WindowListState implements WindowState { + + private final InternalListState windowState; + + public WindowListState(InternalListState windowState) { + this.windowState = windowState; + } + + public void clear(W window) { + windowState.setCurrentNamespace(window); + windowState.clear(); + } + + public List get(W window) throws Exception { + windowState.setCurrentNamespace(window); + return windowState.getInternal(); + } + + /** + * Updates the operator state accessible by {@link #get(W)})} by adding the given value to the + * list of values. The next time {@link #get(W)} is called (for the same state partition) the + * returned state will represent the updated list. + * + *

If null is passed in, the state value will remain unchanged.
+     *
+     * @param window The namespace for the state.
+     * @param value The new value for the state.
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public void add(W window, RowData value) throws Exception {
+        windowState.setCurrentNamespace(window);
+        windowState.add(value);
+    }
+
+    /**
+     * Updates the operator state accessible by {@link #get(W)} by deleting the given value from
+     * the list of values. The next time {@link #get(W)} is called (for the same state partition)
+     * the returned state will represent the updated list.
+     *

If null is passed in, the state value will remain unchanged. + * + *

<p>Costly: reads the whole list, removes the value in memory, then writes the list back.
+     * @param window The namespace for the state.
+     * @param value The value to remove from the list.
+     * @return true if the value was removed; false if it was absent or the window has no list.
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public boolean delete(W window, RowData value) throws Exception {
+        List completeData = get(window);
+        if (completeData == null || !completeData.remove(value)) {
+            return false;
+        }
+        windowState.updateInternal(completeData);
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
new file mode 100644
index 00000000000000..c1b4f0d81474b9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.junit.Assert.assertEquals; + +/** Tests for window join operators created by {@link WindowJoinOperatorBuilder}. 
*/ +public class WindowJoinOperatorTest { + + private InternalTypeInfo rowType = + InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)); + + private InternalTypeInfo outputRowType = + InternalTypeInfo.ofFields( + new BigIntType(), + new VarCharType(VarCharType.MAX_LENGTH), + new BigIntType(), + new VarCharType(VarCharType.MAX_LENGTH)); + private RowDataHarnessAssertor assertor = + new RowDataHarnessAssertor(outputRowType.toRowFieldTypes()); + + private RowDataHarnessAssertor semiAntiJoinAssertor = + new RowDataHarnessAssertor(rowType.toRowFieldTypes()); + + private String funcCode = + "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + "\n" + + " public TestWindowJoinCondition(Object[] reference) {\n" + + " }\n" + + "\n" + + " @Override\n" + + " public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n" + + " return true;\n" + + " }\n" + + "}\n"; + + private GeneratedJoinCondition joinFunction = + new GeneratedJoinCondition("TestWindowJoinCondition", funcCode, new Object[0]); + + private int keyIdx = 1; + private RowDataKeySelector keySelector = + HandwrittenSelectorUtil.getRowDataSelector( + new int[] {keyIdx}, rowType.toRowFieldTypes()); + private TypeInformation keyType = InternalTypeInfo.ofFields(); + + @Test + public void testSemiJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.SEMI); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(1L, "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(3L, "k1")); + 
testHarness.processElement2(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(6L, "k1")); + testHarness.processElement2(insertRecord(9L, "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(3L, "k1")); + expectedOutput.add(insertRecord(3L, "k1")); + expectedOutput.add(new Watermark(10)); + semiAntiJoinAssertor.assertOutputEqualsSorted( + "output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(12L, "k1")); + testHarness.processElement1(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(insertRecord(15L, "k1")); + expectedOutput.add(new Watermark(18)); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testAntiJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.ANTI); + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data 
would be dropped + testHarness.processElement1(insertRecord(1L, "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement2(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(6L, "k1")); + testHarness.processElement2(insertRecord(9L, "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(6L, "k1")); + expectedOutput.add(new Watermark(10)); + semiAntiJoinAssertor.assertOutputEqualsSorted( + "output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(12L, "k1")); + testHarness.processElement1(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(12L, "k1")); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(new Watermark(18)); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testInnerJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness 
testHarness = + createTestHarness(FlinkJoinType.INNER); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(1L, "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement2(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(6L, "k1")); + testHarness.processElement2(insertRecord(9L, "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(new Watermark(10)); + assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(12L, "k1")); + testHarness.processElement1(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); 
+ expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); + expectedOutput.add(new Watermark(18)); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.LEFT); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(1L, "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement2(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(6L, "k1")); + testHarness.processElement2(insertRecord(9L, "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(insertRecord(6L, "k1", null, null)); + expectedOutput.add(new Watermark(10)); + assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(12L, "k1")); + testHarness.processElement1(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + 
testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(12L, "k1", null, null)); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); + expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); + expectedOutput.add(new Watermark(18)); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testRightOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.RIGHT); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(1L, "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement2(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(6L, "k1")); + testHarness.processElement2(insertRecord(9L, "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(insertRecord(null, null, 9L, "k1")); + expectedOutput.add(new Watermark(10)); + assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + 
assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(12L, "k1")); + testHarness.processElement1(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); + expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); + expectedOutput.add(new Watermark(18)); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.FULL); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(1L, "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(3L, "k1")); + testHarness.processElement2(insertRecord(3L, "k1")); + testHarness.processElement1(insertRecord(6L, "k1")); + testHarness.processElement2(insertRecord(9L, "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List 
expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(insertRecord(3L, "k1", 3L, "k1")); + expectedOutput.add(insertRecord(6L, "k1", null, null)); + expectedOutput.add(insertRecord(null, null, 9L, "k1")); + expectedOutput.add(new Watermark(10)); + assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(12L, "k1")); + testHarness.processElement1(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + testHarness.processElement2(insertRecord(15L, "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(12L, "k1", null, null)); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); + expectedOutput.add(insertRecord(15L, "k1", 15L, "k1")); + expectedOutput.add(new Watermark(18)); + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + private KeyedTwoInputStreamOperatorTestHarness + createTestHarness(FlinkJoinType joinType) throws Exception { + WindowJoinOperator operator = + WindowJoinOperatorBuilder.builder() + .leftType(rowType) + .rightType(rowType) + .generatedJoinCondition(joinFunction) + .leftWindowEndIndex(0) + .rightWindowEndIndex(0) + .filterNullKeys(new boolean[] {true}) + .joinType(joinType) + .build(); + KeyedTwoInputStreamOperatorTestHarness 
testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, keySelector, keySelector, keyType); + return testHarness; + } +}