Skip to content

Commit

Permalink
Properly report lazy loaded inputBytes
Browse files Browse the repository at this point in the history
Summary:
Whenever there is lazy loading, TableScan would report zero input and
output bytes, and those bytes would instead be attributed to the operator
that in fact loaded the lazy vector. Use the existing mechanism to save
them locally and periodically apply them to the correct source operator.

Differential Revision: D63414708
  • Loading branch information
pedroerp authored and facebook-github-bot committed Sep 26, 2024
1 parent 50784a5 commit 59b440e
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 17 deletions.
22 changes: 20 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,15 @@ size_t OpCallStatusRaw::callDuration() const {
: fmt::format("null::{}", operatorMethod);
}

CpuWallTiming Driver::processLazyTiming(
CpuWallTiming Driver::processLazyIoStats(
Operator& op,
const CpuWallTiming& timing) {
if (&op == operators_[0].get()) {
return timing;
}
auto lockStats = op.stats().wlock();

// Checks and tries to update cpu time from lazy loads.
auto it = lockStats->runtimeStats.find(LazyVector::kCpuNanos);
if (it == lockStats->runtimeStats.end()) {
// Return early if no lazy activity. Lazy CPU and wall times are recorded
Expand All @@ -417,6 +419,8 @@ CpuWallTiming Driver::processLazyTiming(
return timing;
}
lockStats->lastLazyCpuNanos = cpu;

// Checks and tries to update wall time from lazy loads.
int64_t wallDelta = 0;
it = lockStats->runtimeStats.find(LazyVector::kWallNanos);
if (it != lockStats->runtimeStats.end()) {
Expand All @@ -426,6 +430,18 @@ CpuWallTiming Driver::processLazyTiming(
lockStats->lastLazyWallNanos = wall;
}
}

// Checks and tries to update input bytes from lazy loads.
int64_t inputBytesDelta = 0;
it = lockStats->runtimeStats.find(LazyVector::kInputBytes);
if (it != lockStats->runtimeStats.end()) {
int64_t inputBytes = it->second.sum;
inputBytesDelta = inputBytes - lockStats->lastLazyInputBytes;
if (inputBytesDelta > 0) {
lockStats->lastLazyInputBytes = inputBytes;
}
}

lockStats.unlock();
cpuDelta = std::min<int64_t>(cpuDelta, timing.cpuNanos);
wallDelta = std::min<int64_t>(wallDelta, timing.wallNanos);
Expand All @@ -435,6 +451,8 @@ CpuWallTiming Driver::processLazyTiming(
static_cast<uint64_t>(wallDelta),
static_cast<uint64_t>(cpuDelta),
});
lockStats->inputBytes += inputBytesDelta;
lockStats->outputBytes += inputBytesDelta;
return CpuWallTiming{
1,
timing.wallNanos - wallDelta,
Expand Down Expand Up @@ -1012,7 +1030,7 @@ void Driver::withDeltaCpuWallTimer(
// opTimingMember upon destruction of the timer when withDeltaCpuWallTimer
// ends. The timer is created on the stack to avoid heap allocation
auto f = [op, opTimingMember, this](const CpuWallTiming& elapsedTime) {
auto elapsedSelfTime = processLazyTiming(*op, elapsedTime);
auto elapsedSelfTime = processLazyIoStats(*op, elapsedTime);
op->stats().withWLock([&](auto& lockedStats) {
(lockedStats.*opTimingMember).add(elapsedSelfTime);
});
Expand Down
13 changes: 6 additions & 7 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,12 @@ class Driver : public std::enable_shared_from_this<Driver> {
TimingMemberPtr opTimingMember,
Func&& opFunction);

// Adjusts 'timing' by removing the lazy load wall and CPU times
// accrued since last time timing information was recorded for
// 'op'. The accrued lazy load times are credited to the source
// operator of 'this'. The per-operator runtimeStats for lazy load
// are left in place to reflect which operator triggered the load
// but these do not bias the op's timing.
CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing);
// Adjusts 'timing' by removing the lazy load wall and CPU times accrued since
// the last time timing information was recorded for 'op'. The accrued lazy
// load times and input bytes are credited to the source operator of 'this'.
// The per-operator runtimeStats for lazy load are left in place to reflect
// which operator triggered the load but these do not bias the op's timing.
CpuWallTiming processLazyIoStats(Operator& op, const CpuWallTiming& timing);

inline void validateOperatorOutputResult(
const RowVectorPtr& result,
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ struct OperatorStats {
// Last recorded values of lazy loading times and input bytes for loads triggered by 'this'.
int64_t lastLazyCpuNanos{0};
int64_t lastLazyWallNanos{0};
int64_t lastLazyInputBytes{0};

// Total null keys processed by the operator.
// Currently populated only by HashJoin/HashBuild.
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/PrintPlanWithStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
{" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"},
{" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
Expand Down Expand Up @@ -269,6 +270,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) {
{{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
{" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" distinctKey0\\s+sum: .+, count: 1, min: .+, max: .+"},
{" hashtable.capacity\\s+sum: 1252, count: 1, min: 1252, max: 1252"},
Expand Down Expand Up @@ -345,6 +347,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
{" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" numWrittenFiles\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3816,7 +3816,12 @@ TEST_F(TableScanTest, structLazy) {
.project({"cardinality(c2.c0)"})
.planNode();

assertQuery(op, {filePath}, "select c0 % 3 from tmp");
auto task = assertQuery(op, {filePath}, "select c0 % 3 from tmp");

// Ensure lazy stats are attributed to table scan.
auto stats = task->taskStats();
ASSERT_GT(stats.pipelineStats[0].operatorStats[0].inputBytes, 0);
ASSERT_GT(stats.pipelineStats[0].operatorStats[0].outputBytes, 0);
}

TEST_F(TableScanTest, interleaveLazyEager) {
Expand Down
16 changes: 11 additions & 5 deletions velox/vector/LazyVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@
#include "velox/vector/SelectivityVector.h"

namespace facebook::velox {

namespace {
void writeIOTiming(const CpuWallTiming& delta) {

void recordLazyIOStats(const CpuWallTiming& cpuDelta, uint64_t inputBytes) {
addThreadLocalRuntimeStat(
LazyVector::kWallNanos,
RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos));
RuntimeCounter(cpuDelta.wallNanos, RuntimeCounter::Unit::kNanos));
addThreadLocalRuntimeStat(
LazyVector::kCpuNanos,
RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos));
RuntimeCounter(cpuDelta.cpuNanos, RuntimeCounter::Unit::kNanos));
addThreadLocalRuntimeStat(
LazyVector::kInputBytes,
RuntimeCounter(inputBytes, RuntimeCounter::Unit::kBytes));
}

} // namespace

void VectorLoader::load(
Expand All @@ -41,7 +45,9 @@ void VectorLoader::load(
vector_size_t resultSize,
VectorPtr* result) {
{
DeltaCpuWallTimer timer([&](auto& delta) { writeIOTiming(delta); });
DeltaCpuWallTimer lazyIoRecorder([&](auto& cpuDelta) {
recordLazyIOStats(cpuDelta, *result ? (*result)->estimateFlatSize() : 0);
});
loadInternal(rows, hook, resultSize, result);
}
if (hook) {
Expand Down
2 changes: 2 additions & 0 deletions velox/vector/LazyVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class LazyVector : public BaseVector {
public:
static constexpr const char* kCpuNanos = "dataSourceLazyCpuNanos";
static constexpr const char* kWallNanos = "dataSourceLazyWallNanos";
static constexpr const char* kInputBytes = "dataSourceLazyInputBytes";

LazyVector(
velox::memory::MemoryPool* pool,
TypePtr type,
Expand Down
6 changes: 4 additions & 2 deletions velox/vector/tests/LazyVectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,11 @@ TEST_F(LazyVectorTest, runtimeStats) {
std::sort(stats.begin(), stats.end(), [](auto& x, auto& y) {
return x.first < y.first;
});
ASSERT_EQ(stats.size(), 2);
ASSERT_EQ(stats.size(), 3);
ASSERT_EQ(stats[0].first, LazyVector::kCpuNanos);
ASSERT_GE(stats[0].second.value, 0);
ASSERT_EQ(stats[1].first, LazyVector::kWallNanos);
ASSERT_EQ(stats[1].first, LazyVector::kInputBytes);
ASSERT_GE(stats[1].second.value, 0);
ASSERT_EQ(stats[2].first, LazyVector::kWallNanos);
ASSERT_GE(stats[2].second.value, 0);
}

0 comments on commit 59b440e

Please sign in to comment.