From 59b440ef5b2ebfb1c4c545ac769c34671e4eccdf Mon Sep 17 00:00:00 2001 From: Pedro Eugenio Rocha Pedreira Date: Wed, 25 Sep 2024 17:44:14 -0700 Subject: [PATCH] Properly report lazy loaded inputBytes Summary: Whenever there is lazy loading, TableScan would come out with zero input and output bytes, and they would be attributed to the operator that in fact loaded the lazy vector. Using the existing mechanism to save it locally and periodically apply to the correct source operator. Differential Revision: D63414708 --- velox/exec/Driver.cpp | 22 +++++++++++++++++++-- velox/exec/Driver.h | 13 ++++++------ velox/exec/Operator.h | 1 + velox/exec/tests/PrintPlanWithStatsTest.cpp | 3 +++ velox/exec/tests/TableScanTest.cpp | 7 ++++++- velox/vector/LazyVector.cpp | 16 ++++++++++----- velox/vector/LazyVector.h | 2 ++ velox/vector/tests/LazyVectorTest.cpp | 6 ++++-- 8 files changed, 53 insertions(+), 17 deletions(-) diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 307ba0d323708..c61139248c5f0 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -395,13 +395,15 @@ size_t OpCallStatusRaw::callDuration() const { : fmt::format("null::{}", operatorMethod); } -CpuWallTiming Driver::processLazyTiming( +CpuWallTiming Driver::processLazyIoStats( Operator& op, const CpuWallTiming& timing) { if (&op == operators_[0].get()) { return timing; } auto lockStats = op.stats().wlock(); + + // Checks and tries to update cpu time from lazy loads. auto it = lockStats->runtimeStats.find(LazyVector::kCpuNanos); if (it == lockStats->runtimeStats.end()) { // Return early if no lazy activity. Lazy CPU and wall times are recorded @@ -417,6 +419,8 @@ CpuWallTiming Driver::processLazyTiming( return timing; } lockStats->lastLazyCpuNanos = cpu; + + // Checks and tries to update wall time from lazy loads. int64_t wallDelta = 0; it = lockStats->runtimeStats.find(LazyVector::kWallNanos); if (it != lockStats->runtimeStats.end()) { @@ -426,6 +430,18 @@ CpuWallTiming Driver::processLazyTiming( lockStats->lastLazyWallNanos = wall; } } + + // Checks and tries to update input bytes from lazy loads. + int64_t inputBytesDelta = 0; + it = lockStats->runtimeStats.find(LazyVector::kInputBytes); + if (it != lockStats->runtimeStats.end()) { + int64_t inputBytes = it->second.sum; + inputBytesDelta = inputBytes - lockStats->lastLazyInputBytes; + if (inputBytesDelta > 0) { + lockStats->lastLazyInputBytes = inputBytes; + } + } + lockStats.unlock(); cpuDelta = std::min(cpuDelta, timing.cpuNanos); wallDelta = std::min(wallDelta, timing.wallNanos); @@ -435,6 +451,8 @@ CpuWallTiming Driver::processLazyTiming( static_cast(wallDelta), static_cast(cpuDelta), }); + lockStats->inputBytes += inputBytesDelta; + lockStats->outputBytes += inputBytesDelta; return CpuWallTiming{ 1, timing.wallNanos - wallDelta, @@ -1012,7 +1030,7 @@ void Driver::withDeltaCpuWallTimer( // opTimingMember upon destruction of the timer when withDeltaCpuWallTimer // ends. The timer is created on the stack to avoid heap allocation auto f = [op, opTimingMember, this](const CpuWallTiming& elapsedTime) { - auto elapsedSelfTime = processLazyTiming(*op, elapsedTime); + auto elapsedSelfTime = processLazyIoStats(*op, elapsedTime); op->stats().withWLock([&](auto& lockedStats) { (lockedStats.*opTimingMember).add(elapsedSelfTime); }); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7551b2b304d71..06bf91d0b80b0 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -528,13 +528,12 @@ class Driver : public std::enable_shared_from_this { TimingMemberPtr opTimingMember, Func&& opFunction); - // Adjusts 'timing' by removing the lazy load wall and CPU times - // accrued since last time timing information was recorded for - // 'op'. The accrued lazy load times are credited to the source - // operator of 'this'. The per-operator runtimeStats for lazy load - // are left in place to reflect which operator triggered the load - // but these do not bias the op's timing. - CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing); + // Adjusts 'timing' by removing the lazy load wall time, CPU time, and input + // bytes accrued since last time timing information was recorded for 'op'. The + // accrued lazy load times are credited to the source operator of 'this'. The + // per-operator runtimeStats for lazy load are left in place to reflect which + // operator triggered the load but these do not bias the op's timing. + CpuWallTiming processLazyIoStats(Operator& op, const CpuWallTiming& timing); inline void validateOperatorOutputResult( const RowVectorPtr& result, diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index a369b4ffcf2fb..d8e4dd7cd81c0 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -174,6 +174,7 @@ struct OperatorStats { // Last recorded values for lazy loading times for loads triggered by 'this'. int64_t lastLazyCpuNanos{0}; int64_t lastLazyWallNanos{0}; + int64_t lastLazyInputBytes{0}; // Total null keys processed by the operator. // Currently populated only by HashJoin/HashBuild. diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 9b52725c3feea..3539c301341b2 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -152,6 +152,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"}, {" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, @@ -269,6 +270,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"}, {" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" distinctKey0\\s+sum: .+, count: 1, min: .+, max: .+"}, {" hashtable.capacity\\s+sum: 1252, count: 1, min: 1252, max: 1252"}, @@ -345,6 +347,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"}, {" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" numWrittenFiles\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index fd982240c2c64..a20c1c5605736 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -3816,7 +3816,12 @@ TEST_F(TableScanTest, structLazy) { .project({"cardinality(c2.c0)"}) .planNode(); - assertQuery(op, {filePath}, "select c0 % 3 from tmp"); + auto task = assertQuery(op, {filePath}, "select c0 % 3 from tmp"); + + // Ensure lazy stats are attributed to table scan. + auto stats = task->taskStats(); + ASSERT_GT(stats.pipelineStats[0].operatorStats[0].inputBytes, 0); + ASSERT_GT(stats.pipelineStats[0].operatorStats[0].outputBytes, 0); } TEST_F(TableScanTest, interleaveLazyEager) { diff --git a/velox/vector/LazyVector.cpp b/velox/vector/LazyVector.cpp index d0cc09c3d59e1..b6d0da986a2f1 100644 --- a/velox/vector/LazyVector.cpp +++ b/velox/vector/LazyVector.cpp @@ -23,16 +23,20 @@ #include "velox/vector/SelectivityVector.h" namespace facebook::velox { - namespace { -void writeIOTiming(const CpuWallTiming& delta) { + +void recordLazyIOStats(const CpuWallTiming& cpuDelta, uint64_t inputBytes) { addThreadLocalRuntimeStat( LazyVector::kWallNanos, - RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos)); + RuntimeCounter(cpuDelta.wallNanos, RuntimeCounter::Unit::kNanos)); addThreadLocalRuntimeStat( LazyVector::kCpuNanos, - RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos)); + RuntimeCounter(cpuDelta.cpuNanos, RuntimeCounter::Unit::kNanos)); + addThreadLocalRuntimeStat( + LazyVector::kInputBytes, + RuntimeCounter(inputBytes, RuntimeCounter::Unit::kBytes)); } + } // namespace void VectorLoader::load( @@ -41,7 +45,9 @@ void VectorLoader::load( vector_size_t resultSize, VectorPtr* result) { { - DeltaCpuWallTimer timer([&](auto& delta) { writeIOTiming(delta); }); + DeltaCpuWallTimer lazyIoRecorder([&](auto& cpuDelta) { + recordLazyIOStats(cpuDelta, *result ? (*result)->estimateFlatSize() : 0); + }); loadInternal(rows, hook, resultSize, result); } if (hook) { diff --git a/velox/vector/LazyVector.h b/velox/vector/LazyVector.h index 30354af615abe..3957002a1d31b 100644 --- a/velox/vector/LazyVector.h +++ b/velox/vector/LazyVector.h @@ -224,6 +224,8 @@ class LazyVector : public BaseVector { public: static constexpr const char* kCpuNanos = "dataSourceLazyCpuNanos"; static constexpr const char* kWallNanos = "dataSourceLazyWallNanos"; + static constexpr const char* kInputBytes = "dataSourceLazyInputBytes"; + LazyVector( velox::memory::MemoryPool* pool, TypePtr type, diff --git a/velox/vector/tests/LazyVectorTest.cpp b/velox/vector/tests/LazyVectorTest.cpp index 2ad3cbc8e4464..aee10a50bc423 100644 --- a/velox/vector/tests/LazyVectorTest.cpp +++ b/velox/vector/tests/LazyVectorTest.cpp @@ -639,9 +639,11 @@ TEST_F(LazyVectorTest, runtimeStats) { std::sort(stats.begin(), stats.end(), [](auto& x, auto& y) { return x.first < y.first; }); - ASSERT_EQ(stats.size(), 2); + ASSERT_EQ(stats.size(), 3); ASSERT_EQ(stats[0].first, LazyVector::kCpuNanos); ASSERT_GE(stats[0].second.value, 0); - ASSERT_EQ(stats[1].first, LazyVector::kWallNanos); + ASSERT_EQ(stats[1].first, LazyVector::kInputBytes); ASSERT_GE(stats[1].second.value, 0); + ASSERT_EQ(stats[2].first, LazyVector::kWallNanos); + ASSERT_GE(stats[2].second.value, 0); }