Skip to content

Commit

Permalink
Add tracer in TableWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Sep 24, 2024
1 parent cc46d81 commit 6cbfc3e
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 9 deletions.
4 changes: 4 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
const core::PlanNodeId& planNodeId,
const std::string& operatorType) {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
Expand All @@ -26,6 +27,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanFragment.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/trace/QueryTraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -285,6 +287,8 @@ struct DriverCtx {

const core::QueryConfig& queryConfig() const;

const std::optional<trace::QueryTraceConfig>& traceConfig() const;

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
const std::string& operatorType);
Expand Down
37 changes: 37 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"
#include "velox/exec/trace/QueryTraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -104,6 +105,41 @@ void Operator::maybeSetReclaimer() {
Operator::MemoryReclaimer::create(operatorCtx_->driverCtx(), this));
}

void Operator::maybeSetTracer() {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!queryTraceConfig.has_value()) {
return;
}

if (queryTraceConfig->queryNodes.count(planNodeId()) == 0) {
return;
}

const auto opTraceDirPath = fmt::format(
"{}/{}/{}/{}/data",
queryTraceConfig->queryTraceDir,
planNodeId(),
operatorCtx_->driverCtx()->pipelineId,
operatorCtx_->driverCtx()->driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::QueryDataWriter>(
opTraceDirPath,
memory::traceMemoryPool(),
queryTraceConfig->updateAndCheckTraceLimitCB);
}

void Operator::traceInput(RowVectorPtr input) {
if (FOLLY_UNLIKELY(inputTracer_ != nullptr)) {
inputTracer_->write(input);
}
}

void Operator::finishTrace() {
if (inputTracer_ != nullptr) {
inputTracer_->finish();
}
}

std::vector<std::unique_ptr<Operator::PlanNodeTranslator>>&
Operator::translators() {
static std::vector<std::unique_ptr<PlanNodeTranslator>> translators;
Expand Down Expand Up @@ -153,6 +189,7 @@ void Operator::initialize() {
pool()->name());
initialized_ = true;
maybeSetReclaimer();
maybeSetTracer();
}

// static
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/
#pragma once

#include "velox/exec/trace/QueryDataWriter.h"

#include <folly/Synchronized.h>
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/time/CpuWallTimer.h"
Expand Down Expand Up @@ -396,6 +399,7 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand All @@ -420,6 +424,12 @@ class Operator : public BaseRuntimeStatWriter {
/// build side is empty.
virtual bool isFinished() = 0;

/// Traces input batch of the operator.
virtual void traceInput(RowVectorPtr input);

/// Finishes tracing of the operator.
virtual void finishTrace();

/// Returns single-column dynamically generated filters to be pushed down to
/// upstream operators. Used to push down filters on join keys from broadcast
/// hash join into probe-side table scan. Can also be used to push down TopN
Expand Down Expand Up @@ -721,6 +731,10 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to setup query data writer for this operator if the associated
/// query plan node is configured to collect trace.
void maybeSetTracer();

/// Creates output vector from 'input_' and 'results' according to
/// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to
/// nullptr, the children of the output vector will be identical to their
Expand Down Expand Up @@ -758,6 +772,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> inputTracer_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ void TableWriter::addInput(RowVectorPtr input) {
if (input->size() == 0) {
return;
}

traceInput(input);
std::vector<VectorPtr> mappedChildren;
mappedChildren.reserve(inputMapping_.size());
for (auto i : inputMapping_) {
for (const auto i : inputMapping_) {
mappedChildren.emplace_back(input->childAt(i));
}

auto mappedInput = std::make_shared<RowVector>(
const auto mappedInput = std::make_shared<RowVector>(
input->pool(),
mappedType_,
input->nulls(),
Expand Down
18 changes: 12 additions & 6 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2856,9 +2856,17 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
return std::nullopt;
}

const auto traceDir =
fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_);
const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
return trace::QueryTraceConfig(queryConfig.queryTraceDir());
LOG(WARNING) << "Query trace is enabled, but no nodes will be traced.";
return trace::QueryTraceConfig(traceDir);
}
if (queryConfig.queryTraceMaxBytes() == 0) {
LOG(WARNING)
<< "Query trace is enabled, but no intput vectors will be traced.";
return trace::QueryTraceConfig(traceDir);
}

std::vector<std::string> nodes;
Expand All @@ -2873,7 +2881,7 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
};
return trace::QueryTraceConfig(
std::move(nodeSet),
queryConfig.queryTraceDir(),
traceDir,
std::move(updateAndCheckTraceLimitCB),
queryConfig.queryTraceTaskRegExp());
}
Expand All @@ -2883,11 +2891,9 @@ void Task::maybeInitQueryTrace() {
return;
}

const auto traceTaskDir =
fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_);
trace::createTraceDirectory(traceTaskDir);
trace::createTraceDirectory(traceConfig_->queryTraceDir);
const auto queryMetadatWriter = std::make_unique<trace::QueryMetadataWriter>(
traceTaskDir, memory::traceMemoryPool());
traceConfig_->queryTraceDir, memory::traceMemoryPool());
queryMetadatWriter->write(queryCtx_, planFragment_.planNode);
}

Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class Task : public std::enable_shared_from_this<Task> {
return pool_.get();
}

/// Returns query trace config if specified.
const std::optional<trace::QueryTraceConfig>& queryTraceConfig() const {
return traceConfig_;
}

/// Returns ConsumerSupplier passed in the constructor.
ConsumerSupplier consumerSupplier() const {
return consumerSupplier_;
Expand Down
103 changes: 103 additions & 0 deletions velox/exec/trace/test/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,109 @@ TEST_F(QueryTracerTest, traceDir) {
ASSERT_EQ(expectedDirs.count(dir), 1);
}
}

TEST_F(QueryTracerTest, traceTableWriter) {
const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()});
std::vector<RowVectorPtr> inputVectors;
constexpr auto numBatch = 5;
inputVectors.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType));
}

struct {
std::string taskRegExpr;
uint64_t maxTracedBytes;
uint8_t numTracedBatches;
bool limitExceeded;

std::string debugString() const {
return fmt::format(
"taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
taskRegExpr,
maxTracedBytes,
numTracedBatches,
limitExceeded);
}
} testSettings[]{
{".*", 10UL << 30, numBatch, false},
{".*", 0, numBatch, false},

{"wrong id", 10UL << 30, 0, false},
{"test_cursor [123456]", 10UL << 30, numBatch, false},
{"test_cursor [123456]", 800, 2, true}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto outputDir = TempDirectoryPath::create();
const auto planNode = PlanBuilder()
.values(inputVectors)
.tableWrite(outputDir->getPath())
.planNode();
const auto testDir = TempDirectoryPath::create();
const auto traceRoot =
fmt::format("{}/{}", testDir->getPath(), "traceRoot");
std::shared_ptr<Task> task;
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
.config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, "1")
.copyResults(pool(), task);

const auto metadataDir = fmt::format("{}/{}", traceRoot, task->taskId());
const auto fs = filesystems::getFileSystem(metadataDir, nullptr);

if (testData.taskRegExpr == "wrong id") {
ASSERT_FALSE(fs->exists(traceRoot));
continue;
}

// Query metadta file should exist.
const auto traceMetaFile = fmt::format(
"{}/{}/{}",
traceRoot,
task->taskId(),
trace::QueryTraceTraits::kQueryMetaFileName);
ASSERT_TRUE(fs->exists(traceMetaFile));

const auto dataDir =
fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data");

// Query data tracing disabled.
if (testData.maxTracedBytes == 0) {
ASSERT_FALSE(fs->exists(dataDir));
continue;
}

ASSERT_EQ(fs->list(dataDir).size(), 2);
// Check data summaries.
const auto summaryFile = fs->openFileForRead(
fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName));
const auto summary = summaryFile->pread(0, summaryFile->size());
ASSERT_FALSE(summary.empty());
folly::dynamic obj = folly::parseJson(summary);
ASSERT_EQ(
obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(),
testData.limitExceeded);

const auto reader = trace::QueryDataReader(dataDir, pool());
RowVectorPtr actual;
size_t numOutputVectors{0};
while (reader.read(actual)) {
const auto expected = inputVectors[numOutputVectors];
const auto size = actual->size();
ASSERT_EQ(size, expected->size());
for (auto i = 0; i < size; ++i) {
actual->compare(expected.get(), i, i, {.nullsFirst = true});
}
++numOutputVectors;
}
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}
} // namespace facebook::velox::exec::trace::test

// This main is needed for some tests on linux.
Expand Down

0 comments on commit 6cbfc3e

Please sign in to comment.