Skip to content

Commit

Permalink
Add tracer in TableWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Sep 26, 2024
1 parent cc46d81 commit 36074cc
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 10 deletions.
4 changes: 4 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

// Returns the query trace config of the task that owns this driver context,
// forwarded from Task::queryTraceConfig().
const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
const core::PlanNodeId& planNodeId,
const std::string& operatorType) {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
Expand All @@ -26,6 +27,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanFragment.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/trace/QueryTraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -285,6 +287,8 @@ struct DriverCtx {

const core::QueryConfig& queryConfig() const;

const std::optional<trace::QueryTraceConfig>& traceConfig() const;

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
const std::string& operatorType);
Expand Down
46 changes: 46 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"
#include "velox/exec/trace/QueryTraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -104,6 +105,50 @@ void Operator::maybeSetReclaimer() {
Operator::MemoryReclaimer::create(operatorCtx_->driverCtx(), this));
}

// Installs an input tracer on this operator when tracing applies to it.
//
// No tracer is created when any of the following holds (checked in order):
//  - the driver context carries no trace config;
//  - the configured max traced bytes is zero;
//  - this operator's plan node id is not in the set of traced nodes.
//
// Otherwise creates the directory
// '<queryTraceDir>/<planNodeId>/<pipelineId>/<driverId>/data' and a
// QueryDataWriter that writes into it.
void Operator::maybeSetTracer() {
  const auto& driverCtx = *operatorCtx_->driverCtx();
  const auto& traceConfig = driverCtx.traceConfig();
  if (!traceConfig.has_value() ||
      driverCtx.queryConfig().queryTraceMaxBytes() == 0 ||
      traceConfig->queryNodes.count(planNodeId()) == 0) {
    return;
  }

  LOG(INFO) << "Trace data for operator type: " << operatorType()
            << ", operator id: " << operatorId()
            << ", pipeline: " << driverCtx.pipelineId
            << ", driver: " << driverCtx.driverId << ", task: " << taskId();

  // One data directory per (plan node, pipeline, driver) under the trace dir.
  const auto traceDataDir = fmt::format(
      "{}/{}/{}/{}/data",
      traceConfig->queryTraceDir,
      planNodeId(),
      driverCtx.pipelineId,
      driverCtx.driverId);
  trace::createTraceDirectory(traceDataDir);

  inputTracer_ = std::make_unique<trace::QueryDataWriter>(
      traceDataDir,
      memory::traceMemoryPool(),
      traceConfig->updateAndCheckTraceLimitCB);
}

void Operator::traceInput(const RowVectorPtr& input) {
if (FOLLY_UNLIKELY(inputTracer_ != nullptr)) {
inputTracer_->write(input);
}
}

// Finishes the operator's input tracer. Does nothing when no tracer has been
// installed.
void Operator::finishTrace() {
  if (!inputTracer_) {
    return;
  }
  inputTracer_->finish();
}

std::vector<std::unique_ptr<Operator::PlanNodeTranslator>>&
Operator::translators() {
static std::vector<std::unique_ptr<PlanNodeTranslator>> translators;
Expand Down Expand Up @@ -153,6 +198,7 @@ void Operator::initialize() {
pool()->name());
initialized_ = true;
maybeSetReclaimer();
maybeSetTracer();
}

// static
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/
#pragma once

#include "velox/exec/trace/QueryDataWriter.h"

#include <folly/Synchronized.h>
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/time/CpuWallTimer.h"
Expand Down Expand Up @@ -396,6 +399,7 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
// Finish input tracing after the flag is set. The base
// Operator::finishTrace() is a no-op when no input tracer is installed.
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand All @@ -420,6 +424,12 @@ class Operator : public BaseRuntimeStatWriter {
/// build side is empty.
virtual bool isFinished() = 0;

/// Traces input batch of the operator.
virtual void traceInput(const RowVectorPtr&);

/// Finishes tracing of the operator.
virtual void finishTrace();

/// Returns single-column dynamically generated filters to be pushed down to
/// upstream operators. Used to push down filters on join keys from broadcast
/// hash join into probe-side table scan. Can also be used to push down TopN
Expand Down Expand Up @@ -721,6 +731,10 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to setup query data writer for this operator if the associated
/// query plan node is configured to collect trace.
void maybeSetTracer();

/// Creates output vector from 'input_' and 'results' according to
/// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to
/// nullptr, the children of the output vector will be identical to their
Expand Down Expand Up @@ -758,6 +772,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> inputTracer_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ void TableWriter::addInput(RowVectorPtr input) {
if (input->size() == 0) {
return;
}

traceInput(input);
std::vector<VectorPtr> mappedChildren;
mappedChildren.reserve(inputMapping_.size());
for (auto i : inputMapping_) {
for (const auto i : inputMapping_) {
mappedChildren.emplace_back(input->childAt(i));
}

auto mappedInput = std::make_shared<RowVector>(
const auto mappedInput = std::make_shared<RowVector>(
input->pool(),
mappedType_,
input->nulls(),
Expand Down
16 changes: 9 additions & 7 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2856,24 +2856,28 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
return std::nullopt;
}

const auto traceDir =
fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_);
const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
return trace::QueryTraceConfig(queryConfig.queryTraceDir());
LOG(INFO) << "Trace metadata for task: " << taskId_;
return trace::QueryTraceConfig(traceDir);
}

std::vector<std::string> nodes;
folly::split(',', queryTraceNodes, nodes);
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;
LOG(INFO) << "Trace data for task " << taskId_ << " with plan nodes "
<< queryTraceNodes;

trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB =
[this](uint64_t bytes) {
return queryCtx_->updateTracedBytesAndCheckLimit(bytes);
};
return trace::QueryTraceConfig(
std::move(nodeSet),
queryConfig.queryTraceDir(),
traceDir,
std::move(updateAndCheckTraceLimitCB),
queryConfig.queryTraceTaskRegExp());
}
Expand All @@ -2883,11 +2887,9 @@ void Task::maybeInitQueryTrace() {
return;
}

const auto traceTaskDir =
fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_);
trace::createTraceDirectory(traceTaskDir);
trace::createTraceDirectory(traceConfig_->queryTraceDir);
const auto queryMetadatWriter = std::make_unique<trace::QueryMetadataWriter>(
traceTaskDir, memory::traceMemoryPool());
traceConfig_->queryTraceDir, memory::traceMemoryPool());
queryMetadatWriter->write(queryCtx_, planFragment_.planNode);
}

Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class Task : public std::enable_shared_from_this<Task> {
return pool_.get();
}

/// Returns the query trace config of this task, if specified. The result is
/// a reference to the 'traceConfig_' member, not a copy.
const std::optional<trace::QueryTraceConfig>& queryTraceConfig() const {
return traceConfig_;
}

/// Returns ConsumerSupplier passed in the constructor.
ConsumerSupplier consumerSupplier() const {
return consumerSupplier_;
Expand Down
102 changes: 102 additions & 0 deletions velox/exec/trace/test/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,108 @@ TEST_F(QueryTracerTest, traceDir) {
ASSERT_EQ(expectedDirs.count(dir), 1);
}
}

// Verifies input tracing of the TableWriter operator (plan node "1") under
// different task regular expressions and traced-bytes limits: presence of the
// metadata file and data directory, the 'limit exceeded' flag in the data
// summary, and that the traced batches equal the batches fed to the writer.
TEST_F(QueryTracerTest, traceTableWriter) {
  const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()});
  std::vector<RowVectorPtr> inputVectors;
  constexpr auto numBatch = 5;
  inputVectors.reserve(numBatch);
  for (auto i = 0; i < numBatch; ++i) {
    inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType));
  }

  struct {
    std::string taskRegExpr;
    uint64_t maxTracedBytes;
    uint8_t numTracedBatches;
    bool limitExceeded;

    std::string debugString() const {
      return fmt::format(
          "taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
          taskRegExpr,
          maxTracedBytes,
          numTracedBatches,
          limitExceeded);
    }
  } testSettings[]{
      {".*", 10UL << 30, numBatch, false},
      {".*", 0, numBatch, false},
      {"wrong id", 10UL << 30, 0, false},
      {"test_cursor \\d+", 10UL << 30, numBatch, false},
      {"test_cursor \\d+", 800, 2, true}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());
    const auto outputDir = TempDirectoryPath::create();
    const auto planNode = PlanBuilder()
                              .values(inputVectors)
                              .tableWrite(outputDir->getPath())
                              .planNode();
    const auto testDir = TempDirectoryPath::create();
    const auto traceRoot =
        fmt::format("{}/{}", testDir->getPath(), "traceRoot");
    std::shared_ptr<Task> task;
    AssertQueryBuilder(planNode)
        .maxDrivers(1)
        .config(core::QueryConfig::kQueryTraceEnabled, true)
        .config(core::QueryConfig::kQueryTraceDir, traceRoot)
        .config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
        .config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
        .config(core::QueryConfig::kQueryTraceNodeIds, "1")
        .copyResults(pool(), task);

    const auto metadataDir = fmt::format("{}/{}", traceRoot, task->taskId());
    const auto fs = filesystems::getFileSystem(metadataDir, nullptr);

    // The task id does not match the regular expression: nothing is traced.
    if (testData.taskRegExpr == "wrong id") {
      ASSERT_FALSE(fs->exists(traceRoot));
      continue;
    }

    // Query metadata file should exist.
    const auto traceMetaFile = fmt::format(
        "{}/{}/{}",
        traceRoot,
        task->taskId(),
        trace::QueryTraceTraits::kQueryMetaFileName);
    ASSERT_TRUE(fs->exists(traceMetaFile));

    const auto dataDir =
        fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data");

    // Query data tracing disabled.
    if (testData.maxTracedBytes == 0) {
      ASSERT_FALSE(fs->exists(dataDir));
      continue;
    }

    ASSERT_EQ(fs->list(dataDir).size(), 2);
    // Check data summaries.
    const auto summaryFile = fs->openFileForRead(
        fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName));
    const auto summary = summaryFile->pread(0, summaryFile->size());
    ASSERT_FALSE(summary.empty());
    folly::dynamic obj = folly::parseJson(summary);
    ASSERT_EQ(
        obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(),
        testData.limitExceeded);

    const auto reader = trace::QueryDataReader(dataDir, pool());
    RowVectorPtr actual;
    size_t numOutputVectors{0};
    while (reader.read(actual)) {
      // Guard the index below: the trace must never contain more batches than
      // were fed into the writer.
      ASSERT_LT(numOutputVectors, inputVectors.size());
      const auto& expected = inputVectors[numOutputVectors];
      const auto size = actual->size();
      ASSERT_EQ(size, expected->size());
      for (auto i = 0; i < size; ++i) {
        // The comparison result must be asserted; calling compare() and
        // dropping its return value verifies nothing.
        const auto result =
            actual->compare(expected.get(), i, i, {.nullsFirst = true});
        ASSERT_TRUE(result.has_value());
        ASSERT_EQ(result.value(), 0);
      }
      ++numOutputVectors;
    }
    ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
  }
}
} // namespace facebook::velox::exec::trace::test

// This main is needed for some tests on linux.
Expand Down

0 comments on commit 36074cc

Please sign in to comment.