From bef3e1917b01a6e2ef1bd4d1037f3829607b6da2 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 15 Feb 2024 07:43:34 -0500 Subject: [PATCH] [native] Reduce copy-paste in PrestoTask::updateInfoLocked --- .../presto_cpp/main/PrestoTask.cpp | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.cpp b/presto-native-execution/presto_cpp/main/PrestoTask.cpp index b8e2d6971c144..18dab3d8b6547 100644 --- a/presto-native-execution/presto_cpp/main/PrestoTask.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoTask.cpp @@ -123,13 +123,6 @@ static protocol::RuntimeMetric createProtocolRuntimeMetric( return protocol::RuntimeMetric{name, unit, value, 1, value, value}; } -// Creates a Velox runtime metric object from a raw value. -static RuntimeMetric createVeloxRuntimeMetric( - int64_t value, - RuntimeCounter::Unit unit) { - return RuntimeMetric{value, unit}; -} - // Updates a Velox runtime metric in the unordered map. static void addRuntimeMetric( std::unordered_map& runtimeMetrics, @@ -149,12 +142,19 @@ static void addRuntimeMetricIfNotZero( const std::string& name, uint64_t value) { if (value > 0) { - auto veloxMetric = - createVeloxRuntimeMetric(value, RuntimeCounter::Unit::kNone); + auto veloxMetric = RuntimeMetric(value, RuntimeCounter::Unit::kNone); addRuntimeMetric(runtimeMetrics, name, veloxMetric); } } +RuntimeMetric fromMillis(int64_t ms) { + return RuntimeMetric{ms * 1'000'000, velox::RuntimeCounter::Unit::kNanos}; +} + +RuntimeMetric fromNanos(int64_t nanos) { + return RuntimeMetric{nanos, velox::RuntimeCounter::Unit::kNanos}; +} + // Utility to generate presto runtime stat name when translating velox runtime // stats over to presto. std::string generateRuntimeStatName( @@ -453,36 +453,27 @@ protocol::TaskInfo PrestoTask::updateInfoLocked() { if (taskStats.outputBufferStats.has_value()) { const auto& outputBufferStats = taskStats.outputBufferStats.value(); - const auto averageBufferTimeNanos = - outputBufferStats.averageBufferTimeMs * 1'000'000; taskRuntimeStats.insert( {"averageOutputBufferWallNanos", - RuntimeMetric(averageBufferTimeNanos, RuntimeCounter::Unit::kNanos)}); + fromMillis(outputBufferStats.averageBufferTimeMs)}); } if (taskStats.memoryReclaimCount > 0) { taskRuntimeStats["memoryReclaimCount"].addValue( taskStats.memoryReclaimCount); taskRuntimeStats.insert( - {"memoryReclaimWallNanos", - RuntimeMetric( - taskStats.memoryReclaimMs * 1'000'000, - RuntimeCounter::Unit::kNanos)}); + {"memoryReclaimWallNanos", fromMillis(taskStats.memoryReclaimMs)}); } if (taskStats.endTimeMs >= taskStats.executionEndTimeMs) { taskRuntimeStats.insert( {"outputConsumedDelayInNanos", - RuntimeMetric( - (taskStats.endTimeMs - taskStats.executionEndTimeMs) * 1'000'000, - RuntimeCounter::Unit::kNanos)}); + fromMillis(taskStats.endTimeMs - taskStats.executionEndTimeMs)}); taskRuntimeStats["createTime"].addValue(taskStats.executionStartTimeMs); taskRuntimeStats["endTime"].addValue(taskStats.endTimeMs); } - taskRuntimeStats.insert( - {"nativeProcessCpuTime", - RuntimeMetric(processCpuTime_, RuntimeCounter::Unit::kNanos)}); + taskRuntimeStats.insert({"nativeProcessCpuTime", fromNanos(processCpuTime_)}); for (int i = 0; i < taskStats.pipelineStats.size(); ++i) { auto& pipelineOut = info.stats.pipelines[i];