Skip to content

Commit

Permalink
[FLINK-19606][table-runtime-blink] Refactor utility class JoinConditi…
Browse files Browse the repository at this point in the history
…onWithFullFilters from AbstractStreamingJoinOperator

This closes apache#15752
  • Loading branch information
beyond1920 authored and TheodoreLx committed Apr 28, 2021
1 parent e419375 commit c24b8ec
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.join;

import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.NullAwareGetters;
import org.apache.flink.table.runtime.generated.JoinCondition;

/** Utility to take null filters into consideration when apply join condition. */
public class JoinConditionWithNullFilters extends WrappingFunction<JoinCondition>
implements JoinCondition {

private static final long serialVersionUID = 1L;

/** Should filter null keys. */
private final int[] nullFilterKeys;

/** No keys need to filter null. */
private final boolean nullSafe;

/** Filter null to all keys. */
private final boolean filterAllNulls;

private final KeyContext keyContext;

public JoinConditionWithNullFilters(
JoinCondition backingJoinCondition, boolean[] filterNullKeys, KeyContext keyContext) {
super(backingJoinCondition);
this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys);
this.nullSafe = nullFilterKeys.length == 0;
this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length;
this.keyContext = keyContext;
}

@Override
public boolean apply(RowData left, RowData right) {
if (!nullSafe) { // is not null safe, return false if any null exists
// key is always BinaryRowData
NullAwareGetters joinKey = (NullAwareGetters) keyContext.getCurrentKey();
if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) {
// find null present, return false directly
return false;
}
}
// test condition
return wrappedFunction.apply(left, right);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

package org.apache.flink.table.runtime.operators.join.stream;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.NullAwareGetters;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.operators.join.NullAwareJoinHelper;
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
import org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateView;
Expand Down Expand Up @@ -60,14 +58,7 @@ public abstract class AbstractStreamingJoinOperator extends AbstractStreamOperat
protected final JoinInputSideSpec leftInputSideSpec;
protected final JoinInputSideSpec rightInputSideSpec;

/** Should filter null keys. */
private final int[] nullFilterKeys;

/** No keys need to filter null. */
private final boolean nullSafe;

/** Filter null to all keys. */
private final boolean filterAllNulls;
private final boolean[] filterNullKeys;

protected final long stateRetentionTime;

Expand All @@ -88,21 +79,17 @@ public AbstractStreamingJoinOperator(
this.leftInputSideSpec = leftInputSideSpec;
this.rightInputSideSpec = rightInputSideSpec;
this.stateRetentionTime = stateRetentionTime;
this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys);
this.nullSafe = nullFilterKeys.length == 0;
this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length;
this.filterNullKeys = filterNullKeys;
}

@Override
public void open() throws Exception {
super.open();

JoinCondition condition =
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
condition.setRuntimeContext(getRuntimeContext());
condition.open(new Configuration());

this.joinCondition = new JoinConditionWithNullFilters(condition);
this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this);
this.joinCondition.setRuntimeContext(getRuntimeContext());
this.joinCondition.open(new Configuration());

this.collector = new TimestampedCollector<>(output);
}
Expand All @@ -111,35 +98,7 @@ public void open() throws Exception {
public void close() throws Exception {
super.close();
if (joinCondition != null) {
joinCondition.backingJoinCondition.close();
}
}

// ----------------------------------------------------------------------------------------
// Utility Classes
// ----------------------------------------------------------------------------------------

private class JoinConditionWithNullFilters extends AbstractRichFunction
implements JoinCondition {

final JoinCondition backingJoinCondition;

private JoinConditionWithNullFilters(JoinCondition backingJoinCondition) {
this.backingJoinCondition = backingJoinCondition;
}

@Override
public boolean apply(RowData left, RowData right) {
if (!nullSafe) { // is not null safe, return false if any null exists
// key is always BinaryRowData
NullAwareGetters joinKey = (NullAwareGetters) getCurrentKey();
if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) {
// find null present, return false directly
return false;
}
}
// test condition
return backingJoinCondition.apply(left, right);
joinCondition.close();
}
}

Expand Down

0 comments on commit c24b8ec

Please sign in to comment.