Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trace support for TableWriter #10910

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

// Returns the query trace config exposed by the task this driver context
// points to. This simply forwards Task::queryTraceConfig(), so the returned
// reference refers to state held by the task, not by this context.
const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
const core::PlanNodeId& planNodeId,
const std::string& operatorType) {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
Expand All @@ -26,6 +27,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanFragment.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/trace/QueryTraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -285,6 +287,8 @@ struct DriverCtx {

const core::QueryConfig& queryConfig() const;

const std::optional<trace::QueryTraceConfig>& traceConfig() const;
duanmeng marked this conversation as resolved.
Show resolved Hide resolved

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
const std::string& operatorType);
Expand Down
46 changes: 46 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"
#include "velox/exec/trace/QueryTraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -104,6 +105,50 @@ void Operator::maybeSetReclaimer() {
Operator::MemoryReclaimer::create(operatorCtx_->driverCtx(), this));
}

// Installs an input tracer on this operator when the query trace config
// selects this operator's plan node. Leaves 'inputTracer_' untouched when no
// trace config is present, when the configured max trace bytes is zero, or
// when the plan node id is not in the set of traced nodes.
void Operator::maybeSetTracer() {
  const auto driverCtx = operatorCtx_->driverCtx();
  const auto& traceCfg = driverCtx->traceConfig();

  // The checks are evaluated in this order on purpose: the optional must hold
  // a value before its members are read.
  const bool shouldTrace = traceCfg.has_value() &&
      driverCtx->queryConfig().queryTraceMaxBytes() != 0 &&
      traceCfg->queryNodes.count(planNodeId()) != 0;
  if (!shouldTrace) {
    return;
  }

  const auto pipeline = driverCtx->pipelineId;
  const auto driver = driverCtx->driverId;
  LOG(INFO) << "Trace data for operator type: " << operatorType()
            << ", operator id: " << operatorId() << ", pipeline: " << pipeline
            << ", driver: " << driver << ", task: " << taskId();

  // Trace data lives under <trace dir>/<plan node id>/<pipeline>/<driver>/data.
  const auto traceDataDir = fmt::format(
      "{}/{}/{}/{}/data",
      traceCfg->queryTraceDir,
      planNodeId(),
      pipeline,
      driver);
  trace::createTraceDirectory(traceDataDir);
  inputTracer_ = std::make_unique<trace::QueryDataWriter>(
      traceDataDir,
      memory::traceMemoryPool(),
      traceCfg->updateAndCheckTraceLimitCB);
}

// Forwards 'input' to the input tracer, if one has been installed on this
// operator; otherwise does nothing.
void Operator::traceInput(const RowVectorPtr& input) {
  const bool hasTracer = inputTracer_ != nullptr;
  if (FOLLY_UNLIKELY(hasTracer)) {
    inputTracer_->write(input);
  }
}

// Tells the input tracer, if any, that tracing is done. Does nothing when no
// tracer has been installed on this operator.
void Operator::finishTrace() {
  if (inputTracer_ == nullptr) {
    return;
  }
  inputTracer_->finish();
}

std::vector<std::unique_ptr<Operator::PlanNodeTranslator>>&
Operator::translators() {
static std::vector<std::unique_ptr<PlanNodeTranslator>> translators;
Expand Down Expand Up @@ -153,6 +198,7 @@ void Operator::initialize() {
pool()->name());
initialized_ = true;
maybeSetReclaimer();
maybeSetTracer();
}

// static
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/
#pragma once

#include "velox/exec/trace/QueryDataWriter.h"

#include <folly/Synchronized.h>
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/time/CpuWallTimer.h"
Expand Down Expand Up @@ -396,6 +399,7 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
// Input is complete at this point, so finish input tracing here. The base
// finishTrace() does nothing when no input tracer was installed.
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand All @@ -420,6 +424,12 @@ class Operator : public BaseRuntimeStatWriter {
/// build side is empty.
virtual bool isFinished() = 0;

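/// Traces an input batch of the operator. The default implementation does
/// nothing unless an input tracer has been set up for this operator.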
virtual void traceInput(const RowVectorPtr&);

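/// Finishes tracing of the operator. The default implementation does nothing
/// unless an input tracer has been set up for this operator.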
virtual void finishTrace();

/// Returns single-column dynamically generated filters to be pushed down to
/// upstream operators. Used to push down filters on join keys from broadcast
/// hash join into probe-side table scan. Can also be used to push down TopN
Expand Down Expand Up @@ -721,6 +731,10 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to set up the query data writer for this operator if the
/// associated query plan node is configured to collect trace.
void maybeSetTracer();

/// Creates output vector from 'input_' and 'results' according to
/// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to
/// nullptr, the children of the output vector will be identical to their
Expand Down Expand Up @@ -758,6 +772,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> inputTracer_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ void TableWriter::addInput(RowVectorPtr input) {
if (input->size() == 0) {
return;
}

traceInput(input);
std::vector<VectorPtr> mappedChildren;
mappedChildren.reserve(inputMapping_.size());
for (auto i : inputMapping_) {
for (const auto i : inputMapping_) {
mappedChildren.emplace_back(input->childAt(i));
}

auto mappedInput = std::make_shared<RowVector>(
const auto mappedInput = std::make_shared<RowVector>(
input->pool(),
mappedType_,
input->nulls(),
Expand Down
16 changes: 9 additions & 7 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2856,24 +2856,28 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
return std::nullopt;
}

const auto traceDir =
fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_);
const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
return trace::QueryTraceConfig(queryConfig.queryTraceDir());
LOG(INFO) << "Trace metadata for task: " << taskId_;
return trace::QueryTraceConfig(traceDir);
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
}

std::vector<std::string> nodes;
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
folly::split(',', queryTraceNodes, nodes);
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;
LOG(INFO) << "Trace data for task " << taskId_ << " with plan nodes "
<< queryTraceNodes;

trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB =
[this](uint64_t bytes) {
return queryCtx_->updateTracedBytesAndCheckLimit(bytes);
};
return trace::QueryTraceConfig(
std::move(nodeSet),
queryConfig.queryTraceDir(),
traceDir,
std::move(updateAndCheckTraceLimitCB),
queryConfig.queryTraceTaskRegExp());
}
Expand All @@ -2883,11 +2887,9 @@ void Task::maybeInitQueryTrace() {
return;
}

const auto traceTaskDir =
fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_);
trace::createTraceDirectory(traceTaskDir);
trace::createTraceDirectory(traceConfig_->queryTraceDir);
const auto queryMetadatWriter = std::make_unique<trace::QueryMetadataWriter>(
traceTaskDir, memory::traceMemoryPool());
traceConfig_->queryTraceDir, memory::traceMemoryPool());
queryMetadatWriter->write(queryCtx_, planFragment_.planNode);
}

Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class Task : public std::enable_shared_from_this<Task> {
return pool_.get();
}

/// Returns the query trace config held by this task. The optional is empty
/// when no trace config was set for the task (presumably when query tracing
/// is disabled or does not apply to this task -- confirm against
/// maybeMakeTraceConfig()).
const std::optional<trace::QueryTraceConfig>& queryTraceConfig() const {
return traceConfig_;
}

/// Returns ConsumerSupplier passed in the constructor.
ConsumerSupplier consumerSupplier() const {
return consumerSupplier_;
Expand Down
102 changes: 102 additions & 0 deletions velox/exec/trace/test/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,108 @@ TEST_F(QueryTracerTest, traceDir) {
ASSERT_EQ(expectedDirs.count(dir), 1);
}
}

// Runs a values -> table write plan with query tracing enabled for plan node
// "1" (presumably the table write node -- confirm against PlanBuilder's id
// assignment) and checks, for several task regexp / trace byte limit
// combinations, that:
//  - nothing is written when the task id does not match the regexp;
//  - only the metadata file is written when the byte limit is zero;
//  - otherwise the traced input batches match the original input row by row
//    and the summary file reports whether the byte limit was exceeded.
TEST_F(QueryTracerTest, traceTableWriter) {
  const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()});
  std::vector<RowVectorPtr> inputVectors;
  constexpr auto numBatch = 5;
  inputVectors.reserve(numBatch);
  for (auto i = 0; i < numBatch; ++i) {
    inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType));
  }

  struct {
    std::string taskRegExpr;
    uint64_t maxTracedBytes;
    uint8_t numTracedBatches;
    bool limitExceeded;

    std::string debugString() const {
      return fmt::format(
          "taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
          taskRegExpr,
          maxTracedBytes,
          numTracedBatches,
          limitExceeded);
    }
  } testSettings[]{
      {".*", 10UL << 30, numBatch, false},
      {".*", 0, numBatch, false},
      {"wrong id", 10UL << 30, 0, false},
      {"test_cursor \\d+", 10UL << 30, numBatch, false},
      {"test_cursor \\d+", 800, 2, true}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());
    const auto outputDir = TempDirectoryPath::create();
    const auto planNode = PlanBuilder()
                              .values(inputVectors)
                              .tableWrite(outputDir->getPath())
                              .planNode();
    const auto testDir = TempDirectoryPath::create();
    const auto traceRoot =
        fmt::format("{}/{}", testDir->getPath(), "traceRoot");
    std::shared_ptr<Task> task;
    AssertQueryBuilder(planNode)
        .maxDrivers(1)
        .config(core::QueryConfig::kQueryTraceEnabled, true)
        .config(core::QueryConfig::kQueryTraceDir, traceRoot)
        .config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
        .config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
        .config(core::QueryConfig::kQueryTraceNodeIds, "1")
        .copyResults(pool(), task);

    const auto metadataDir = fmt::format("{}/{}", traceRoot, task->taskId());
    const auto fs = filesystems::getFileSystem(metadataDir, nullptr);

    // A task id that does not match the regexp must leave no trace at all.
    if (testData.taskRegExpr == "wrong id") {
      ASSERT_FALSE(fs->exists(traceRoot));
      continue;
    }

    // Query metadata file should exist.
    const auto traceMetaFile = fmt::format(
        "{}/{}/{}",
        traceRoot,
        task->taskId(),
        trace::QueryTraceTraits::kQueryMetaFileName);
    ASSERT_TRUE(fs->exists(traceMetaFile));

    const auto dataDir =
        fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data");

    // Query data tracing disabled.
    if (testData.maxTracedBytes == 0) {
      ASSERT_FALSE(fs->exists(dataDir));
      continue;
    }

    ASSERT_EQ(fs->list(dataDir).size(), 2);
    // Check data summaries.
    const auto summaryFile = fs->openFileForRead(
        fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName));
    const auto summary = summaryFile->pread(0, summaryFile->size());
    ASSERT_FALSE(summary.empty());
    folly::dynamic obj = folly::parseJson(summary);
    ASSERT_EQ(
        obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(),
        testData.limitExceeded);

    const auto reader = trace::QueryDataReader(dataDir, pool());
    RowVectorPtr actual;
    size_t numOutputVectors{0};
    while (reader.read(actual)) {
      // Guard the index so that an unexpected extra traced batch fails the
      // test instead of reading past the end of 'inputVectors'.
      ASSERT_LT(numOutputVectors, inputVectors.size());
      const auto& expected = inputVectors[numOutputVectors];
      const auto size = actual->size();
      ASSERT_EQ(size, expected->size());
      for (auto i = 0; i < size; ++i) {
        // The comparison result must be checked; discarding it would let
        // mismatching traced data pass silently.
        const auto result =
            actual->compare(expected.get(), i, i, {.nullsFirst = true});
        ASSERT_TRUE(result.has_value());
        ASSERT_EQ(result.value(), 0);
      }
      ++numOutputVectors;
    }
    ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
  }
}
} // namespace facebook::velox::exec::trace::test

// This main is needed for some tests on linux.
Expand Down
Loading