Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly report lazy loaded inputBytes #11097

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions velox/common/time/CpuWallTimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/time/CpuWallTimer.h"

namespace facebook::velox {

CpuWallTimer::CpuWallTimer(CpuWallTiming& timing) : timing_(timing) {
++timing_.count;
cpuTimeStart_ = process::threadCpuNanos();
Expand All @@ -29,4 +30,5 @@ CpuWallTimer::~CpuWallTimer() {
std::chrono::steady_clock::now() - wallTimeStart_);
timing_.wallNanos += duration.count();
}

} // namespace facebook::velox
34 changes: 22 additions & 12 deletions velox/common/time/CpuWallTimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,14 @@ class CpuWallTimer {
CpuWallTiming& timing_;
};

// Keeps track of elapsed CPU and wall time from construction time.
// Composes delta CpuWallTiming upon destruction and passes it to the user
// callback, where it can be added to the user's CpuWallTiming using
// CpuWallTiming::add().
template <typename F>
class DeltaCpuWallTimer {
/// Keeps track of elapsed CPU and wall time from construction time.
class DeltaCpuWallTimeStopWatch {
public:
explicit DeltaCpuWallTimer(F&& func)
explicit DeltaCpuWallTimeStopWatch()
: wallTimeStart_(std::chrono::steady_clock::now()),
cpuTimeStart_(process::threadCpuNanos()),
func_(std::move(func)) {}
cpuTimeStart_(process::threadCpuNanos()) {}

~DeltaCpuWallTimer() {
CpuWallTiming elapsed() const {
// NOTE: End the cpu-time timing first, and then end the wall-time timing,
// so as to avoid the counter-intuitive phenomenon that the final calculated
// cpu-time is slightly larger than the wall-time.
Expand All @@ -78,15 +73,30 @@ class DeltaCpuWallTimer {
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now() - wallTimeStart_)
.count();
const CpuWallTiming deltaTiming{1, wallTimeDuration, cpuTimeDuration};
func_(deltaTiming);
return CpuWallTiming{1, wallTimeDuration, cpuTimeDuration};
}

private:
// NOTE: Put `wallTimeStart_` before `cpuTimeStart_`, so that wall-time starts
// counting earlier than cpu-time.
const std::chrono::steady_clock::time_point wallTimeStart_;
const uint64_t cpuTimeStart_;
};

/// Composes delta CpuWallTiming upon destruction and passes it to the user
/// callback, where it can be added to the user's CpuWallTiming using
/// CpuWallTiming::add().
///
/// 'F' must be callable with a CpuWallTiming (the elapsed delta). Timing
/// starts when 'timer_' is constructed, i.e. at construction of this object.
template <typename F>
class DeltaCpuWallTimer {
 public:
  explicit DeltaCpuWallTimer(F&& func) : func_(std::move(func)) {}

  ~DeltaCpuWallTimer() {
    // Invokes the user callback with the timing accumulated over this
    // object's lifetime.
    func_(timer_.elapsed());
  }

 private:
  DeltaCpuWallTimeStopWatch timer_;
  F func_;
};

Expand Down
26 changes: 22 additions & 4 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,22 @@ size_t OpCallStatusRaw::callDuration() const {
: fmt::format("null::{}", operatorMethod);
}

CpuWallTiming Driver::processLazyTiming(
CpuWallTiming Driver::processLazyIoStats(
Operator& op,
const CpuWallTiming& timing) {
if (&op == operators_[0].get()) {
return timing;
}
auto lockStats = op.stats().wlock();

// Checks and tries to update cpu time from lazy loads.
auto it = lockStats->runtimeStats.find(LazyVector::kCpuNanos);
if (it == lockStats->runtimeStats.end()) {
// Return early if no lazy activity. Lazy CPU and wall times are recorded
// together, checking one is enough.
return timing;
}
int64_t cpu = it->second.sum;
const int64_t cpu = it->second.sum;
auto cpuDelta = std::max<int64_t>(0, cpu - lockStats->lastLazyCpuNanos);
if (cpuDelta == 0) {
// Return early if no change. Checking one counter is enough. If this did
Expand All @@ -421,15 +423,29 @@ CpuWallTiming Driver::processLazyTiming(
return timing;
}
lockStats->lastLazyCpuNanos = cpu;

// Checks and tries to update wall time from lazy loads.
int64_t wallDelta = 0;
it = lockStats->runtimeStats.find(LazyVector::kWallNanos);
if (it != lockStats->runtimeStats.end()) {
int64_t wall = it->second.sum;
const int64_t wall = it->second.sum;
wallDelta = std::max<int64_t>(0, wall - lockStats->lastLazyWallNanos);
if (wallDelta > 0) {
lockStats->lastLazyWallNanos = wall;
}
}

// Checks and tries to update input bytes from lazy loads.
int64_t inputBytesDelta = 0;
it = lockStats->runtimeStats.find(LazyVector::kInputBytes);
if (it != lockStats->runtimeStats.end()) {
const int64_t inputBytes = it->second.sum;
inputBytesDelta = inputBytes - lockStats->lastLazyInputBytes;
if (inputBytesDelta > 0) {
lockStats->lastLazyInputBytes = inputBytes;
}
}

lockStats.unlock();
cpuDelta = std::min<int64_t>(cpuDelta, timing.cpuNanos);
wallDelta = std::min<int64_t>(wallDelta, timing.wallNanos);
Expand All @@ -439,6 +455,8 @@ CpuWallTiming Driver::processLazyTiming(
static_cast<uint64_t>(wallDelta),
static_cast<uint64_t>(cpuDelta),
});
lockStats->inputBytes += inputBytesDelta;
lockStats->outputBytes += inputBytesDelta;
return CpuWallTiming{
1,
timing.wallNanos - wallDelta,
Expand Down Expand Up @@ -1016,7 +1034,7 @@ void Driver::withDeltaCpuWallTimer(
// opTimingMember upon destruction of the timer when withDeltaCpuWallTimer
// ends. The timer is created on the stack to avoid heap allocation
auto f = [op, opTimingMember, this](const CpuWallTiming& elapsedTime) {
auto elapsedSelfTime = processLazyTiming(*op, elapsedTime);
auto elapsedSelfTime = processLazyIoStats(*op, elapsedTime);
op->stats().withWLock([&](auto& lockedStats) {
(lockedStats.*opTimingMember).add(elapsedSelfTime);
});
Expand Down
13 changes: 6 additions & 7 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,12 @@ class Driver : public std::enable_shared_from_this<Driver> {
TimingMemberPtr opTimingMember,
Func&& opFunction);

// Adjusts 'timing' by removing the lazy load wall and CPU times
// accrued since last time timing information was recorded for
// 'op'. The accrued lazy load times are credited to the source
// operator of 'this'. The per-operator runtimeStats for lazy load
// are left in place to reflect which operator triggered the load
// but these do not bias the op's timing.
CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing);
// Adjusts 'timing' by removing the lazy load wall time, CPU time, and input
// bytes accrued since last time timing information was recorded for 'op'. The
// accrued lazy load times are credited to the source operator of 'this'. The
// per-operator runtimeStats for lazy load are left in place to reflect which
// operator triggered the load but these do not bias the op's timing.
CpuWallTiming processLazyIoStats(Operator& op, const CpuWallTiming& timing);

inline void validateOperatorOutputResult(
const RowVectorPtr& result,
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ struct OperatorStats {
// Last recorded values for lazy loading times for loads triggered by 'this'.
int64_t lastLazyCpuNanos{0};
int64_t lastLazyWallNanos{0};
int64_t lastLazyInputBytes{0};

// Total null keys processed by the operator.
// Currently populated only by HashJoin/HashBuild.
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/PrintPlanWithStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
{" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"},
{" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
Expand Down Expand Up @@ -269,6 +270,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) {
{{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
{" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" distinctKey0\\s+sum: .+, count: 1, min: .+, max: .+"},
{" hashtable.capacity\\s+sum: 1252, count: 1, min: 1252, max: 1252"},
Expand Down Expand Up @@ -345,6 +347,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
{" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" numWrittenFiles\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3815,7 +3815,12 @@ TEST_F(TableScanTest, structLazy) {
.project({"cardinality(c2.c0)"})
.planNode();

assertQuery(op, {filePath}, "select c0 % 3 from tmp");
auto task = assertQuery(op, {filePath}, "select c0 % 3 from tmp");

// Ensure lazy stats are attributed to table scan.
const auto stats = task->taskStats();
EXPECT_GT(stats.pipelineStats[0].operatorStats[0].inputBytes, 0);
EXPECT_GT(stats.pipelineStats[0].operatorStats[0].outputBytes, 0);
}

TEST_F(TableScanTest, interleaveLazyEager) {
Expand Down
41 changes: 31 additions & 10 deletions velox/vector/LazyVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,37 @@
#include "velox/vector/SelectivityVector.h"

namespace facebook::velox {

namespace {
// Publishes the elapsed wall and cpu time of a lazy load to the calling
// thread's runtime stats, keyed by the LazyVector stat names.
void writeIOTiming(const CpuWallTiming& delta) {
  addThreadLocalRuntimeStat(
      LazyVector::kWallNanos,
      RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos));
  addThreadLocalRuntimeStat(
      LazyVector::kCpuNanos,
      RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos));
}

// Convenience class to record cpu and wall time from construction, updating
// thread local stats at destruction, including input bytes of the vector passed
// as parameter.
// Convenience class to record cpu and wall time from construction, updating
// thread local stats at destruction, including input bytes of the vector
// passed as parameter.
class LazyIoStatsRecorder {
 public:
  // 'vector' must outlive this recorder; it is inspected at destruction to
  // report the loaded vector's estimated flat size as lazy input bytes.
  // Marked explicit to prevent accidental implicit conversion from
  // VectorPtr*.
  explicit LazyIoStatsRecorder(VectorPtr* vector) : vector_(vector) {}

  ~LazyIoStatsRecorder() {
    // 'delta' holds both the wall and cpu time elapsed since construction.
    const auto delta = timer_.elapsed();
    addThreadLocalRuntimeStat(
        LazyVector::kWallNanos,
        RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos));
    addThreadLocalRuntimeStat(
        LazyVector::kCpuNanos,
        RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos));

    // Report input bytes only if the load actually produced a vector.
    if (*vector_) {
      addThreadLocalRuntimeStat(
          LazyVector::kInputBytes,
          RuntimeCounter(
              (*vector_)->estimateFlatSize(), RuntimeCounter::Unit::kBytes));
    }
  }

 private:
  // NOTE: declared before 'vector_' so the stopwatch starts as early as
  // possible during construction.
  DeltaCpuWallTimeStopWatch timer_;
  VectorPtr* vector_;
};

} // namespace

void VectorLoader::load(
Expand All @@ -41,7 +62,7 @@ void VectorLoader::load(
vector_size_t resultSize,
VectorPtr* result) {
{
DeltaCpuWallTimer timer([&](auto& delta) { writeIOTiming(delta); });
LazyIoStatsRecorder recorder(result);
loadInternal(rows, hook, resultSize, result);
}
if (hook) {
Expand Down
2 changes: 2 additions & 0 deletions velox/vector/LazyVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class LazyVector : public BaseVector {
public:
static constexpr const char* kCpuNanos = "dataSourceLazyCpuNanos";
static constexpr const char* kWallNanos = "dataSourceLazyWallNanos";
static constexpr const char* kInputBytes = "dataSourceLazyInputBytes";

LazyVector(
velox::memory::MemoryPool* pool,
TypePtr type,
Expand Down
6 changes: 4 additions & 2 deletions velox/vector/tests/LazyVectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,11 @@ TEST_F(LazyVectorTest, runtimeStats) {
std::sort(stats.begin(), stats.end(), [](auto& x, auto& y) {
return x.first < y.first;
});
ASSERT_EQ(stats.size(), 2);
ASSERT_EQ(stats.size(), 3);
ASSERT_EQ(stats[0].first, LazyVector::kCpuNanos);
ASSERT_GE(stats[0].second.value, 0);
ASSERT_EQ(stats[1].first, LazyVector::kWallNanos);
ASSERT_EQ(stats[1].first, LazyVector::kInputBytes);
ASSERT_GE(stats[1].second.value, 0);
ASSERT_EQ(stats[2].first, LazyVector::kWallNanos);
ASSERT_GE(stats[2].second.value, 0);
}
Loading