diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
new file mode 100644
index 00000000000000..6f9c8c4b096ce9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.window.state.WindowListState;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+
+/**
+ * Streaming window join operator.
+ *
+ *
Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus
+ * late elements (elements belong to emitted windows) will be simply dropped.
+ */
+public abstract class WindowJoinOperator extends TableStreamOperator
+ implements TwoInputStreamOperator,
+ Triggerable,
+ KeyContext {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+ "leftNumLateRecordsDropped";
+ private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+ "leftLateRecordsDroppedRate";
+ private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+ "rightNumLateRecordsDropped";
+ private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+ "rightLateRecordsDroppedRate";
+ private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+ private static final String LEFT_RECORDS_STATE_NAME = "left-records";
+ private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
+
+ protected final InternalTypeInfo leftType;
+ protected final InternalTypeInfo rightType;
+ private final GeneratedJoinCondition generatedJoinCondition;
+
+ private final int leftWindowEndIndex;
+ private final int rightWindowEndIndex;
+
+ private final boolean[] filterNullKeys;
+
+ /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+ private transient boolean functionsClosed = false;
+
+ private transient InternalTimerService internalTimerService;
+
+ // ------------------------------------------------------------------------
+ protected transient JoinConditionWithNullFilters joinCondition;
+
+ /** This is used for emitting elements with a given timestamp. */
+ protected transient TimestampedCollector collector;
+
+ private transient WindowListState leftWindowState;
+ private transient WindowListState rightWindowState;
+
+ // ------------------------------------------------------------------------
+ // Metrics
+ // ------------------------------------------------------------------------
+
+ private transient Counter leftNumLateRecordsDropped;
+ private transient Meter leftLateRecordsDroppedRate;
+ private transient Counter rightNumLateRecordsDropped;
+ private transient Meter rightLateRecordsDroppedRate;
+ private transient Gauge watermarkLatency;
+
+ WindowJoinOperator(
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftWindowEndIndex,
+ int rightWindowEndIndex,
+ boolean[] filterNullKeys) {
+ this.leftType = leftType;
+ this.rightType = rightType;
+ this.generatedJoinCondition = generatedJoinCondition;
+ this.leftWindowEndIndex = leftWindowEndIndex;
+ this.rightWindowEndIndex = rightWindowEndIndex;
+ this.filterNullKeys = filterNullKeys;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ functionsClosed = false;
+
+ this.collector = new TimestampedCollector<>(output);
+ collector.eraseTimestamp();
+
+ final LongSerializer windowSerializer = LongSerializer.INSTANCE;
+
+ internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
+
+ // init join condition
+ JoinCondition condition =
+ generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
+ this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this);
+ this.joinCondition.setRuntimeContext(getRuntimeContext());
+ this.joinCondition.open(new Configuration());
+
+ // init state
+ ListStateDescriptor leftRecordStateDesc =
+ new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, leftType);
+ ListState leftListState =
+ getOrCreateKeyedState(windowSerializer, leftRecordStateDesc);
+ this.leftWindowState =
+ new WindowListState<>((InternalListState) leftListState);
+
+ ListStateDescriptor rightRecordStateDesc =
+ new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, rightType);
+ ListState rightListState =
+ getOrCreateKeyedState(windowSerializer, rightRecordStateDesc);
+ this.rightWindowState =
+ new WindowListState<>((InternalListState) rightListState);
+
+ // metrics
+ this.leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+ this.leftLateRecordsDroppedRate =
+ metrics.meter(
+ LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+ new MeterView(leftNumLateRecordsDropped));
+ this.rightNumLateRecordsDropped = metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+ this.rightLateRecordsDroppedRate =
+ metrics.meter(
+ RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+ new MeterView(rightNumLateRecordsDropped));
+ this.watermarkLatency =
+ metrics.gauge(
+ WATERMARK_LATENCY_METRIC_NAME,
+ () -> {
+ long watermark = internalTimerService.currentWatermark();
+ if (watermark < 0) {
+ return 0L;
+ } else {
+ return internalTimerService.currentProcessingTime() - watermark;
+ }
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ collector = null;
+ functionsClosed = true;
+ if (joinCondition != null) {
+ joinCondition.close();
+ }
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ super.dispose();
+ collector = null;
+ if (!functionsClosed) {
+ functionsClosed = true;
+ if (joinCondition != null) {
+ joinCondition.close();
+ }
+ }
+ }
+
+ @Override
+ public void processElement1(StreamRecord element) throws Exception {
+ processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState);
+ }
+
+ @Override
+ public void processElement2(StreamRecord element) throws Exception {
+ processElement(element, rightWindowEndIndex, rightLateRecordsDroppedRate, rightWindowState);
+ }
+
+ private void processElement(
+ StreamRecord element,
+ int windowEndIndex,
+ Meter lateRecordsDroppedRate,
+ WindowListState recordState)
+ throws Exception {
+ RowData inputRow = element.getValue();
+ long windowEnd = inputRow.getLong(windowEndIndex);
+ long windowMaxTimestamp = windowEnd - 1;
+ if (windowMaxTimestamp <= internalTimerService.currentWatermark()) {
+ // element is late and should be dropped
+ lateRecordsDroppedRate.markEvent();
+ return;
+ }
+ if (RowDataUtil.isAccumulateMsg(inputRow)) {
+ recordState.add(windowEnd, inputRow);
+ } else {
+ recordState.delete(windowEnd, inputRow);
+ }
+ // always register time for every element
+ internalTimerService.registerEventTimeTimer(windowEnd, windowMaxTimestamp);
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer timer) throws Exception {
+ // Window join only support event-time now
+ throw new UnsupportedOperationException(
+ "This is a bug and should not happen. Please file an issue.");
+ }
+
+ @Override
+ public void onEventTime(InternalTimer timer) throws Exception {
+ setCurrentKey(timer.getKey());
+ Long window = timer.getNamespace();
+ // join left records and right records
+ List leftData = leftWindowState.get(window);
+ List rightData = rightWindowState.get(window);
+ join(leftData, rightData);
+ // clear state
+ leftWindowState.clear(window);
+ rightWindowState.clear(window);
+ // remove timer registered on current window and timestamp
+ internalTimerService.deleteEventTimeTimer(window, timer.getTimestamp());
+ }
+
+ public abstract void join(Iterable leftRecords, Iterable rightRecords);
+
+ static class SemiAntiJoinOperator extends WindowJoinOperator {
+
+ private final boolean isAntiJoin;
+
+ SemiAntiJoinOperator(
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftWindowEndIndex,
+ int rightWindowEndIndex,
+ boolean[] filterNullKeys,
+ boolean isAntiJoin) {
+ super(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ this.isAntiJoin = isAntiJoin;
+ }
+
+ @Override
+ public void join(Iterable leftRecords, Iterable rightRecords) {
+ if (leftRecords == null) {
+ return;
+ }
+ if (rightRecords == null) {
+ if (isAntiJoin) {
+ for (RowData leftRecord : leftRecords) {
+ collector.collect(leftRecord);
+ }
+ }
+ return;
+ }
+ for (RowData leftRecord : leftRecords) {
+ boolean matches = false;
+ for (RowData rightRecord : rightRecords) {
+ if (joinCondition.apply(leftRecord, rightRecord)) {
+ if (joinCondition.apply(leftRecord, rightRecord)) {
+ matches = true;
+ break;
+ }
+ }
+ }
+ if (matches) {
+ if (!isAntiJoin) {
+ // emit left record if there are matched rows on the other side
+ collector.collect(leftRecord);
+ }
+ } else {
+ if (isAntiJoin) {
+ // emit left record if there is no matched row on the other side
+ collector.collect(leftRecord);
+ }
+ }
+ }
+ }
+ }
+
+ static class InnerJoinOperator extends WindowJoinOperator {
+ private transient JoinedRowData outRow;
+
+ InnerJoinOperator(
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftWindowEndIndex,
+ int rightWindowEndIndex,
+ boolean[] filterNullKeys) {
+ super(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ outRow = new JoinedRowData();
+ }
+
+ @Override
+ public void join(Iterable leftRecords, Iterable rightRecords) {
+ if (leftRecords == null || rightRecords == null) {
+ return;
+ }
+ for (RowData leftRecord : leftRecords) {
+ for (RowData rightRecord : rightRecords) {
+ if (joinCondition.apply(leftRecord, rightRecord)) {
+ outRow.setRowKind(RowKind.INSERT);
+ outRow.replace(leftRecord, rightRecord);
+ collector.collect(outRow);
+ }
+ }
+ }
+ }
+ }
+
+ private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator {
+
+ private transient RowData leftNullRow;
+ private transient RowData rightNullRow;
+ private transient JoinedRowData outRow;
+
+ AbstractOuterJoinOperator(
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftWindowEndIndex,
+ int rightWindowEndIndex,
+ boolean[] filterNullKeys) {
+ super(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ leftNullRow = new GenericRowData(leftType.toRowSize());
+ rightNullRow = new GenericRowData(rightType.toRowSize());
+ outRow = new JoinedRowData();
+ }
+
+ protected void outputNullPadding(RowData row, boolean isLeft) {
+ if (isLeft) {
+ outRow.replace(row, rightNullRow);
+ } else {
+ outRow.replace(leftNullRow, row);
+ }
+ outRow.setRowKind(RowKind.INSERT);
+ collector.collect(outRow);
+ }
+
+ protected void outputNullPadding(Iterable rows, boolean isLeft) {
+ for (RowData row : rows) {
+ outputNullPadding(row, isLeft);
+ }
+ }
+
+ protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) {
+ if (inputIsLeft) {
+ outRow.replace(inputRow, otherRow);
+ } else {
+ outRow.replace(otherRow, inputRow);
+ }
+ outRow.setRowKind(RowKind.INSERT);
+ collector.collect(outRow);
+ }
+ }
+
+ static class LeftOuterJoinOperator extends AbstractOuterJoinOperator {
+
+ LeftOuterJoinOperator(
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftWindowEndIndex,
+ int rightWindowEndIndex,
+ boolean[] filterNullKeys) {
+ super(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ }
+
+ @Override
+ public void join(Iterable leftRecords, Iterable rightRecords) {
+ if (leftRecords == null) {
+ return;
+ }
+ if (rightRecords == null) {
+ outputNullPadding(leftRecords, true);
+ } else {
+ for (RowData leftRecord : leftRecords) {
+ boolean matches = false;
+ for (RowData rightRecord : rightRecords) {
+ if (joinCondition.apply(leftRecord, rightRecord)) {
+ output(leftRecord, rightRecord, true);
+ matches = true;
+ }
+ }
+ if (!matches) {
+ // padding null for left side
+ outputNullPadding(leftRecord, true);
+ }
+ }
+ }
+ }
+ }
+
+ static class RightOuterJoinOperator extends AbstractOuterJoinOperator {
+
+ RightOuterJoinOperator(
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftWindowEndIndex,
+ int rightWindowEndIndex,
+ boolean[] filterNullKeys) {
+ super(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ }
+
+ @Override
+ public void join(Iterable leftRecords, Iterable rightRecords) {
+ if (rightRecords == null) {
+ return;
+ }
+ if (leftRecords == null) {
+ outputNullPadding(rightRecords, false);
+ } else {
+ for (RowData rightRecord : rightRecords) {
+ boolean matches = false;
+ for (RowData leftRecord : leftRecords) {
+ if (joinCondition.apply(leftRecord, rightRecord)) {
+ output(leftRecord, rightRecord, true);
+ matches = true;
+ }
+ }
+ if (!matches) {
+ outputNullPadding(rightRecord, false);
+ }
+ }
+ }
+ }
+ }
+
+ static class FullOuterJoinOperator extends AbstractOuterJoinOperator {
+
+ FullOuterJoinOperator(
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftWindowEndIndex,
+ int rightWindowEndIndex,
+ boolean[] filterNullKeys) {
+ super(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ }
+
+ @Override
+ public void join(Iterable leftRecords, Iterable rightRecords) {
+ if (leftRecords == null && rightRecords == null) {
+ return;
+ }
+ if (rightRecords == null) {
+ outputNullPadding(leftRecords, true);
+ } else if (leftRecords == null) {
+ outputNullPadding(rightRecords, false);
+ } else {
+ IdentityHashMap emittedRightRecords = new IdentityHashMap<>();
+ for (RowData leftRecord : leftRecords) {
+ boolean matches = false;
+ for (RowData rightRecord : rightRecords) {
+ if (joinCondition.apply(leftRecord, rightRecord)) {
+ output(leftRecord, rightRecord, true);
+ matches = true;
+ emittedRightRecords.put(rightRecord, Boolean.TRUE);
+ }
+ }
+ // padding null for left side
+ if (!matches) {
+ outputNullPadding(leftRecord, true);
+ }
+ }
+ // padding null for never emitted right side
+ for (RowData rightRecord : rightRecords) {
+ if (!emittedRightRecords.containsKey(rightRecord)) {
+ outputNullPadding(rightRecord, false);
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
new file mode 100644
index 00000000000000..8b484ac52acd38
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link WindowJoinOperatorBuilder} is used to build a {@link WindowJoinOperator} for window
+ * join.
+ *
+ *
+ */
+public class WindowJoinOperatorBuilder {
+
+ public static WindowJoinOperatorBuilder builder() {
+ return new WindowJoinOperatorBuilder();
+ }
+
+ private InternalTypeInfo leftType;
+ private InternalTypeInfo rightType;
+ private GeneratedJoinCondition generatedJoinCondition;
+ private int leftWindowEndIndex = -1;
+ private int rightWindowEndIndex = -1;
+ private boolean[] filterNullKeys;
+ private FlinkJoinType joinType;
+
+ public WindowJoinOperatorBuilder leftType(InternalTypeInfo leftType) {
+ this.leftType = leftType;
+ return this;
+ }
+
+ public WindowJoinOperatorBuilder rightType(InternalTypeInfo rightType) {
+ this.rightType = rightType;
+ return this;
+ }
+
+ public WindowJoinOperatorBuilder generatedJoinCondition(
+ GeneratedJoinCondition generatedJoinCondition) {
+ this.generatedJoinCondition = generatedJoinCondition;
+ return this;
+ }
+
+ public WindowJoinOperatorBuilder filterNullKeys(boolean[] filterNullKeys) {
+ this.filterNullKeys = filterNullKeys;
+ return this;
+ }
+
+ public WindowJoinOperatorBuilder joinType(FlinkJoinType joinType) {
+ this.joinType = joinType;
+ return this;
+ }
+
+ public WindowJoinOperatorBuilder leftWindowEndIndex(int leftWindowEndIndex) {
+ this.leftWindowEndIndex = leftWindowEndIndex;
+ return this;
+ }
+
+ public WindowJoinOperatorBuilder rightWindowEndIndex(int rightWindowEndIndex) {
+ this.rightWindowEndIndex = rightWindowEndIndex;
+ return this;
+ }
+
+ public WindowJoinOperator build() {
+ checkNotNull(leftType);
+ checkNotNull(rightType);
+ checkNotNull(generatedJoinCondition);
+ checkNotNull(filterNullKeys);
+ checkNotNull(joinType);
+
+ checkArgument(
+ leftWindowEndIndex >= 0,
+ String.format(
+ "Illegal window end index %s, it should not be negative!",
+ leftWindowEndIndex));
+ checkArgument(
+ rightWindowEndIndex >= 0,
+ String.format(
+ "Illegal window end index %s, it should not be negative!",
+ rightWindowEndIndex));
+
+ switch (joinType) {
+ case INNER:
+ return new WindowJoinOperator.InnerJoinOperator(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ case SEMI:
+ return new WindowJoinOperator.SemiAntiJoinOperator(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys,
+ false);
+ case ANTI:
+ return new WindowJoinOperator.SemiAntiJoinOperator(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys,
+ true);
+ case LEFT:
+ return new WindowJoinOperator.LeftOuterJoinOperator(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ case RIGHT:
+ return new WindowJoinOperator.RightOuterJoinOperator(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ case FULL:
+ return new WindowJoinOperator.FullOuterJoinOperator(
+ leftType,
+ rightType,
+ generatedJoinCondition,
+ leftWindowEndIndex,
+ rightWindowEndIndex,
+ filterNullKeys);
+ default:
+ throw new IllegalArgumentException("Invalid join type: " + joinType);
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java
new file mode 100644
index 00000000000000..afeb2911ab5c3e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/** A wrapper of {@link ListState} which is easier to update based on window namespace. */
+public final class WindowListState implements WindowState {
+
+ private final InternalListState windowState;
+
+ public WindowListState(InternalListState windowState) {
+ this.windowState = windowState;
+ }
+
+ public void clear(W window) {
+ windowState.setCurrentNamespace(window);
+ windowState.clear();
+ }
+
+ public List get(W window) throws Exception {
+ windowState.setCurrentNamespace(window);
+ return windowState.getInternal();
+ }
+
+ /**
+ * Updates the operator state accessible by {@link #get(W)})} by adding the given value to the
+ * list of values. The next time {@link #get(W)} is called (for the same state partition) the
+ * returned state will represent the updated list.
+ *
+ *
If null is passed in, the state value will remain unchanged.
+ *
+ * @param window The namespace for the state.
+ * @param value The new value for the state.
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public void add(W window, RowData value) throws Exception {
+ windowState.setCurrentNamespace(window);
+ windowState.add(value);
+ }
+
+ /**
+ * Updates the operator state accessible by {@link #get(W)} )} by delete the given value to the
+ * list of values. The next time {@link #get(W)} is called (for the same state partition) the
+ * returned state will represent the updated list.
+ *
+ *
If null is passed in, the state value will remain unchanged.
+ *
+ *
The performance is not well, first get complete list by calling {@link
+ * InternalListState#getInternal()})}, then remove the value from list, finally update state by
+ * calling {@link InternalListState#update(List)}.
+ *
+ * @param window The namespace for the state.
+ * @param value The new value for the state.
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public boolean delete(W window, RowData value) throws Exception {
+ List completeData = get(window);
+ boolean flag = completeData.remove(value);
+ windowState.updateInternal(completeData);
+ return flag;
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
new file mode 100644
index 00000000000000..c1b4f0d81474b9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for window join operators created by {@link WindowJoinOperatorBuilder}. */
+public class WindowJoinOperatorTest {
+
+ private InternalTypeInfo rowType =
+ InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+
+ private InternalTypeInfo outputRowType =
+ InternalTypeInfo.ofFields(
+ new BigIntType(),
+ new VarCharType(VarCharType.MAX_LENGTH),
+ new BigIntType(),
+ new VarCharType(VarCharType.MAX_LENGTH));
+ private RowDataHarnessAssertor assertor =
+ new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
+
+ private RowDataHarnessAssertor semiAntiJoinAssertor =
+ new RowDataHarnessAssertor(rowType.toRowFieldTypes());
+
+ private String funcCode =
+ "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction "
+ + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n"
+ + "\n"
+ + " public TestWindowJoinCondition(Object[] reference) {\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n"
+ + " return true;\n"
+ + " }\n"
+ + "}\n";
+
+ private GeneratedJoinCondition joinFunction =
+ new GeneratedJoinCondition("TestWindowJoinCondition", funcCode, new Object[0]);
+
+ private int keyIdx = 1;
+ private RowDataKeySelector keySelector =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {keyIdx}, rowType.toRowFieldTypes());
+ private TypeInformation keyType = InternalTypeInfo.ofFields();
+
+ @Test
+ public void testSemiJoin() throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness testHarness =
+ createTestHarness(FlinkJoinType.SEMI);
+
+ testHarness.open();
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Test late data would be dropped
+ testHarness.processElement1(insertRecord(1L, "k1"));
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ testHarness.processElement1(insertRecord(3L, "k1"));
+ testHarness.processElement1(insertRecord(3L, "k1"));
+ testHarness.processElement2(insertRecord(3L, "k1"));
+ testHarness.processElement1(insertRecord(6L, "k1"));
+ testHarness.processElement2(insertRecord(9L, "k1"));
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(4, testHarness.numKeyedStateEntries());
+
+ testHarness.processWatermark1(new Watermark(10));
+ testHarness.processWatermark2(new Watermark(10));
+
+ List