Skip to content

Commit

Permalink
Add input distribution to delete node
Browse files Browse the repository at this point in the history
  • Loading branch information
feilong-liu committed Dec 2, 2024
1 parent 6306c99 commit 2d6435b
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public DeleteNode plan(Delete node)
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("fragment", VARBINARY));

return new DeleteNode(getSourceLocation(node), idAllocator.getNextId(), builder.getRoot(), rowId, deleteNodeOutputVariables);
return new DeleteNode(getSourceLocation(node), idAllocator.getNextId(), builder.getRoot(), rowId, deleteNodeOutputVariables, Optional.empty());
}

public UpdateNode plan(Update node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.FilterNode;
Expand Down Expand Up @@ -528,6 +529,43 @@ public PlanWithProperties visitTopN(TopNNode node, PreferredProperties preferred
return rebaseAndDeriveProperties(node, child);
}

@Override
public PlanWithProperties visitDelete(DeleteNode node, PreferredProperties preferredProperties)
{
if (!node.getInputDistribution().isPresent()) {
return visitPlan(node, preferredProperties);
}
DeleteNode.InputDistribution inputDistribution = node.getInputDistribution().get();
List<LocalProperty<VariableReferenceExpression>> desiredProperties = new ArrayList<>();
if (!inputDistribution.getPartitionBy().isEmpty()) {
desiredProperties.add(new GroupingProperty<>(inputDistribution.getPartitionBy()));
}
inputDistribution.getOrderingScheme().ifPresent(orderingScheme ->
orderingScheme.getOrderByVariables().stream()
.map(variable -> new SortingProperty<>(variable, orderingScheme.getOrdering(variable)))
.forEach(desiredProperties::add));

PlanWithProperties child = planChild(
node,
PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(inputDistribution.getPartitionBy()), desiredProperties)
.mergeWithParent(preferredProperties, !isExactPartitioningPreferred(session)));

if (!isStreamPartitionedOn(child.getProperties(), inputDistribution.getPartitionBy()) &&
!isNodePartitionedOn(child.getProperties(), inputDistribution.getPartitionBy())) {
checkState(!inputDistribution.getPartitionBy().isEmpty());
child = withDerivedProperties(
partitionedExchange(
idAllocator.getNextId(),
selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false),
child.getNode(),
createPartitioning(inputDistribution.getPartitionBy()),
Optional.empty()),
child.getProperties());
}

return rebaseAndDeriveProperties(node, child);
}

@Override
public PlanWithProperties visitSort(SortNode node, PreferredProperties preferredProperties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.JoinNode;
Expand Down Expand Up @@ -459,6 +460,31 @@ public PlanWithProperties visitWindow(WindowNode node, StreamPreferredProperties
return deriveProperties(result, child.getProperties());
}

@Override
public PlanWithProperties visitDelete(DeleteNode node, StreamPreferredProperties parentPreferences)
{
if (!node.getInputDistribution().isPresent()) {
return visitPlan(node, parentPreferences);
}
DeleteNode.InputDistribution inputDistribution = node.getInputDistribution().get();
StreamPreferredProperties childRequirements = parentPreferences
.constrainTo(node.getSource().getOutputVariables())
.withDefaultParallelism(session)
.withPartitioning(inputDistribution.getPartitionBy());

PlanWithProperties child = planAndEnforce(node.getSource(), childRequirements, childRequirements);
DeleteNode result = new DeleteNode(
node.getSourceLocation(),
idAllocator.getNextId(),
node.getStatsEquivalentPlanNode(),
child.getNode(),
node.getRowId(),
node.getOutputVariables(),
node.getInputDistribution());

return deriveProperties(result, child.getProperties());
}

@Override
public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, StreamPreferredProperties parentPreferences)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,8 +801,13 @@ public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Set<Variab
@Override
public PlanNode visitDelete(DeleteNode node, RewriteContext<Set<VariableReferenceExpression>> context)
{
PlanNode source = context.rewrite(node.getSource(), ImmutableSet.of(node.getRowId()));
return new DeleteNode(node.getSourceLocation(), node.getId(), node.getStatsEquivalentPlanNode(), source, node.getRowId(), node.getOutputVariables());
ImmutableSet.Builder<VariableReferenceExpression> builder = ImmutableSet.builder();
builder.add(node.getRowId());
if (node.getInputDistribution().isPresent()) {
builder.addAll(node.getInputDistribution().get().getInputVariables());
}
PlanNode source = context.rewrite(node.getSource(), builder.build());
return new DeleteNode(node.getSourceLocation(), node.getId(), node.getStatsEquivalentPlanNode(), source, node.getRowId(), node.getOutputVariables(), node.getInputDistribution());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public PlanNode visitDelete(DeleteNode node, RewriteContext<Void> context)
node.getId(),
rewrittenSource,
node.getRowId(),
node.getOutputVariables());
node.getOutputVariables(),
node.getInputDistribution());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ public PlanNode visitValues(ValuesNode node, RewriteContext<Void> context)
@Override
public PlanNode visitDelete(DeleteNode node, RewriteContext<Void> context)
{
return new DeleteNode(node.getSourceLocation(), node.getId(), context.rewrite(node.getSource()), canonicalize(node.getRowId()), node.getOutputVariables());
return new DeleteNode(node.getSourceLocation(), node.getId(), context.rewrite(node.getSource()), canonicalize(node.getRowId()), node.getOutputVariables(), node.getInputDistribution());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ public TableFinishNode tableDelete(SchemaTableName schemaTableName, PlanNode del
idAllocator.getNextId(),
deleteSource,
deleteRowId,
ImmutableList.of(deleteRowId)))
ImmutableList.of(deleteRowId),
Optional.empty()))
.addInputsSet(deleteRowId)
.singleDistributionPartitioningScheme(deleteRowId)),
Optional.of(deleteHandle),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ public final class DeleteNode
private final PlanNode source;
private final VariableReferenceExpression rowId;
private final List<VariableReferenceExpression> outputVariables;
private final Optional<InputDistribution> inputDistribution;

@JsonCreator
public DeleteNode(
Optional<SourceLocation> sourceLocation,
@JsonProperty("id") PlanNodeId id,
@JsonProperty("source") PlanNode source,
@JsonProperty("rowId") VariableReferenceExpression rowId,
@JsonProperty("outputVariables") List<VariableReferenceExpression> outputVariables)
@JsonProperty("outputVariables") List<VariableReferenceExpression> outputVariables,
@JsonProperty("inputDistribution") Optional<InputDistribution> inputDistribution)
{
this(sourceLocation, id, Optional.empty(), source, rowId, outputVariables);
this(sourceLocation, id, Optional.empty(), source, rowId, outputVariables, inputDistribution);
}

public DeleteNode(
Expand All @@ -53,13 +55,15 @@ public DeleteNode(
Optional<PlanNode> statsEquivalentPlanNode,
PlanNode source,
VariableReferenceExpression rowId,
List<VariableReferenceExpression> outputVariables)
List<VariableReferenceExpression> outputVariables,
Optional<InputDistribution> inputDistribution)
{
super(sourceLocation, id, statsEquivalentPlanNode);

this.source = requireNonNull(source, "source is null");
this.rowId = requireNonNull(rowId, "rowId is null");
this.outputVariables = Collections.unmodifiableList(new ArrayList<>(requireNonNull(outputVariables, "outputVariables is null")));
this.inputDistribution = requireNonNull(inputDistribution, "dataPartition is null");
}

@JsonProperty
Expand All @@ -81,6 +85,12 @@ public List<VariableReferenceExpression> getOutputVariables()
return outputVariables;
}

@JsonProperty
public Optional<InputDistribution> getInputDistribution()
{
return inputDistribution;
}

@Override
public List<PlanNode> getSources()
{
Expand All @@ -97,12 +107,30 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
public PlanNode replaceChildren(List<PlanNode> newChildren)
{
checkArgument(newChildren.size() == 1);
return new DeleteNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), newChildren.get(0), rowId, outputVariables);
return new DeleteNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), newChildren.get(0), rowId, outputVariables, inputDistribution);
}

@Override
public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode)
{
return new DeleteNode(getSourceLocation(), getId(), statsEquivalentPlanNode, source, rowId, outputVariables);
return new DeleteNode(getSourceLocation(), getId(), statsEquivalentPlanNode, source, rowId, outputVariables, inputDistribution);
}

public interface InputDistribution
{
default List<VariableReferenceExpression> getPartitionBy()
{
return Collections.emptyList();
}

default Optional<OrderingScheme> getOrderingScheme()
{
return Optional.empty();
}

default List<VariableReferenceExpression> getInputVariables()
{
return Collections.emptyList();
}
}
}

0 comments on commit 2d6435b

Please sign in to comment.