Skip to content

Commit

Permalink
[native] Fix crash in PrestoTask::updateExecutionInfoLocked
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Jan 16, 2025
1 parent fa090fc commit 65dad0a
Showing 1 changed file with 37 additions and 34 deletions.
71 changes: 37 additions & 34 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,43 +795,46 @@ void PrestoTask::updateExecutionInfoLocked(
}
for (int i = 0; i < veloxTaskStats.pipelineStats.size(); ++i) {
auto& veloxPipeline = veloxTaskStats.pipelineStats[i];
if (veloxPipeline.inputPipeline) {
const auto& firstVeloxOpStats = veloxPipeline.operatorStats[0];
prestoTaskStats.rawInputPositions += firstVeloxOpStats.rawInputPositions;
prestoTaskStats.rawInputDataSizeInBytes +=
firstVeloxOpStats.rawInputBytes;
prestoTaskStats.processedInputPositions +=
firstVeloxOpStats.inputPositions;
prestoTaskStats.processedInputDataSizeInBytes +=
firstVeloxOpStats.inputBytes;
}
if (veloxPipeline.outputPipeline) {
const auto& lastVeloxOpStats = veloxPipeline.operatorStats.back();
prestoTaskStats.outputPositions += lastVeloxOpStats.outputPositions;
prestoTaskStats.outputDataSizeInBytes += lastVeloxOpStats.outputBytes;
}

for (auto j = 0; j < veloxPipeline.operatorStats.size(); ++j) {
auto& veloxOp = veloxPipeline.operatorStats[j];
auto wallNanos = veloxOp.isBlockedTiming.wallNanos +
veloxOp.addInputTiming.wallNanos + veloxOp.getOutputTiming.wallNanos +
veloxOp.finishTiming.wallNanos;
auto cpuNanos = veloxOp.isBlockedTiming.cpuNanos +
veloxOp.addInputTiming.cpuNanos + veloxOp.getOutputTiming.cpuNanos +
veloxOp.finishTiming.cpuNanos;

prestoTaskStats.totalScheduledTimeInNanos += wallNanos;
prestoTaskStats.totalCpuTimeInNanos += cpuNanos;
prestoTaskStats.totalBlockedTimeInNanos += veloxOp.blockedWallNanos;
}
// tasks may fail before any operators are created;
// collect stats only when we have operators
if (!veloxPipeline.operatorStats.empty()) {
if (veloxPipeline.inputPipeline) {
const auto& firstVeloxOpStats = veloxPipeline.operatorStats[0];
prestoTaskStats.rawInputPositions +=
firstVeloxOpStats.rawInputPositions;
prestoTaskStats.rawInputDataSizeInBytes +=
firstVeloxOpStats.rawInputBytes;
prestoTaskStats.processedInputPositions +=
firstVeloxOpStats.inputPositions;
prestoTaskStats.processedInputDataSizeInBytes +=
firstVeloxOpStats.inputBytes;
}
if (veloxPipeline.outputPipeline) {
const auto& lastVeloxOpStats = veloxPipeline.operatorStats.back();
prestoTaskStats.outputPositions += lastVeloxOpStats.outputPositions;
prestoTaskStats.outputDataSizeInBytes += lastVeloxOpStats.outputBytes;
}

for (auto j = 0; j < veloxPipeline.operatorStats.size(); ++j) {
auto& veloxOp = veloxPipeline.operatorStats[j];
for (const auto& stat : veloxOp.runtimeStats) {
auto statName = generateRuntimeStatName(veloxOp, stat.first);
addRuntimeMetric(taskRuntimeStats, statName, stat.second);
for (auto j = 0; j < veloxPipeline.operatorStats.size(); ++j) {
auto& veloxOp = veloxPipeline.operatorStats[j];
auto wallNanos = veloxOp.isBlockedTiming.wallNanos +
veloxOp.addInputTiming.wallNanos +
veloxOp.getOutputTiming.wallNanos + veloxOp.finishTiming.wallNanos;
auto cpuNanos = veloxOp.isBlockedTiming.cpuNanos +
veloxOp.addInputTiming.cpuNanos + veloxOp.getOutputTiming.cpuNanos +
veloxOp.finishTiming.cpuNanos;

prestoTaskStats.totalScheduledTimeInNanos += wallNanos;
prestoTaskStats.totalCpuTimeInNanos += cpuNanos;
prestoTaskStats.totalBlockedTimeInNanos += veloxOp.blockedWallNanos;

for (const auto& stat : veloxOp.runtimeStats) {
auto statName = generateRuntimeStatName(veloxOp, stat.first);
addRuntimeMetric(taskRuntimeStats, statName, stat.second);
}
updateOperatorRuntimeStats(veloxOp, prestoTaskStats.runtimeStats);
}
updateOperatorRuntimeStats(veloxOp, prestoTaskStats.runtimeStats);
}

for (const auto& driverStat : veloxPipeline.driverStats) {
Expand Down

0 comments on commit 65dad0a

Please sign in to comment.