diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 307ba0d323708..b818e17d2c008 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -95,6 +95,10 @@ const core::QueryConfig& DriverCtx::queryConfig() const { return task->queryCtx()->queryConfig(); } +const std::optional& DriverCtx::traceConfig() const { + return task->queryTraceConfig(); +} + velox::memory::MemoryPool* DriverCtx::addOperatorPool( const core::PlanNodeId& planNodeId, const std::string& operatorType) { diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7551b2b304d71..6563f204b17f3 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #pragma once #include @@ -26,6 +27,7 @@ #include "velox/common/time/CpuWallTimer.h" #include "velox/core/PlanFragment.h" #include "velox/core/QueryCtx.h" +#include "velox/exec/trace/QueryTraceConfig.h" namespace facebook::velox::exec { @@ -285,6 +287,8 @@ struct DriverCtx { const core::QueryConfig& queryConfig() const; + const std::optional& traceConfig() const; + velox::memory::MemoryPool* addOperatorPool( const core::PlanNodeId& planNodeId, const std::string& operatorType); diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index bb78e9bef57ca..be210a609bd52 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -22,6 +22,7 @@ #include "velox/exec/HashJoinBridge.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/Task.h" +#include "velox/exec/trace/QueryTraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -104,6 +105,50 @@ void Operator::maybeSetReclaimer() { Operator::MemoryReclaimer::create(operatorCtx_->driverCtx(), this)); } +void Operator::maybeSetTracer() { + const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); + if (!queryTraceConfig.has_value()) { + return; + } + + if
(operatorCtx_->driverCtx()->queryConfig().queryTraceMaxBytes() == 0) { + return; + } + + if (queryTraceConfig->queryNodes.count(planNodeId()) == 0) { + return; + } + + const auto pipelineId = operatorCtx_->driverCtx()->pipelineId; + const auto driverId = operatorCtx_->driverCtx()->driverId; + LOG(INFO) << "Query data trace enabled in operator: " << operatorType() + << ", operator id: " << operatorId() << ", pipeline: " << pipelineId + << ", driver: " << driverId; + const auto opTraceDirPath = fmt::format( + "{}/{}/{}/{}/data", + queryTraceConfig->queryTraceDir, + planNodeId(), + pipelineId, + driverId); + trace::createTraceDirectory(opTraceDirPath); + inputTracer_ = std::make_unique( + opTraceDirPath, + memory::traceMemoryPool(), + queryTraceConfig->updateAndCheckTraceLimitCB); +} + +void Operator::traceInput(const RowVectorPtr& input) { + if (FOLLY_UNLIKELY(inputTracer_ != nullptr)) { + inputTracer_->write(input); + } +} + +void Operator::finishTrace() { + if (inputTracer_ != nullptr) { + inputTracer_->finish(); + } +} + std::vector>& Operator::translators() { static std::vector> translators; @@ -153,6 +198,7 @@ void Operator::initialize() { pool()->name()); initialized_ = true; maybeSetReclaimer(); + maybeSetTracer(); } // static diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 46010bfe878f4..4f027a304fcdb 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -14,6 +14,9 @@ * limitations under the License. */ #pragma once + +#include "velox/exec/trace/QueryDataWriter.h" + #include #include "velox/common/base/RuntimeMetrics.h" #include "velox/common/time/CpuWallTimer.h" @@ -396,6 +399,7 @@ class Operator : public BaseRuntimeStatWriter { /// e.g. the first operator in the pipeline. virtual void noMoreInput() { noMoreInput_ = true; + finishTrace(); } /// Returns a RowVector with the result columns. Returns nullptr if @@ -420,6 +424,12 @@ class Operator : public BaseRuntimeStatWriter { /// build side is empty.
virtual bool isFinished() = 0; + /// Traces input batch of the operator. + virtual void traceInput(const RowVectorPtr&); + + /// Finishes tracing of the operator. + virtual void finishTrace(); + /// Returns single-column dynamically generated filters to be pushed down to /// upstream operators. Used to push down filters on join keys from broadcast /// hash join into probe-side table scan. Can also be used to push down TopN @@ -721,6 +731,10 @@ class Operator : public BaseRuntimeStatWriter { return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; } + /// Invoked to setup query data writer for this operator if the associated + /// query plan node is configured to collect trace. + void maybeSetTracer(); + /// Creates output vector from 'input_' and 'results' according to /// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to /// nullptr, the children of the output vector will be identical to their @@ -758,6 +772,7 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; + std::unique_ptr inputTracer_; /// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 9bd199cfe9ffc..2ce07af95cd1a 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -119,14 +119,14 @@ void TableWriter::addInput(RowVectorPtr input) { if (input->size() == 0) { return; } - + traceInput(input); std::vector mappedChildren; mappedChildren.reserve(inputMapping_.size()); - for (auto i : inputMapping_) { + for (const auto i : inputMapping_) { mappedChildren.emplace_back(input->childAt(i)); } - auto mappedInput = std::make_shared( + const auto mappedInput = std::make_shared( input->pool(), mappedType_, input->nulls(), diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 66083ceb5d3ed..e679386f36842 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2856,16 +2856,19 @@ std::optional Task::maybeMakeTraceConfig() const { return std::nullopt; } + const auto traceDir = + fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_); const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); if (queryTraceNodes.empty()) { - return trace::QueryTraceConfig(queryConfig.queryTraceDir()); + LOG(INFO) << "Query metadata trace enabled for task: " << taskId_; + return trace::QueryTraceConfig(traceDir); } std::vector nodes; folly::split(',', queryTraceNodes, nodes); std::unordered_set nodeSet(nodes.begin(), nodes.end()); VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); - LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes; + LOG(INFO) << "Trace data for task " << taskId_ << " with plan nodes " << queryTraceNodes; trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB = [this](uint64_t bytes) { @@ -2873,7 +2876,7 @@ std::optional Task::maybeMakeTraceConfig() const { }; return trace::QueryTraceConfig( std::move(nodeSet), - queryConfig.queryTraceDir(), + traceDir, std::move(updateAndCheckTraceLimitCB), queryConfig.queryTraceTaskRegExp()); } @@ -2883,11 +2886,9 @@ void
Task::maybeInitQueryTrace() { return; } - const auto traceTaskDir = - fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_); - trace::createTraceDirectory(traceTaskDir); + trace::createTraceDirectory(traceConfig_->queryTraceDir); const auto queryMetadatWriter = std::make_unique( - traceTaskDir, memory::traceMemoryPool()); + traceConfig_->queryTraceDir, memory::traceMemoryPool()); queryMetadatWriter->write(queryCtx_, planFragment_.planNode); } diff --git a/velox/exec/Task.h b/velox/exec/Task.h index b237f42202319..2f63fa4bf3d64 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -138,6 +138,11 @@ class Task : public std::enable_shared_from_this { return pool_.get(); } + /// Returns query trace config if specified. + const std::optional& queryTraceConfig() const { + return traceConfig_; + } + /// Returns ConsumerSupplier passed in the constructor. ConsumerSupplier consumerSupplier() const { return consumerSupplier_; diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 8e8a2a6405b8a..5c3122aefc066 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -415,6 +415,108 @@ TEST_F(QueryTracerTest, traceDir) { ASSERT_EQ(expectedDirs.count(dir), 1); } } + +TEST_F(QueryTracerTest, traceTableWriter) { + const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); + std::vector inputVectors; + constexpr auto numBatch = 5; + inputVectors.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); + } + + struct { + std::string taskRegExpr; + uint64_t maxTracedBytes; + uint8_t numTracedBatches; + bool limitExceeded; + + std::string debugString() const { + return fmt::format( + "taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}", + taskRegExpr, + maxTracedBytes, + numTracedBatches, + limitExceeded); + } + } testSettings[]{ + {".*", 10UL << 30, numBatch, false}, +
{".*", 0, numBatch, false}, + {"wrong id", 10UL << 30, 0, false}, + {"test_cursor \\d+", 10UL << 30, numBatch, false}, + {"test_cursor \\d+", 800, 2, true}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + const auto outputDir = TempDirectoryPath::create(); + const auto planNode = PlanBuilder() + .values(inputVectors) + .tableWrite(outputDir->getPath()) + .planNode(); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = + fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + AssertQueryBuilder(planNode) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr) + .config(core::QueryConfig::kQueryTraceNodeIds, "1") + .copyResults(pool(), task); + + const auto metadataDir = fmt::format("{}/{}", traceRoot, task->taskId()); + const auto fs = filesystems::getFileSystem(metadataDir, nullptr); + + if (testData.taskRegExpr == "wrong id") { + ASSERT_FALSE(fs->exists(traceRoot)); + continue; + } + + // Query metadata file should exist. + const auto traceMetaFile = fmt::format( + "{}/{}/{}", + traceRoot, + task->taskId(), + trace::QueryTraceTraits::kQueryMetaFileName); + ASSERT_TRUE(fs->exists(traceMetaFile)); + + const auto dataDir = + fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data"); + + // Query data tracing disabled. + if (testData.maxTracedBytes == 0) { + ASSERT_FALSE(fs->exists(dataDir)); + continue; + } + + ASSERT_EQ(fs->list(dataDir).size(), 2); + // Check data summaries.
+ const auto summaryFile = fs->openFileForRead( + fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName)); + const auto summary = summaryFile->pread(0, summaryFile->size()); + ASSERT_FALSE(summary.empty()); + folly::dynamic obj = folly::parseJson(summary); + ASSERT_EQ( + obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), + testData.limitExceeded); + + const auto reader = trace::QueryDataReader(dataDir, pool()); + RowVectorPtr actual; + size_t numOutputVectors{0}; + while (reader.read(actual)) { + const auto expected = inputVectors[numOutputVectors]; + const auto size = actual->size(); + ASSERT_EQ(size, expected->size()); + for (auto i = 0; i < size; ++i) { + ASSERT_EQ(actual->compare(expected.get(), i, i, {.nullsFirst = true}), 0); + } + ++numOutputVectors; + } + ASSERT_EQ(numOutputVectors, testData.numTracedBatches); + } +} } // namespace facebook::velox::exec::trace::test // This main is needed for some tests on linux.