Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add enhanced cte scheduling mode #24108

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
oldTableScanNode.getAssignments(),
oldTableScanNode.getTableConstraints(),
oldTableScanNode.getCurrentConstraint(),
oldTableScanNode.getEnforcedConstraint());
oldTableScanNode.getEnforcedConstraint(),
oldTableScanNode.getCteMaterializationInfo());

return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newTableScanNode, node.getPredicate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ private Optional<PlanNode> tryCreatingNewScanNode(PlanNode plan)
ImmutableList.copyOf(assignments.keySet()),
assignments.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, (e) -> (ColumnHandle) (e.getValue()))),
tableScanNode.getCurrentConstraint(),
tableScanNode.getEnforcedConstraint()));
tableScanNode.getEnforcedConstraint(),
tableScanNode.getCteMaterializationInfo()));
}

@Override
Expand Down Expand Up @@ -288,7 +289,8 @@ public PlanNode visitFilter(FilterNode node, Void context)
oldTableScanNode.getOutputVariables(),
oldTableScanNode.getAssignments(),
oldTableScanNode.getCurrentConstraint(),
oldTableScanNode.getEnforcedConstraint());
oldTableScanNode.getEnforcedConstraint(),
oldTableScanNode.getCteMaterializationInfo());

return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newTableScanNode, node.getPredicate());
}
Expand Down
12 changes: 11 additions & 1 deletion presto-docs/src/main/sphinx/admin/cte-materialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ This setting specifies the Hash function type for CTE materialization.

Use the ``hive.bucket_function_type_for_cte_materialization`` session property to set on a per-query basis.


``query.max-written-intermediate-bytes``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -129,6 +128,17 @@ This setting defines a cap on the amount of data that can be written during CTE

Use the ``query_max_written_intermediate_bytes`` session property to set on a per-query basis.

``enhanced-cte-scheduling-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

Flag to enable or disable the enhanced-cte-blocking during CTE Materialization. Enhanced CTE blocking restricts only the table scan stages of the CTE TableScan, rather than blocking entire plan sections, including the main query, until the query completes.
This approach can improve latency in scenarios where parts of the query can execute concurrently with CTE materialization writes.

Use the ``enhanced_cte_scheduling_enabled`` session property to set on a per-query basis.


How to Participate in Development
---------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ private Optional<PlanNode> tryCreatingNewScanNode(PlanNode plan)
assignments.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, (e) -> (ColumnHandle) (e.getValue()))),
tableScanNode.getTableConstraints(),
tableScanNode.getCurrentConstraint(),
tableScanNode.getEnforcedConstraint()));
tableScanNode.getEnforcedConstraint(),
tableScanNode.getCteMaterializationInfo()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected TableScanNode tableScan(PlanBuilder planBuilder, DruidTableHandle conn
variables,
assignments.build(),
TupleDomain.all(),
TupleDomain.all());
TupleDomain.all(), Optional.empty());
}

protected FilterNode filter(PlanBuilder planBuilder, PlanNode source, RowExpression predicate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ private static TableScanNode getTableScanNode(
tableScan.getAssignments(),
tableScan.getTableConstraints(),
pushdownFilterResult.getLayout().getPredicate(),
TupleDomain.all());
TupleDomain.all(),
tableScan.getCteMaterializationInfo());
}

private static ExtractionResult intersectExtractionResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public PlanNode visitTableScan(TableScanNode tableScan, RewriteContext<Void> con
tableScan.getAssignments(),
tableScan.getTableConstraints(),
tableScan.getCurrentConstraint(),
tableScan.getEnforcedConstraint());
tableScan.getEnforcedConstraint(),
tableScan.getCteMaterializationInfo());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ private Optional<PlanNode> tryPartialAggregationPushdown(PlanNode plan)
ImmutableMap.copyOf(assignments),
oldTableScanNode.getTableConstraints(),
oldTableScanNode.getCurrentConstraint(),
oldTableScanNode.getEnforcedConstraint()));
oldTableScanNode.getEnforcedConstraint(),
oldTableScanNode.getCteMaterializationInfo()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,15 @@ public void testChainedCteProjectionAndFilterPushDown()
generateMaterializedCTEInformation("cte5", 1, false, true)));
}

@Test
public void testCTEMaterializationWithEnhancedScheduling()
{
QueryRunner queryRunner = getQueryRunner();
String sql = "WITH temp as (SELECT orderkey FROM ORDERS) " +
"SELECT * FROM temp t1 JOIN (SELECT custkey FROM customer) c ON t1.orderkey=c.custkey";
verifyResults(queryRunner, sql, ImmutableList.of(generateMaterializedCTEInformation("temp", 1, false, true)));
}

@Test
public void testWrittenIntemediateByteLimit()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ private TableScanNode createDeletesTableScan(ImmutableMap<VariableReferenceExpre
outputs,
deleteColumnAssignments,
TupleDomain.all(),
TupleDomain.all());
TupleDomain.all(),
Optional.empty());
}

/**
Expand Down Expand Up @@ -382,7 +383,8 @@ private TableScanNode createNewRoot(TableScanNode node, IcebergTableHandle icebe
assignmentsBuilder.build(),
node.getTableConstraints(),
node.getCurrentConstraint(),
node.getEnforcedConstraint());
node.getEnforcedConstraint(),
node.getCteMaterializationInfo());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
.intersect(tableScan.getCurrentConstraint()),
predicateNotChangedBySimplification ?
identityPartitionColumnPredicate.intersect(tableScan.getEnforcedConstraint()) :
tableScan.getEnforcedConstraint());
tableScan.getEnforcedConstraint(),
tableScan.getCteMaterializationInfo());

if (TRUE_CONSTANT.equals(remainingFilterExpression) && predicateNotChangedBySimplification) {
return newTableScan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public final class SystemSessionProperties
public static final String QUERY_RETRY_MAX_EXECUTION_TIME = "query_retry_max_execution_time";
public static final String PARTIAL_RESULTS_ENABLED = "partial_results_enabled";
public static final String PARTIAL_RESULTS_COMPLETION_RATIO_THRESHOLD = "partial_results_completion_ratio_threshold";
public static final String ENHANCED_CTE_SCHEDULING_ENABLED = "enhanced-cte-scheduling-enabled";
public static final String PARTIAL_RESULTS_MAX_EXECUTION_TIME_MULTIPLIER = "partial_results_max_execution_time_multiplier";
public static final String OFFSET_CLAUSE_ENABLED = "offset_clause_enabled";
public static final String VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED = "verbose_exceeded_memory_limit_errors_enabled";
Expand Down Expand Up @@ -1282,6 +1283,11 @@ public SystemSessionProperties(
"Minimum query completion ratio threshold for partial results",
featuresConfig.getPartialResultsCompletionRatioThreshold(),
false),
booleanProperty(
ENHANCED_CTE_SCHEDULING_ENABLED,
"Applicable for CTE Materialization. If enabled, only tablescans of the pending tablewriters are blocked and other stages can continue.",
featuresConfig.getEnhancedCTESchedulingEnabled(),
true),
booleanProperty(
OFFSET_CLAUSE_ENABLED,
"Enable support for OFFSET clause",
Expand Down Expand Up @@ -2690,6 +2696,11 @@ public static double getPartialResultsCompletionRatioThreshold(Session session)
return session.getSystemProperty(PARTIAL_RESULTS_COMPLETION_RATIO_THRESHOLD, Double.class);
}

public static boolean isEnhancedCTESchedulingEnabled(Session session)
{
return isCteMaterializationApplicable(session) & session.getSystemProperty(ENHANCED_CTE_SCHEDULING_ENABLED, Boolean.class);
}

public static double getPartialResultsMaxExecutionTimeMultiplier(Session session)
{
return session.getSystemProperty(PARTIAL_RESULTS_MAX_EXECUTION_TIME_MULTIPLIER, Double.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@
import com.facebook.presto.metadata.Split;
import com.facebook.presto.server.remotetask.HttpRemoteTask;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.plan.CteMaterializationInfo;
import com.facebook.presto.spi.plan.PlanFragmentId;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.split.RemoteSplit;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -60,8 +65,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.getMaxFailedTaskPercentage;
import static com.facebook.presto.SystemSessionProperties.isEnhancedCTESchedulingEnabled;
import static com.facebook.presto.failureDetector.FailureDetector.State.GONE;
import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
Expand Down Expand Up @@ -557,7 +564,6 @@ private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, M
// stage finished while we were scheduling this task
task.abort();
}

return task;
}

Expand Down Expand Up @@ -594,6 +600,59 @@ private static Split createRemoteSplitFor(TaskId taskId, URI remoteSourceTaskLoc
return new Split(REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit(new Location(splitLocation), remoteSourceTaskId));
}

private static String getCteIdFromSource(PlanNode source)
{
// Traverse the plan node tree to find a TableWriterNode with TemporaryTableInfo
return PlanNodeSearcher.searchFrom(source)
.where(planNode -> planNode instanceof TableFinishNode)
.findFirst()
.flatMap(planNode -> ((TableFinishNode) planNode).getCteMaterializationInfo())
.map(CteMaterializationInfo::getCteId)
.orElseThrow(() -> new IllegalStateException("TemporaryTableInfo has no CTE ID"));
}

public boolean isCTETableFinishStage()
{
return PlanNodeSearcher.searchFrom(planFragment.getRoot())
.where(planNode -> planNode instanceof TableFinishNode &&
((TableFinishNode) planNode).getCteMaterializationInfo().isPresent())
.findSingle()
.isPresent();
}

public String getCTEWriterId()
{
// Validate that this is a CTE TableFinish stage and return the associated CTE ID
if (!isCTETableFinishStage()) {
throw new IllegalStateException("This stage is not a CTE writer stage");
}
return getCteIdFromSource(planFragment.getRoot());
}

public boolean requiresMaterializedCTE()
{
if (!isEnhancedCTESchedulingEnabled(session)) {
return false;
}
// Search for TableScanNodes and check if they reference TemporaryTableInfo
return PlanNodeSearcher.searchFrom(planFragment.getRoot())
.where(planNode -> planNode instanceof TableScanNode)
.findAll().stream()
.anyMatch(planNode -> ((TableScanNode) planNode).getCteMaterializationInfo().isPresent());
}

public List<String> getRequiredCTEList()
{
// Collect all CTE IDs referenced by TableScanNodes with TemporaryTableInfo
return PlanNodeSearcher.searchFrom(planFragment.getRoot())
.where(planNode -> planNode instanceof TableScanNode)
.findAll().stream()
.map(planNode -> ((TableScanNode) planNode).getCteMaterializationInfo()
.orElseThrow(() -> new IllegalStateException("TableScanNode has no TemporaryTableInfo")))
.map(CteMaterializationInfo::getCteId)
.collect(Collectors.toList());
}

private void updateTaskStatus(TaskId taskId, TaskStatus taskStatus)
{
StageExecutionState stageExecutionState = getState();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/*
* Tracks the completion status of table-finish nodes that write temporary tables for CTE materialization.
* CTEMaterializationTracker manages a map of materialized CTEs and their associated materialization futures.
* When a stage includes a CTE table finish, it marks the corresponding CTE as materialized and completes
* the associated future.
* This signals the scheduler that some dependency has been resolved, prompting it to resume/continue scheduling.
*/
public class CTEMaterializationTracker
{
private final Map<String, SettableFuture<Void>> materializationFutures = new ConcurrentHashMap<>();

public ListenableFuture<Void> getFutureForCTE(String cteName)
{
return Futures.nonCancellationPropagating(
materializationFutures.compute(cteName, (key, existingFuture) -> {
if (existingFuture == null) {
// Create a new SettableFuture and store it internally
return SettableFuture.create();
}
Preconditions.checkArgument(!existingFuture.isCancelled(),
String.format("Error: Existing future was found cancelled in CTEMaterializationTracker for cte", cteName));
return existingFuture;
}));
}

public void markCTEAsMaterialized(String cteName)
{
materializationFutures.compute(cteName, (key, existingFuture) -> {
if (existingFuture == null) {
SettableFuture<Void> completedFuture = SettableFuture.create();
completedFuture.set(null);
return completedFuture;
}
Preconditions.checkArgument(!existingFuture.isCancelled(),
String.format("Error: Existing future was found cancelled in CTEMaterializationTracker for cte", cteName));
existingFuture.set(null); // Notify all listeners
return existingFuture;
});
}
}
Loading
Loading