From 35c51ba44fa5b66b5299fd7090cce8660267492b Mon Sep 17 00:00:00 2001 From: Shang Ma Date: Tue, 21 Jan 2025 16:03:21 -0800 Subject: [PATCH] Improve the merging of operator stats --- .../presto/execution/StageExecutionInfo.java | 392 +++++++++++------- .../presto/operator/OperatorStats.java | 116 +++--- .../presto/operator/PipelineContext.java | 48 +-- .../presto/operator/TestOperatorStats.java | 28 +- 4 files changed, 340 insertions(+), 244 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java b/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java index 5153a964db081..54e12b6f3f857 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -42,6 +43,7 @@ import static com.facebook.presto.common.RuntimeUnit.NANO; import static com.facebook.presto.common.RuntimeUnit.NONE; import static com.facebook.presto.execution.StageExecutionState.FINISHED; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.units.DataSize.succinctBytes; import static io.airlift.units.Duration.succinctDuration; import static java.lang.Math.max; @@ -71,186 +73,84 @@ public static StageExecutionInfo create( int finishedLifespans, int totalLifespans) { - int totalTasks = taskInfos.size(); - int runningTasks = 0; - int completedTasks = 0; - - int totalDrivers = 0; - int queuedDrivers = 0; - int runningDrivers = 0; - int blockedDrivers = 0; - int completedDrivers = 0; - - double cumulativeUserMemory = 0; - double cumulativeTotalMemory = 0; - long userMemoryReservation = 0; - long totalMemoryReservation = 0; - - long totalScheduledTime = 0; - long totalCpuTime = 0; - long retriedCpuTime = 0; - long totalBlockedTime = 0; - - long totalAllocation = 0; - - long rawInputDataSize = 0; - long rawInputPositions = 0; - - long processedInputDataSize = 0; - long processedInputPositions = 0; - - long bufferedDataSize = 0; - long outputDataSize = 0; - long outputPositions = 0; - - long physicalWrittenDataSize = 0; - - int fullGcCount = 0; - int fullGcTaskCount = 0; - int minFullGcSec = 0; - int maxFullGcSec = 0; - int totalFullGcSec = 0; - - boolean fullyBlocked = true; - Set blockedReasons = new HashSet<>(); - - Map operatorToStats = new HashMap<>(); - RuntimeStats mergedRuntimeStats = new RuntimeStats(); - mergedRuntimeStats.mergeWith(stageRuntimeStats); - - List allTaskStats = new ArrayList<>(); + TaskStatsAggregator taskStatsAggregator = new TaskStatsAggregator(taskInfos.size(), stageRuntimeStats); for (TaskInfo taskInfo : taskInfos) { TaskState taskState = taskInfo.getTaskStatus().getState(); if (taskState.isDone()) { - completedTasks++; + taskStatsAggregator.increaseCompleteTaskCount(1); } else { - runningTasks++; + taskStatsAggregator.increaseRunningTaskCount(1); } TaskStats taskStats = taskInfo.getStats(); - allTaskStats.add(taskStats); if (state == FINISHED && taskInfo.getTaskStatus().getState() == TaskState.FAILED) { - retriedCpuTime += taskStats.getTotalCpuTimeInNanos(); + taskStatsAggregator.increaseRetriedCpuTime(taskStats.getTotalCpuTimeInNanos()); } if (!taskState.isDone()) { - fullyBlocked &= taskStats.isFullyBlocked(); - blockedReasons.addAll(taskStats.getBlockedReasons()); + taskStatsAggregator.updateFullyBlocked(taskStats.isFullyBlocked()); + taskStatsAggregator.addNewBlockedReasons(taskStats.getBlockedReasons()); } - bufferedDataSize += taskInfo.getOutputBuffers().getTotalBufferedBytes(); - } - - for (TaskStats taskStats : allTaskStats) { - totalDrivers += taskStats.getTotalDrivers(); - queuedDrivers += taskStats.getQueuedDrivers(); - runningDrivers += taskStats.getRunningDrivers(); - blockedDrivers += taskStats.getBlockedDrivers(); - completedDrivers += taskStats.getCompletedDrivers(); - - cumulativeUserMemory += taskStats.getCumulativeUserMemory(); - cumulativeTotalMemory += taskStats.getCumulativeTotalMemory(); - - long taskUserMemory = taskStats.getUserMemoryReservationInBytes(); - long taskSystemMemory = taskStats.getSystemMemoryReservationInBytes(); - userMemoryReservation += taskUserMemory; - totalMemoryReservation += taskUserMemory + taskSystemMemory; - - totalScheduledTime += taskStats.getTotalScheduledTimeInNanos(); - totalCpuTime += taskStats.getTotalCpuTimeInNanos(); - totalBlockedTime += taskStats.getTotalBlockedTimeInNanos(); - - totalAllocation += taskStats.getTotalAllocationInBytes(); - - rawInputDataSize += taskStats.getRawInputDataSizeInBytes(); - rawInputPositions += taskStats.getRawInputPositions(); - - processedInputDataSize += taskStats.getProcessedInputDataSizeInBytes(); - processedInputPositions += taskStats.getProcessedInputPositions(); - - outputDataSize += taskStats.getOutputDataSizeInBytes(); - outputPositions += taskStats.getOutputPositions(); - - physicalWrittenDataSize += taskStats.getPhysicalWrittenDataSizeInBytes(); - - fullGcCount += taskStats.getFullGcCount(); - fullGcTaskCount += taskStats.getFullGcCount() > 0 ? 1 : 0; - - int gcSec = toIntExact(MILLISECONDS.toSeconds(taskStats.getFullGcTimeInMillis())); - totalFullGcSec += gcSec; - minFullGcSec = min(minFullGcSec, gcSec); - maxFullGcSec = max(maxFullGcSec, gcSec); - - for (PipelineStats pipeline : taskStats.getPipelines()) { - for (OperatorStats operatorStats : pipeline.getOperatorSummaries()) { - String id = pipeline.getPipelineId() + "." + operatorStats.getOperatorId(); - operatorToStats.compute(id, (k, v) -> v == null ? operatorStats : v.add(operatorStats)); - } - } - mergedRuntimeStats.mergeWith(taskStats.getRuntimeStats()); - mergedRuntimeStats.addMetricValue(DRIVER_COUNT_PER_TASK, NONE, taskStats.getTotalDrivers()); - mergedRuntimeStats.addMetricValue(TASK_ELAPSED_TIME_NANOS, NANO, taskStats.getElapsedTimeInNanos()); - mergedRuntimeStats.addMetricValueIgnoreZero(TASK_QUEUED_TIME_NANOS, NANO, taskStats.getQueuedTimeInNanos()); - mergedRuntimeStats.addMetricValue(TASK_SCHEDULED_TIME_NANOS, NANO, taskStats.getTotalScheduledTimeInNanos()); - mergedRuntimeStats.addMetricValueIgnoreZero(TASK_BLOCKED_TIME_NANOS, NANO, taskStats.getTotalBlockedTimeInNanos()); + taskStatsAggregator.increaseBufferedDataSize(taskInfo.getOutputBuffers().getTotalBufferedBytes()); + taskStatsAggregator.processTaskStats(taskStats); } StageExecutionStats stageExecutionStats = new StageExecutionStats( schedulingComplete, getSplitDistribution, - totalTasks, - runningTasks, - completedTasks, + taskStatsAggregator.totalTaskCount, + taskStatsAggregator.runningTaskCount, + taskStatsAggregator.completedTaskCount, totalLifespans, finishedLifespans, - totalDrivers, - queuedDrivers, - runningDrivers, - blockedDrivers, - completedDrivers, + taskStatsAggregator.totalDrivers, + taskStatsAggregator.queuedDrivers, + taskStatsAggregator.runningDrivers, + taskStatsAggregator.blockedDrivers, + taskStatsAggregator.completedDrivers, - cumulativeUserMemory, - cumulativeTotalMemory, - succinctBytes(userMemoryReservation), - succinctBytes(totalMemoryReservation), + taskStatsAggregator.cumulativeUserMemory, + taskStatsAggregator.cumulativeTotalMemory, + succinctBytes(taskStatsAggregator.userMemoryReservation), + succinctBytes(taskStatsAggregator.totalMemoryReservation), peakUserMemoryReservation, peakNodeTotalMemoryReservation, - succinctDuration(totalScheduledTime, NANOSECONDS), - succinctDuration(totalCpuTime, NANOSECONDS), - succinctDuration(retriedCpuTime, NANOSECONDS), - succinctDuration(totalBlockedTime, NANOSECONDS), - fullyBlocked && runningTasks > 0, - blockedReasons, - - succinctBytes(totalAllocation), - - succinctBytes(rawInputDataSize), - rawInputPositions, - succinctBytes(processedInputDataSize), - processedInputPositions, - succinctBytes(bufferedDataSize), - succinctBytes(outputDataSize), - outputPositions, - succinctBytes(physicalWrittenDataSize), + succinctDuration(taskStatsAggregator.totalScheduledTime, NANOSECONDS), + succinctDuration(taskStatsAggregator.totalCpuTime, NANOSECONDS), + succinctDuration(taskStatsAggregator.retriedCpuTime, NANOSECONDS), + succinctDuration(taskStatsAggregator.totalBlockedTime, NANOSECONDS), + taskStatsAggregator.fullyBlocked && taskStatsAggregator.runningTaskCount > 0, + taskStatsAggregator.blockedReasons, + + succinctBytes(taskStatsAggregator.totalAllocation), + + succinctBytes(taskStatsAggregator.rawInputDataSize), + taskStatsAggregator.rawInputPositions, + succinctBytes(taskStatsAggregator.processedInputDataSize), + taskStatsAggregator.processedInputPositions, + succinctBytes(taskStatsAggregator.bufferedDataSize), + succinctBytes(taskStatsAggregator.outputDataSize), + taskStatsAggregator.outputPositions, + succinctBytes(taskStatsAggregator.physicalWrittenDataSize), new StageGcStatistics( stageExecutionId.getStageId().getId(), stageExecutionId.getId(), - totalTasks, - fullGcTaskCount, - minFullGcSec, - maxFullGcSec, - totalFullGcSec, - (int) (1.0 * totalFullGcSec / fullGcCount)), - - ImmutableList.copyOf(operatorToStats.values()), - mergedRuntimeStats); + taskStatsAggregator.totalTaskCount, + taskStatsAggregator.fullGcTaskCount, + taskStatsAggregator.minFullGcSec, + taskStatsAggregator.maxFullGcSec, + taskStatsAggregator.totalFullGcSec, + (int) (1.0 * taskStatsAggregator.totalFullGcSec / taskStatsAggregator.fullGcCount)), + taskStatsAggregator.getOperatorSummaries(), + taskStatsAggregator.getMergedRuntimeStats()); return new StageExecutionInfo( state, @@ -301,12 +201,198 @@ public boolean isFinal() return state.isDone() && tasks.stream().allMatch(taskInfo -> taskInfo.getTaskStatus().getState().isDone()); } - public static StageExecutionInfo unscheduledExecutionInfo(int stageId, boolean isQueryDone) + private static class OperatorKey { - return new StageExecutionInfo( - isQueryDone ? StageExecutionState.ABORTED : StageExecutionState.PLANNED, - StageExecutionStats.zero(stageId), - ImmutableList.of(), - Optional.empty()); + private final int pipelineId; + private final int operatorId; + + public OperatorKey(int pipelineId, int operatorId) + { + this.pipelineId = pipelineId; + this.operatorId = operatorId; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OperatorKey that = (OperatorKey) o; + return pipelineId == that.pipelineId && operatorId == that.operatorId; + } + + @Override + public int hashCode() + { + return Objects.hash(pipelineId, operatorId); + } + } + + private static class TaskStatsAggregator + { + private final int totalTaskCount; + private int runningTaskCount; + private int completedTaskCount; + private long retriedCpuTime; + private long bufferedDataSize; + + private boolean fullyBlocked = true; + private final Set blockedReasons = new HashSet<>(); + + private int totalDrivers; + private int queuedDrivers; + private int runningDrivers; + private int blockedDrivers; + private int completedDrivers; + + private double cumulativeUserMemory; + private double cumulativeTotalMemory; + private long userMemoryReservation; + private long totalMemoryReservation; + + private long totalScheduledTime; + private long totalCpuTime; + private long totalBlockedTime; + + private long totalAllocation; + + private long rawInputDataSize; + private long rawInputPositions; + + private long processedInputDataSize; + private long processedInputPositions; + + private long outputDataSize; + private long outputPositions; + + private long physicalWrittenDataSize; + + private int fullGcCount; + private int fullGcTaskCount; + private int minFullGcSec; + private int maxFullGcSec; + private int totalFullGcSec; + + private final RuntimeStats mergedRuntimeStats = new RuntimeStats(); + private final Map> operatorStatsByKey = new HashMap<>(); + + public TaskStatsAggregator(int totalTaskCount, RuntimeStats stageRuntimeStats) + { + this.totalTaskCount = totalTaskCount; + this.mergedRuntimeStats.mergeWith(stageRuntimeStats); + } + + public void processTaskStats(TaskStats taskStats) + { + totalDrivers += taskStats.getTotalDrivers(); + queuedDrivers += taskStats.getQueuedDrivers(); + runningDrivers += taskStats.getRunningDrivers(); + blockedDrivers += taskStats.getBlockedDrivers(); + completedDrivers += taskStats.getCompletedDrivers(); + + cumulativeUserMemory += taskStats.getCumulativeUserMemory(); + cumulativeTotalMemory += taskStats.getCumulativeTotalMemory(); + + long taskUserMemory = taskStats.getUserMemoryReservationInBytes(); + long taskSystemMemory = taskStats.getSystemMemoryReservationInBytes(); + userMemoryReservation += taskUserMemory; + totalMemoryReservation += taskUserMemory + taskSystemMemory; + + totalScheduledTime += taskStats.getTotalScheduledTimeInNanos(); + totalCpuTime += taskStats.getTotalCpuTimeInNanos(); + totalBlockedTime += taskStats.getTotalBlockedTimeInNanos(); + + totalAllocation += taskStats.getTotalAllocationInBytes(); + + rawInputDataSize += taskStats.getRawInputDataSizeInBytes(); + rawInputPositions += taskStats.getRawInputPositions(); + + processedInputDataSize += taskStats.getProcessedInputDataSizeInBytes(); + processedInputPositions += taskStats.getProcessedInputPositions(); + + outputDataSize += taskStats.getOutputDataSizeInBytes(); + outputPositions += taskStats.getOutputPositions(); + + physicalWrittenDataSize += taskStats.getPhysicalWrittenDataSizeInBytes(); + + fullGcCount += taskStats.getFullGcCount(); + fullGcTaskCount += taskStats.getFullGcCount() > 0 ? 1 : 0; + + int gcSec = toIntExact(MILLISECONDS.toSeconds(taskStats.getFullGcTimeInMillis())); + totalFullGcSec += gcSec; + minFullGcSec = min(minFullGcSec, gcSec); + maxFullGcSec = max(maxFullGcSec, gcSec); + + updateOperatorStats(taskStats); + updateRuntimeStats(taskStats); + } + + private void updateOperatorStats(TaskStats taskStats) + { + // Collect all operator stats by their key + for (PipelineStats pipeline : taskStats.getPipelines()) { + for (OperatorStats operatorStats : pipeline.getOperatorSummaries()) { + operatorStatsByKey.computeIfAbsent(new OperatorKey(pipeline.getPipelineId(), operatorStats.getOperatorId()), k -> new ArrayList<>()).add(operatorStats); + } + } + } + + private void updateRuntimeStats(TaskStats taskStats) + { + mergedRuntimeStats.mergeWith(taskStats.getRuntimeStats()); + mergedRuntimeStats.addMetricValue(DRIVER_COUNT_PER_TASK, NONE, taskStats.getTotalDrivers()); + mergedRuntimeStats.addMetricValue(TASK_ELAPSED_TIME_NANOS, NANO, taskStats.getElapsedTimeInNanos()); + mergedRuntimeStats.addMetricValueIgnoreZero(TASK_QUEUED_TIME_NANOS, NANO, taskStats.getQueuedTimeInNanos()); + mergedRuntimeStats.addMetricValue(TASK_SCHEDULED_TIME_NANOS, NANO, taskStats.getTotalScheduledTimeInNanos()); + mergedRuntimeStats.addMetricValueIgnoreZero(TASK_BLOCKED_TIME_NANOS, NANO, taskStats.getTotalBlockedTimeInNanos()); + } + + public RuntimeStats getMergedRuntimeStats() + { + return mergedRuntimeStats; + } + + public List getOperatorSummaries() + { + return operatorStatsByKey.values().stream() + .map(OperatorStats::merge) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toImmutableList()); + } + + public void increaseRunningTaskCount(int count) + { + runningTaskCount += count; + } + + public void increaseCompleteTaskCount(int count) + { + completedTaskCount += count; + } + + public void increaseRetriedCpuTime(long time) + { + retriedCpuTime += time; + } + + public void updateFullyBlocked(boolean blocked) + { + fullyBlocked &= blocked; + } + + public void addNewBlockedReasons(Set reasons) + { + blockedReasons.addAll(reasons); + } + + public void increaseBufferedDataSize(long bytes) + { + bufferedDataSize += bytes; + } } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java index e32d83df9d8a0..5da664daa1d4a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java @@ -21,13 +21,14 @@ import com.facebook.presto.util.Mergeable; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; import io.airlift.units.Duration; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import java.util.HashSet; +import java.util.List; import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; @@ -716,67 +717,77 @@ public DataSize getIsBlockedAllocation() return isBlockedAllocation; } - public OperatorStats add(OperatorStats operatorStats) + public static Optional merge(List operators) { - return add(ImmutableList.of(operatorStats)); - } + if (operators.isEmpty()) { + return Optional.empty(); + } - public OperatorStats add(Iterable operators) - { - long totalDrivers = this.totalDrivers; + OperatorStats first = operators.stream().findFirst().get(); + int stageId = first.getStageId(); + int operatorId = first.getOperatorId(); + int stageExecutionId = first.getStageExecutionId(); + int pipelineId = first.getPipelineId(); + PlanNodeId planNodeId = first.getPlanNodeId(); + String operatorType = first.getOperatorType(); + + long totalDrivers = 0; + + long isBlockedCalls = 0; + long isBlockedWall = 0; + long isBlockedCpu = 0; + long isBlockedAllocation = 0; + + long addInputCalls = 0; + long addInputWall = 0; + long addInputCpu = 0; + long addInputAllocation = 0; + long rawInputDataSize = 0; + long rawInputPositions = 0; + long inputDataSize = 0; + long inputPositions = 0; - long isBlockedCalls = this.isBlockedCalls; - long isBlockedWall = this.isBlockedWall.roundTo(NANOSECONDS); - long isBlockedCpu = this.isBlockedCpu.roundTo(NANOSECONDS); - long isBlockedAllocation = this.isBlockedAllocation.toBytes(); + double sumSquaredInputPositions = 0.0; - long addInputCalls = this.addInputCalls; - long addInputWall = this.addInputWall.roundTo(NANOSECONDS); - long addInputCpu = this.addInputCpu.roundTo(NANOSECONDS); - double addInputAllocation = this.addInputAllocation.toBytes(); - double rawInputDataSize = this.rawInputDataSize.toBytes(); - long rawInputPositions = this.rawInputPositions; - double inputDataSize = this.inputDataSize.toBytes(); - long inputPositions = this.inputPositions; - double sumSquaredInputPositions = this.sumSquaredInputPositions; + long getOutputCalls = 0; + long getOutputWall = 0; + long getOutputCpu = 0; + long getOutputAllocation = 0; + long outputDataSize = 0; + long outputPositions = 0; - long getOutputCalls = this.getOutputCalls; - long getOutputWall = this.getOutputWall.roundTo(NANOSECONDS); - long getOutputCpu = this.getOutputCpu.roundTo(NANOSECONDS); - double getOutputAllocation = this.getOutputAllocation.toBytes(); - double outputDataSize = this.outputDataSize.toBytes(); - long outputPositions = this.outputPositions; + long physicalWrittenDataSize = 0; - double physicalWrittenDataSize = this.physicalWrittenDataSize.toBytes(); + long finishCalls = 0; + long finishWall = 0; + long finishCpu = 0; + long finishAllocation = 0; - long additionalCpu = this.additionalCpu.roundTo(NANOSECONDS); - long blockedWall = this.blockedWall.roundTo(NANOSECONDS); + long additionalCpu = 0; + long blockedWall = 0; - long finishCalls = this.finishCalls; - long finishWall = this.finishWall.roundTo(NANOSECONDS); - long finishCpu = this.finishCpu.roundTo(NANOSECONDS); - long finishAllocation = this.finishAllocation.toBytes(); + long memoryReservation = 0; + long revocableMemoryReservation = 0; + long systemMemoryReservation = 0; - double memoryReservation = this.userMemoryReservation.toBytes(); - double revocableMemoryReservation = this.revocableMemoryReservation.toBytes(); - double systemMemoryReservation = this.systemMemoryReservation.toBytes(); - double peakUserMemory = this.peakUserMemoryReservation.toBytes(); - double peakSystemMemory = this.peakSystemMemoryReservation.toBytes(); - double peakTotalMemory = this.peakTotalMemoryReservation.toBytes(); + long peakUserMemory = 0; + long peakSystemMemory = 0; + long peakTotalMemory = 0; - double spilledDataSize = this.spilledDataSize.toBytes(); + long spilledDataSize = 0; - Optional blockedReason = this.blockedReason; + long nullJoinBuildKeyCount = 0; + long joinBuildKeyCount = 0; + long nullJoinProbeKeyCount = 0; + long joinProbeKeyCount = 0; - RuntimeStats runtimeStats = RuntimeStats.copyOf(this.runtimeStats); - DynamicFilterStats dynamicFilterStats = DynamicFilterStats.copyOf(this.dynamicFilterStats); + RuntimeStats runtimeStats = new RuntimeStats(); + DynamicFilterStats dynamicFilterStats = new DynamicFilterStats(new HashSet<>()); - long nullJoinBuildKeyCount = this.nullJoinBuildKeyCount; - long joinBuildKeyCount = this.joinBuildKeyCount; - long nullJoinProbeKeyCount = this.nullJoinProbeKeyCount; - long joinProbeKeyCount = this.joinProbeKeyCount; + Optional blockedReason = Optional.empty(); + + Mergeable base = null; - Mergeable base = getMergeableInfoOrNull(info); for (OperatorStats operator : operators) { checkArgument(operator.getOperatorId() == operatorId, "Expected operatorId to be %s but was %s", operatorId, operator.getOperatorId()); @@ -829,7 +840,10 @@ public OperatorStats add(Iterable operators) } OperatorInfo info = operator.getInfo(); - if (base != null && info != null && base.getClass() == info.getClass()) { + if (base == null) { + base = getMergeableInfoOrNull(info); + } + else if (info != null && base.getClass() == info.getClass()) { base = mergeInfo(base, info); } @@ -842,7 +856,7 @@ public OperatorStats add(Iterable operators) joinProbeKeyCount += operator.getJoinProbeKeyCount(); } - return new OperatorStats( + return Optional.of(new OperatorStats( stageId, stageExecutionId, pipelineId, @@ -901,7 +915,7 @@ public OperatorStats add(Iterable operators) nullJoinBuildKeyCount, joinBuildKeyCount, nullJoinProbeKeyCount, - joinProbeKeyCount); + joinProbeKeyCount)); } @SuppressWarnings("unchecked") diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java b/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java index 2bb2402e7a262..607107a643569 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java @@ -23,19 +23,19 @@ import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.memory.context.MemoryTrackingContext; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.ListenableFuture; import org.joda.time.DateTime; import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -46,9 +46,11 @@ import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; @ThreadSafe public class PipelineContext @@ -93,7 +95,7 @@ public class PipelineContext private final AtomicLong physicalWrittenDataSize = new AtomicLong(); - private final ConcurrentMap operatorSummaries = new ConcurrentHashMap<>(); + private final ConcurrentMap operatorStatsById = new ConcurrentHashMap<>(); private final MemoryTrackingContext pipelineMemoryContext; @@ -199,10 +201,10 @@ public void driverFinished(DriverContext driverContext) totalAllocation.getAndAdd(driverStats.getTotalAllocation().toBytes()); - // merge the operator stats into the operator summary List operators = driverStats.getOperatorStats(); for (OperatorStats operator : operators) { - operatorSummaries.compute(operator.getOperatorId(), (operatorId, summaryStats) -> summaryStats == null ? operator : summaryStats.add(operator)); + operatorStatsById.compute(operator.getOperatorId(), + (operatorId, summaryStats) -> summaryStats == null ? operator : OperatorStats.merge(ImmutableList.of(operator, summaryStats)).orElse(null)); } rawInputDataSize.update(driverStats.getRawInputDataSize().toBytes()); @@ -378,9 +380,11 @@ public PipelineStats getPipelineStats() boolean hasUnfinishedDrivers = false; boolean unfinishedDriversFullyBlocked = true; - TreeMap operatorSummaries = new TreeMap<>(this.operatorSummaries); - ListMultimap runningOperators = ArrayListMultimap.create(); ImmutableList.Builder drivers = ImmutableList.builderWithExpectedSize(driverContexts.size()); + // Make deep copy of each list + Map> operatorStatsById = this.operatorStatsById.entrySet().stream() + .collect(toMap(Map.Entry::getKey, e -> new ArrayList<>(Arrays.asList(e.getValue())))); + for (DriverContext driverContext : driverContexts) { DriverStats driverStats = driverContext.getDriverStats(); drivers.add(driverStats); @@ -402,7 +406,7 @@ public PipelineStats getPipelineStats() totalAllocation += driverStats.getTotalAllocation().toBytes(); for (OperatorStats operatorStats : driverStats.getOperatorStats()) { - runningOperators.put(operatorStats.getOperatorId(), operatorStats); + operatorStatsById.computeIfAbsent(operatorStats.getOperatorId(), k -> new ArrayList<>()).add(operatorStats); } rawInputDataSize += driverStats.getRawInputDataSize().toBytes(); @@ -417,26 +421,6 @@ public PipelineStats getPipelineStats() physicalWrittenDataSize += driverStats.getPhysicalWrittenDataSize().toBytes(); } - // merge the running operator stats into the operator summary - for (Integer operatorId : runningOperators.keySet()) { - List runningStats = runningOperators.get(operatorId); - if (runningStats.isEmpty()) { - continue; - } - OperatorStats current = operatorSummaries.get(operatorId); - OperatorStats combined; - if (current != null) { - combined = current.add(runningStats); - } - else { - combined = runningStats.get(0); - if (runningStats.size() > 1) { - combined = combined.add(runningStats.subList(1, runningStats.size())); - } - } - operatorSummaries.put(operatorId, combined); - } - PipelineStatus pipelineStatus = pipelineStatusBuilder.build(); boolean fullyBlocked = hasUnfinishedDrivers && unfinishedDriversFullyBlocked; @@ -486,7 +470,11 @@ public PipelineStats getPipelineStats() physicalWrittenDataSize, - ImmutableList.copyOf(operatorSummaries.values()), + operatorStatsById.values().stream() + .map(OperatorStats::merge) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toImmutableList()), drivers.build()); } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java index 6291afd0b6dcb..4202dcd3b38e1 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java @@ -32,6 +32,7 @@ import static io.airlift.units.DataSize.Unit.BYTE; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; public class TestOperatorStats @@ -41,10 +42,10 @@ public class TestOperatorStats private static final String TEST_METRIC_NAME = "test_metric"; private static final RuntimeMetric TEST_RUNTIME_METRIC_1 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 10, 2, 9, 1); private static final RuntimeMetric TEST_RUNTIME_METRIC_2 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 3, 2); - private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_1 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId[] {new PlanNodeId("1"), - new PlanNodeId("2")}))); - private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_2 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId[] {new PlanNodeId("2"), - new PlanNodeId("3")}))); + private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_1 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId("1"), + new PlanNodeId("2")))); + private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_2 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId("2"), + new PlanNodeId("3")))); public static final OperatorStats EXPECTED = new OperatorStats( 0, @@ -229,16 +230,16 @@ public static void assertExpectedOperatorStats(OperatorStats actual) } @Test - public void testAdd() + public void testMergeWithNonMergeableInfo() { - OperatorStats actual = EXPECTED.add(ImmutableList.of(EXPECTED, EXPECTED)); + OperatorStats actual = OperatorStats.merge(ImmutableList.of(EXPECTED, EXPECTED, EXPECTED)).get(); assertEquals(actual.getStageId(), 0); assertEquals(actual.getStageExecutionId(), 10); assertEquals(actual.getOperatorId(), 41); assertEquals(actual.getOperatorType(), "test"); - assertEquals(actual.getTotalDrivers(), 3 * 1); + assertEquals(actual.getTotalDrivers(), 3); assertEquals(actual.getAddInputCalls(), 3 * 2); assertEquals(actual.getAddInputWall(), new Duration(3 * 3, NANOSECONDS)); assertEquals(actual.getAddInputCpu(), new Duration(3 * 4, NANOSECONDS)); @@ -279,16 +280,16 @@ public void testAdd() } @Test - public void testAddMergeable() + public void testMergeWithMergeableInfo() { - OperatorStats actual = MERGEABLE.add(ImmutableList.of(MERGEABLE, MERGEABLE)); + OperatorStats actual = OperatorStats.merge(ImmutableList.of(MERGEABLE, MERGEABLE, MERGEABLE)).get(); assertEquals(actual.getStageId(), 0); assertEquals(actual.getStageExecutionId(), 10); assertEquals(actual.getOperatorId(), 41); assertEquals(actual.getOperatorType(), "test"); - assertEquals(actual.getTotalDrivers(), 3 * 1); + assertEquals(actual.getTotalDrivers(), 3); assertEquals(actual.getAddInputCalls(), 3 * 2); assertEquals(actual.getAddInputWall(), new Duration(3 * 3, NANOSECONDS)); assertEquals(actual.getAddInputCpu(), new Duration(3 * 4, NANOSECONDS)); @@ -329,4 +330,11 @@ public void testAddMergeable() assertRuntimeMetricEquals(actual.getRuntimeStats().getMetric(TEST_METRIC_NAME), expectedMetric); assertEquals(actual.getDynamicFilterStats().getProducerNodeIds(), TEST_DYNAMIC_FILTER_STATS_2.getProducerNodeIds()); } + + @Test + public void testMergeEmptyCollection() + { + Optional merged = OperatorStats.merge(ImmutableList.of()); + assertFalse(merged.isPresent()); + } }