Skip to content

Commit

Permalink
[native] Reduce copy-paste in PrestoTask::updateInfoLocked
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Feb 15, 2024
1 parent 7b6b5fc commit bef3e19
Showing 1 changed file with 13 additions and 22 deletions.
35 changes: 13 additions & 22 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,6 @@ static protocol::RuntimeMetric createProtocolRuntimeMetric(
return protocol::RuntimeMetric{name, unit, value, 1, value, value};
}

// Creates a Velox runtime metric object from a raw value.
static RuntimeMetric createVeloxRuntimeMetric(
int64_t value,
RuntimeCounter::Unit unit) {
return RuntimeMetric{value, unit};
}

// Updates a Velox runtime metric in the unordered map.
static void addRuntimeMetric(
std::unordered_map<std::string, RuntimeMetric>& runtimeMetrics,
Expand All @@ -149,12 +142,19 @@ static void addRuntimeMetricIfNotZero(
const std::string& name,
uint64_t value) {
if (value > 0) {
auto veloxMetric =
createVeloxRuntimeMetric(value, RuntimeCounter::Unit::kNone);
auto veloxMetric = RuntimeMetric(value, RuntimeCounter::Unit::kNone);
addRuntimeMetric(runtimeMetrics, name, veloxMetric);
}
}

RuntimeMetric fromMillis(int64_t ms) {
return RuntimeMetric{ms * 1'000'000, velox::RuntimeCounter::Unit::kNanos};
}

RuntimeMetric fromNanos(int64_t nanos) {
return RuntimeMetric{nanos, velox::RuntimeCounter::Unit::kNanos};
}

// Utility to generate presto runtime stat name when translating velox runtime
// stats over to presto.
std::string generateRuntimeStatName(
Expand Down Expand Up @@ -453,36 +453,27 @@ protocol::TaskInfo PrestoTask::updateInfoLocked() {
if (taskStats.outputBufferStats.has_value()) {
const auto& outputBufferStats = taskStats.outputBufferStats.value();

const auto averageBufferTimeNanos =
outputBufferStats.averageBufferTimeMs * 1'000'000;
taskRuntimeStats.insert(
{"averageOutputBufferWallNanos",
RuntimeMetric(averageBufferTimeNanos, RuntimeCounter::Unit::kNanos)});
fromMillis(outputBufferStats.averageBufferTimeMs)});
}

if (taskStats.memoryReclaimCount > 0) {
taskRuntimeStats["memoryReclaimCount"].addValue(
taskStats.memoryReclaimCount);
taskRuntimeStats.insert(
{"memoryReclaimWallNanos",
RuntimeMetric(
taskStats.memoryReclaimMs * 1'000'000,
RuntimeCounter::Unit::kNanos)});
{"memoryReclaimWallNanos", fromMillis(taskStats.memoryReclaimMs)});
}

if (taskStats.endTimeMs >= taskStats.executionEndTimeMs) {
taskRuntimeStats.insert(
{"outputConsumedDelayInNanos",
RuntimeMetric(
(taskStats.endTimeMs - taskStats.executionEndTimeMs) * 1'000'000,
RuntimeCounter::Unit::kNanos)});
fromMillis(taskStats.endTimeMs - taskStats.executionEndTimeMs)});
taskRuntimeStats["createTime"].addValue(taskStats.executionStartTimeMs);
taskRuntimeStats["endTime"].addValue(taskStats.endTimeMs);
}

taskRuntimeStats.insert(
{"nativeProcessCpuTime",
RuntimeMetric(processCpuTime_, RuntimeCounter::Unit::kNanos)});
taskRuntimeStats.insert({"nativeProcessCpuTime", fromNanos(processCpuTime_)});

for (int i = 0; i < taskStats.pipelineStats.size(); ++i) {
auto& pipelineOut = info.stats.pipelines[i];
Expand Down

0 comments on commit bef3e19

Please sign in to comment.