From c24b8ec371500bc421ada77c9b3702125b0c4c9c Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Tue, 27 Apr 2021 09:39:38 +0800 Subject: [PATCH] [FLINK-19606][table-runtime-blink] Refactor utility class JoinConditionWithFullFilters from AbstractStreamingJoinOperator This closes #15752 --- .../join/JoinConditionWithNullFilters.java | 66 +++++++++++++++++++ .../stream/AbstractStreamingJoinOperator.java | 55 ++-------------- 2 files changed, 73 insertions(+), 48 deletions(-) create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/JoinConditionWithNullFilters.java diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/JoinConditionWithNullFilters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/JoinConditionWithNullFilters.java new file mode 100644 index 0000000000000..63b9c461e1bfd --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/JoinConditionWithNullFilters.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join; + +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.NullAwareGetters; +import org.apache.flink.table.runtime.generated.JoinCondition; + +/** Utility to take null filters into consideration when apply join condition. */ +public class JoinConditionWithNullFilters extends WrappingFunction + implements JoinCondition { + + private static final long serialVersionUID = 1L; + + /** Should filter null keys. */ + private final int[] nullFilterKeys; + + /** No keys need to filter null. */ + private final boolean nullSafe; + + /** Filter null to all keys. */ + private final boolean filterAllNulls; + + private final KeyContext keyContext; + + public JoinConditionWithNullFilters( + JoinCondition backingJoinCondition, boolean[] filterNullKeys, KeyContext keyContext) { + super(backingJoinCondition); + this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys); + this.nullSafe = nullFilterKeys.length == 0; + this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length; + this.keyContext = keyContext; + } + + @Override + public boolean apply(RowData left, RowData right) { + if (!nullSafe) { // is not null safe, return false if any null exists + // key is always BinaryRowData + NullAwareGetters joinKey = (NullAwareGetters) keyContext.getCurrentKey(); + if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) { + // find null present, return false directly + return false; + } + } + // test condition + return wrappedFunction.apply(left, right); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java index b221950adb86e..64ada0f0db406 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java @@ -18,17 +18,15 @@ package org.apache.flink.table.runtime.operators.join.stream; -import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.binary.NullAwareGetters; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.generated.JoinCondition; -import org.apache.flink.table.runtime.operators.join.NullAwareJoinHelper; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec; import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView; import org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateView; @@ -60,14 +58,7 @@ public abstract class AbstractStreamingJoinOperator extends AbstractStreamOperat protected final JoinInputSideSpec leftInputSideSpec; protected final JoinInputSideSpec rightInputSideSpec; - /** Should filter null keys. */ - private final int[] nullFilterKeys; - - /** No keys need to filter null. */ - private final boolean nullSafe; - - /** Filter null to all keys. */ - private final boolean filterAllNulls; + private final boolean[] filterNullKeys; protected final long stateRetentionTime; @@ -88,21 +79,17 @@ public AbstractStreamingJoinOperator( this.leftInputSideSpec = leftInputSideSpec; this.rightInputSideSpec = rightInputSideSpec; this.stateRetentionTime = stateRetentionTime; - this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys); - this.nullSafe = nullFilterKeys.length == 0; - this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length; + this.filterNullKeys = filterNullKeys; } @Override public void open() throws Exception { super.open(); - JoinCondition condition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); - condition.setRuntimeContext(getRuntimeContext()); - condition.open(new Configuration()); - - this.joinCondition = new JoinConditionWithNullFilters(condition); + this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this); + this.joinCondition.setRuntimeContext(getRuntimeContext()); + this.joinCondition.open(new Configuration()); this.collector = new TimestampedCollector<>(output); } @@ -111,35 +98,7 @@ public void open() throws Exception { public void close() throws Exception { super.close(); if (joinCondition != null) { - joinCondition.backingJoinCondition.close(); - } - } - - // ---------------------------------------------------------------------------------------- - // Utility Classes - // ---------------------------------------------------------------------------------------- - - private class JoinConditionWithNullFilters extends AbstractRichFunction - implements JoinCondition { - - final JoinCondition backingJoinCondition; - - private JoinConditionWithNullFilters(JoinCondition backingJoinCondition) { - this.backingJoinCondition = backingJoinCondition; - } - - @Override - public boolean apply(RowData left, RowData right) { - if (!nullSafe) { // is not null safe, return false if any null exists - // key is always BinaryRowData - NullAwareGetters joinKey = (NullAwareGetters) getCurrentKey(); - if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) { - // find null present, return false directly - return false; - } - } - // test condition - return backingJoinCondition.apply(left, right); + joinCondition.close(); } }