Skip to content

Commit

Permalink
Fix writtenIntermediateBytes metric to capture correct intermediate write metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jaystarshot committed Jan 23, 2024
1 parent 02cda05 commit 37ecfb2
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
Optional.of(partitioningScheme),
Optional.empty(),
Optional.empty(),
Optional.empty()),
Optional.empty(),
Optional.of(Boolean.TRUE)),
Optional.of(insertReference),
outputVar,
Optional.empty(),
Expand Down Expand Up @@ -348,7 +349,8 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty())),
Optional.empty(),
Optional.of(Boolean.TRUE))),
variableAllocator.newVariable("intermediaterows", BIGINT),
variableAllocator.newVariable("intermediatefragments", VARBINARY),
variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY),
Expand All @@ -369,7 +371,8 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty());
Optional.empty(),
Optional.of(Boolean.TRUE));
}

return new TableFinishNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public Optional<PlanNode> visitTableWriter(TableWriterNode node, Context context
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
context.addPlan(node, new CanonicalPlan(result, strategy));
return Optional.of(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ private RelationPlan createTableWriterPlan(
// partial aggregation is run within the TableWriteOperator to calculate the statistics for
// the data consumed by the TableWriteOperator
Optional.of(aggregations.getPartialAggregation()),
Optional.empty()),
Optional.empty(),
Optional.of(Boolean.FALSE)),
Optional.of(target),
variableAllocator.newVariable("rows", BIGINT),
// final aggregation is run within the TableFinishOperator to summarize collected statistics
Expand Down Expand Up @@ -457,7 +458,8 @@ private RelationPlan createTableWriterPlan(
tablePartitioningScheme,
preferredShufflePartitioningScheme,
Optional.empty(),
Optional.empty()),
Optional.empty(),
Optional.of(Boolean.FALSE)),
Optional.of(target),
variableAllocator.newVariable("rows", BIGINT),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class PlanFragment
private final List<RemoteSourceNode> remoteSourceNodes;
private final PartitioningScheme partitioningScheme;
private final StageExecutionDescriptor stageExecutionDescriptor;

// True only for output table writers; false for temporary table writers
private final boolean outputTableWriterFragment;
private final StatsAndCosts statsAndCosts;
private final Optional<String> jsonRepresentation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.ROOT_FRAGMENT_ID;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.finalizeSubPlan;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getTableWriterNodeIds;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getOutputTableWriterNodeIds;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -81,7 +81,7 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod
sqlParser,
idAllocator,
variableAllocator,
getTableWriterNodeIds(plan.getRoot()));
getOutputTableWriterNodeIds(plan.getRoot()));

FragmentProperties properties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ public static Set<PlanNodeId> getTableWriterNodeIds(PlanNode plan)
.collect(toImmutableSet());
}

/**
 * Collects the ids of every {@link TableWriterNode} reachable from {@code plan}
 * (walking {@link PlanNode#getSources()} depth-first, pre-order) that is not
 * flagged as a temporary table writer.
 *
 * <p>A writer whose {@code isTemporaryTableWriter} flag is absent is kept, i.e. it
 * is treated the same as a writer explicitly flagged {@code false}.
 *
 * @param plan root of the plan tree to scan
 * @return immutable set of ids of the non-temporary table writer nodes
 */
public static Set<PlanNodeId> getOutputTableWriterNodeIds(PlanNode plan)
{
    return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan))
            .filter(TableWriterNode.class::isInstance)
            .map(TableWriterNode.class::cast)
            // keep the writer unless it is explicitly marked temporary
            .filter(writer -> writer.getIsTemporaryTableWriter()
                    .map(isTemporary -> !isTemporary)
                    .orElse(true))
            .map(TableWriterNode::getId)
            .collect(toImmutableSet());
}

public static Optional<Integer> getTableWriterTasks(PlanNode plan)
{
return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
rewrittenStatisticsAggregation,
node.getTaskCountIfScaledWriter()));
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter()));
}
return Result.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
node.getStatisticsAggregation(),
Optional.of(initialTaskNumber)));
Optional.of(initialTaskNumber),
node.getIsTemporaryTableWriter()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
originalTableWriterNode.getTablePartitioningScheme(),
originalTableWriterNode.getPreferredShufflePartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation),
originalTableWriterNode.getTaskCountIfScaledWriter()),
originalTableWriterNode.getTaskCountIfScaledWriter(),
originalTableWriterNode.getIsTemporaryTableWriter()),
fixedParallelism(),
fixedParallelism());
}
Expand All @@ -603,7 +604,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
originalTableWriterNode.getTablePartitioningScheme(),
originalTableWriterNode.getPreferredShufflePartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation),
originalTableWriterNode.getTaskCountIfScaledWriter()),
originalTableWriterNode.getTaskCountIfScaledWriter(),
originalTableWriterNode.getIsTemporaryTableWriter()),
exchange.getProperties());
}
}
Expand Down Expand Up @@ -632,7 +634,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
originalTableWriterNode.getTablePartitioningScheme(),
originalTableWriterNode.getPreferredShufflePartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation),
originalTableWriterNode.getTaskCountIfScaledWriter()),
originalTableWriterNode.getTaskCountIfScaledWriter(),
originalTableWriterNode.getIsTemporaryTableWriter()),
exchange.getProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,8 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<Set<Variab
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
node.getStatisticsAggregation(),
node.getTaskCountIfScaledWriter());
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ public TableWriterNode map(TableWriterNode node, PlanNode source, PlanNodeId new
node.getTablePartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)),
node.getPreferredShufflePartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)),
node.getStatisticsAggregation().map(this::map),
node.getTaskCountIfScaledWriter());
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter());
}

public StatisticsWriterNode map(StatisticsWriterNode node, PlanNode source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class TableWriterNode
private final Optional<StatisticAggregations> statisticsAggregation;
private final List<VariableReferenceExpression> outputs;
private final Optional<Integer> taskCountIfScaledWriter;
private final Optional<Boolean> isTemporaryTableWriter;

@JsonCreator
public TableWriterNode(
Expand All @@ -73,9 +74,10 @@ public TableWriterNode(
@JsonProperty("partitioningScheme") Optional<PartitioningScheme> tablePartitioningScheme,
@JsonProperty("preferredShufflePartitioningScheme") Optional<PartitioningScheme> preferredShufflePartitioningScheme,
@JsonProperty("statisticsAggregation") Optional<StatisticAggregations> statisticsAggregation,
@JsonProperty("taskCountIfScaledWriter") Optional<Integer> taskCountIfScaledWriter)
@JsonProperty("taskCountIfScaledWriter") Optional<Integer> taskCountIfScaledWriter,
@JsonProperty("isTemporaryTableWriter") Optional<Boolean> isTemporaryTableWriter)
{
this(sourceLocation, id, Optional.empty(), source, target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, notNullColumnVariables, tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter);
this(sourceLocation, id, Optional.empty(), source, target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, notNullColumnVariables, tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter, isTemporaryTableWriter);
}

public TableWriterNode(
Expand All @@ -93,7 +95,8 @@ public TableWriterNode(
Optional<PartitioningScheme> tablePartitioningScheme,
Optional<PartitioningScheme> preferredShufflePartitioningScheme,
Optional<StatisticAggregations> statisticsAggregation,
Optional<Integer> taskCountIfScaledWriter)
Optional<Integer> taskCountIfScaledWriter,
Optional<Boolean> isTemporaryTableWriter)
{
super(sourceLocation, id, statsEquivalentPlanNode);

Expand Down Expand Up @@ -126,6 +129,7 @@ public TableWriterNode(
});
this.outputs = outputs.build();
this.taskCountIfScaledWriter = requireNonNull(taskCountIfScaledWriter, "taskCountIfScaledWriter is null");
this.isTemporaryTableWriter = requireNonNull(isTemporaryTableWriter, "isTemporaryTableWriter is null");
}

@JsonProperty
Expand Down Expand Up @@ -212,6 +216,12 @@ public Optional<Integer> getTaskCountIfScaledWriter()
return taskCountIfScaledWriter;
}

/**
 * Returns the optional flag recording whether this writer targets a temporary table.
 *
 * @return the flag exactly as supplied at construction; may be empty
 */
@JsonProperty
public Optional<Boolean> getIsTemporaryTableWriter()
{
    return this.isTemporaryTableWriter;
}

@Override
public <R, C> R accept(InternalPlanVisitor<R, C> visitor, C context)
{
Expand All @@ -236,7 +246,7 @@ public PlanNode replaceChildren(List<PlanNode> newChildren)
tablePartitioningScheme,
preferredShufflePartitioningScheme,
statisticsAggregation,
taskCountIfScaledWriter);
taskCountIfScaledWriter, isTemporaryTableWriter);
}

@Override
Expand All @@ -257,7 +267,7 @@ public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalent
tablePartitioningScheme,
preferredShufflePartitioningScheme,
statisticsAggregation,
taskCountIfScaledWriter);
taskCountIfScaledWriter, isTemporaryTableWriter);
}

// only used during planning -- will not be serialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ public TableWriterNode tableWriter(List<VariableReferenceExpression> columns, Li
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.ROOT_FRAGMENT_ID;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.finalizeSubPlan;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getTableWriterNodeIds;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getOutputTableWriterNodeIds;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED;
Expand Down Expand Up @@ -152,7 +152,7 @@ public PlanAndFragments createReadySubPlans(PlanNode plan)
sqlParser,
idAllocator,
variableAllocator,
getTableWriterNodeIds(plan));
getOutputTableWriterNodeIds(plan));
FragmentProperties properties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
plan.getOutputVariables()));
Expand Down

0 comments on commit 37ecfb2

Please sign in to comment.