Skip to content

Commit

Permalink
Expose Filter push down support
Browse files Browse the repository at this point in the history
  • Loading branch information
Arunachalam Thirupathi committed Nov 2, 2022
1 parent cb986f6 commit 96180d8
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,32 +77,6 @@ public static DynamicFilterExtractResult extractDynamicFilters(RowExpression exp
return new DynamicFilterExtractResult(staticConjuncts.build(), dynamicConjuncts.build());
}

public static RowExpression extractDynamicConjuncts(List<RowExpression> conjuncts, LogicalRowExpressions logicalRowExpressions)
{
ImmutableList.Builder<RowExpression> dynamicConjuncts = ImmutableList.builder();
for (RowExpression conjunct : conjuncts) {
Optional<DynamicFilterPlaceholder> placeholder = getPlaceholder(conjunct);
if (placeholder.isPresent()) {
dynamicConjuncts.add(conjunct);
}
}

return logicalRowExpressions.combineConjuncts(dynamicConjuncts.build());
}

public static RowExpression extractStaticConjuncts(List<RowExpression> conjuncts, LogicalRowExpressions logicalRowExpressions)
{
ImmutableList.Builder<RowExpression> staticConjuncts = ImmutableList.builder();
for (RowExpression conjunct : conjuncts) {
Optional<DynamicFilterPlaceholder> placeholder = getPlaceholder(conjunct);
if (!placeholder.isPresent()) {
staticConjuncts.add(conjunct);
}
}

return logicalRowExpressions.combineConjuncts(staticConjuncts.build());
}

public static boolean isDynamicFilter(RowExpression expression)
{
return getPlaceholder(expression).isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableList;
Expand All @@ -77,8 +76,7 @@
import java.util.function.Function;

import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.expressions.DynamicFilters.extractDynamicConjuncts;
import static com.facebook.presto.expressions.DynamicFilters.extractStaticConjuncts;
import static com.facebook.presto.expressions.DynamicFilters.isDynamicFilter;
import static com.facebook.presto.expressions.DynamicFilters.removeNestedDynamicFilters;
import static com.facebook.presto.expressions.LogicalRowExpressions.FALSE_CONSTANT;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
Expand All @@ -92,6 +90,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
import static com.facebook.presto.spi.StandardErrorCode.DIVISION_BY_ZERO;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_CAST_ARGUMENT;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static com.facebook.presto.spi.StandardErrorCode.NUMERIC_VALUE_OUT_OF_RANGE;
Expand Down Expand Up @@ -153,6 +152,24 @@ public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, Variable
return rewriteWith(new Rewriter(session, idAllocator), maxSubplan);
}

/**
* If this method returns true, the expression will not be pushed inside the TableScan node.
* This is exposed for dynamic or computed column functionality support.
* Consider the following case:
* Read a base row from the file. Modify the row to add a new column or update an existing column
* all inside the TableScan. If filters are pushed down inside the TableScan, it would try to apply
* it on base row. In these cases, override this method and return true. This will prevent the
* expression from being pushed into the TableScan but will wrap the TableScanNode in a FilterNode.
* @param expression expression to be evaluated.
* @param tableHandle tableHandler where the expression to be evaluated.
* @param columnHandleMap column name to column handle Map for all columns in the table.
* @return true, if this expression should not be pushed inside the table scan, else false.
*/
protected boolean useDynamicFilter(RowExpression expression, ConnectorTableHandle tableHandle, Map<String, ColumnHandle> columnHandleMap)
{
return false;
}

protected ConnectorPushdownFilterResult pushdownFilter(
ConnectorSession session,
HiveMetadata metadata,
Expand All @@ -164,24 +181,15 @@ protected ConnectorPushdownFilterResult pushdownFilter(
session,
metadata,
metadata.getMetastore(),
rowExpressionService,
functionResolution,
partitionManager,
functionMetadataManager,
tableHandle,
filter,
currentLayoutHandle);
}

@VisibleForTesting
public static ConnectorPushdownFilterResult pushdownFilter(
public ConnectorPushdownFilterResult pushdownFilter(
ConnectorSession session,
ConnectorMetadata metadata,
SemiTransactionalHiveMetastore metastore,
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
HivePartitionManager partitionManager,
FunctionMetadataManager functionMetadataManager,
ConnectorTableHandle tableHandle,
RowExpression filter,
Optional<ConnectorTableLayoutHandle> currentLayoutHandle)
Expand Down Expand Up @@ -252,7 +260,7 @@ public static ConnectorPushdownFilterResult pushdownFilter(
.forEach(predicateColumnNames::add);
// Include only columns referenced in the optimized expression. Although the expression is sent to the worker node
// unoptimized, the worker is expected to optimize the expression before executing.
extractAll(optimizedRemainingExpression).stream()
extractVariableExpressions(optimizedRemainingExpression).stream()
.map(VariableReferenceExpression::getName)
.forEach(predicateColumnNames::add);

Expand All @@ -265,8 +273,18 @@ public static ConnectorPushdownFilterResult pushdownFilter(

LogicalRowExpressions logicalRowExpressions = new LogicalRowExpressions(rowExpressionService.getDeterminismEvaluator(), functionResolution, functionMetadataManager);
List<RowExpression> conjuncts = extractConjuncts(decomposedFilter.getRemainingExpression());
RowExpression dynamicFilterExpression = extractDynamicConjuncts(conjuncts, logicalRowExpressions);
RowExpression remainingExpression = extractStaticConjuncts(conjuncts, logicalRowExpressions);
ImmutableList.Builder<RowExpression> dynamicConjuncts = ImmutableList.builder();
ImmutableList.Builder<RowExpression> staticConjuncts = ImmutableList.builder();
for (RowExpression conjunct : conjuncts) {
if (isDynamicFilter(conjunct) || useDynamicFilter(conjunct, tableHandle, columnHandles)) {
dynamicConjuncts.add(conjunct);
}
else {
staticConjuncts.add(conjunct);
}
}
RowExpression dynamicFilterExpression = logicalRowExpressions.combineConjuncts(dynamicConjuncts.build());
RowExpression remainingExpression = logicalRowExpressions.combineConjuncts(staticConjuncts.build());
remainingExpression = removeNestedDynamicFilters(remainingExpression);

Table table = metastore.getTable(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()), tableName.getSchemaName(), tableName.getTableName())
Expand Down Expand Up @@ -414,7 +432,7 @@ public PlanNode visitTableScan(TableScanNode tableScan, RewriteContext<Void> con
return new ValuesNode(tableScan.getSourceLocation(), idAllocator.getNextId(), tableScan.getOutputVariables(), ImmutableList.of(), Optional.of(tableScan.getTable().getConnectorHandle().toString()));
}

return new TableScanNode(
TableScanNode node = new TableScanNode(
tableScan.getSourceLocation(),
tableScan.getId(),
new TableHandle(handle.getConnectorId(), handle.getConnectorHandle(), handle.getTransaction(), Optional.of(pushdownFilterResult.getLayout().getHandle())),
Expand All @@ -423,6 +441,12 @@ public PlanNode visitTableScan(TableScanNode tableScan, RewriteContext<Void> con
tableScan.getTableConstraints(),
pushdownFilterResult.getLayout().getPredicate(),
TupleDomain.all());

RowExpression unenforcedFilter = pushdownFilterResult.getUnenforcedConstraint();
if (!TRUE_CONSTANT.equals(unenforcedFilter)) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Unenforced filter found %s but not handled", unenforcedFilter));
}
return node;
}
}

Expand All @@ -441,7 +465,7 @@ public ConstraintEvaluator(RowExpressionService evaluator, ConnectorSession sess
this.session = session;
this.expression = expression;

arguments = ImmutableSet.copyOf(extractAll(expression)).stream()
arguments = ImmutableSet.copyOf(extractVariableExpressions(expression)).stream()
.map(VariableReferenceExpression::getName)
.map(assignments::get)
.collect(toImmutableSet());
Expand Down Expand Up @@ -506,7 +530,7 @@ private static String getColumnName(ConnectorSession session, HiveMetadata metad
return metadata.getColumnMetadata(session, tableHandle, columnHandle).getName();
}

private boolean isPushdownFilterSupported(ConnectorSession session, TableHandle tableHandle)
protected boolean isPushdownFilterSupported(ConnectorSession session, TableHandle tableHandle)
{
checkArgument(tableHandle.getConnectorHandle() instanceof HiveTableHandle, "pushdownFilter is never supported on a non-hive TableHandle");
if (((HiveTableHandle) tableHandle.getConnectorHandle()).getAnalyzePartitionValues().isPresent()) {
Expand Down Expand Up @@ -568,7 +592,7 @@ private static String createTableLayoutString(
.toString();
}

private static Set<VariableReferenceExpression> extractAll(RowExpression expression)
public static Set<VariableReferenceExpression> extractVariableExpressions(RowExpression expression)
{
ImmutableSet.Builder<VariableReferenceExpression> builder = ImmutableSet.builder();
expression.accept(new VariableReferenceBuilderVisitor(), builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.facebook.presto.hive.pagefile.PageFilePageSource;
import com.facebook.presto.hive.parquet.ParquetPageSource;
import com.facebook.presto.hive.rcfile.RcFilePageSource;
import com.facebook.presto.hive.rule.HiveFilterPushdown;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
Expand Down Expand Up @@ -105,9 +106,12 @@
import com.facebook.presto.spi.constraints.PrimaryKeyConstraint;
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.constraints.UniqueConstraint;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.ConnectorIdentity;
import com.facebook.presto.spi.security.PrestoPrincipal;
Expand Down Expand Up @@ -278,7 +282,6 @@
import static com.facebook.presto.hive.metastore.NoopMetastoreCacheStats.NOOP_METASTORE_CACHE_STATS;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.hive.rule.HiveFilterPushdown.pushdownFilter;
import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
Expand Down Expand Up @@ -2289,6 +2292,22 @@ private void doTestBucketedTableEvolution(HiveStorageFormat storageFormat, Schem
}
}

private static HiveFilterPushdown.ConnectorPushdownFilterResult pushdownFilter(
ConnectorSession session,
ConnectorMetadata metadata,
SemiTransactionalHiveMetastore metastore,
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
HivePartitionManager partitionManager,
FunctionMetadataManager functionMetadataManager,
ConnectorTableHandle tableHandle,
RowExpression filter,
Optional<ConnectorTableLayoutHandle> currentLayoutHandle)
{
HiveFilterPushdown filterPushdown = new HiveFilterPushdown(new HiveTransactionManager(), rowExpressionService, functionResolution, partitionManager, functionMetadataManager);
return filterPushdown.pushdownFilter(session, metadata, metastore, tableHandle, filter, currentLayoutHandle);
}

private static void assertBucketTableEvolutionResult(MaterializedResult result, List<ColumnHandle> columnHandles, Set<Integer> bucketIds, int rowCount)
{
// Assert that only elements in the specified bucket shows up, and each element shows up 3 times.
Expand Down

0 comments on commit 96180d8

Please sign in to comment.