diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index bea69c0a352c1..96ea978fc6123 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -142,6 +142,10 @@ const core::QueryConfig& DriverCtx::queryConfig() const { return task->queryCtx()->queryConfig(); } +const std::optional& DriverCtx::traceConfig() const { + return task->queryTraceConfig(); +} + velox::memory::MemoryPool* DriverCtx::addOperatorPool( const core::PlanNodeId& planNodeId, const std::string& operatorType) { diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 9410dd7d82e59..567b8d85524a5 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -13,7 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #pragma once + #include #include #include @@ -29,6 +31,7 @@ #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" #include "velox/exec/Spiller.h" +#include "velox/exec/trace/QueryTraceConfig.h" namespace facebook::velox::exec { @@ -287,6 +290,7 @@ struct DriverCtx { uint32_t _partitionId); const core::QueryConfig& queryConfig() const; + const std::optional& traceConfig() const; velox::memory::MemoryPool* addOperatorPool( const core::PlanNodeId& planNodeId, diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 0798785a031d4..c8877efb35006 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -22,6 +22,7 @@ #include "velox/exec/HashJoinBridge.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/Task.h" +#include "velox/exec/trace/QueryTraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -101,6 +102,39 @@ void Operator::maybeSetReclaimer() { Operator::MemoryReclaimer::create(operatorCtx_->driverCtx(), this)); } +void Operator::maybeSetQueryDataWriter() { + const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); + if (!queryTraceConfig.has_value()) { + return; + } + + if (queryTraceConfig->queryNodes.count(planNodeId()) == 0) { + return; + } + + const auto opTraceDirPath = fmt::format( + "{}/{}/{}/{}/data", + queryTraceConfig->queryTraceDir, + planNodeId(), + operatorCtx_->driverCtx()->pipelineId, + operatorCtx_->driverCtx()->driverId); + trace::createTraceDirectory(opTraceDirPath); + queryDataWriter_ = std::make_unique( + opTraceDirPath, memory::traceMemoryPool()); +} + +void Operator::traceInput(RowVectorPtr input) { + if (FOLLY_UNLIKELY(queryDataWriter_ != nullptr)) { + queryDataWriter_->write(input); + } +} + +void Operator::finishTrace() { + if (queryDataWriter_ != nullptr) { + queryDataWriter_->finish(); + } +} + std::vector>& Operator::translators() { static std::vector> translators; @@ -150,6 +184,7 @@ void Operator::initialize() { pool()->name()); initialized_ = true; maybeSetReclaimer(); + maybeSetQueryDataWriter(); } // static diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 051ce55c02c8d..21801dfede0d1 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -14,6 +14,9 @@ * limitations under the License. */ #pragma once + +#include "velox/exec/trace/QueryDataWriter.h" + #include #include "velox/common/base/RuntimeMetrics.h" #include "velox/common/time/CpuWallTimer.h" @@ -396,6 +399,7 @@ class Operator : public BaseRuntimeStatWriter { /// e.g. the first operator in the pipeline. virtual void noMoreInput() { noMoreInput_ = true; + finishTrace(); } /// Returns a RowVector with the result columns. Returns nullptr if @@ -420,6 +424,12 @@ class Operator : public BaseRuntimeStatWriter { /// build side is empty. virtual bool isFinished() = 0; + /// Traces input batch of the operator. + virtual void traceInput(RowVectorPtr input); + + /// Finishes tracing of the operator. + virtual void finishTrace(); + /// Returns single-column dynamically generated filters to be pushed down to /// upstream operators. Used to push down filters on join keys from broadcast /// hash join into probe-side table scan. Can also be used to push down TopN @@ -718,6 +728,10 @@ class Operator : public BaseRuntimeStatWriter { return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; } + /// Invoked to setup query data writer for this operator if the associated + /// query plan node is configured to collect trace. + void maybeSetQueryDataWriter(); + /// Creates output vector from 'input_' and 'results' according to /// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to /// nullptr, the children of the output vector will be identical to their @@ -755,6 +769,7 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; + std::unique_ptr queryDataWriter_; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 9bd199cfe9ffc..2ce07af95cd1a 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -119,14 +119,14 @@ void TableWriter::addInput(RowVectorPtr input) { if (input->size() == 0) { return; } - + traceInput(input); std::vector mappedChildren; mappedChildren.reserve(inputMapping_.size()); - for (auto i : inputMapping_) { + for (const auto i : inputMapping_) { mappedChildren.emplace_back(input->childAt(i)); } - auto mappedInput = std::make_shared( + const auto mappedInput = std::make_shared( input->pool(), mappedType_, input->nulls(), diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 4e24c08144636..3bf39007842ce 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -137,6 +137,11 @@ class Task : public std::enable_shared_from_this { return pool_.get(); } + /// Returns QueryTraceConfig specified in the constructor. + const std::optional& queryTraceConfig() const { + return traceConfig_; + } + /// Returns ConsumerSupplier passed in the constructor. ConsumerSupplier consumerSupplier() const { return consumerSupplier_; diff --git a/velox/exec/trace/QueryDataWriter.cpp b/velox/exec/trace/QueryDataWriter.cpp index 57dc4284cd10b..a5324ca684a9c 100644 --- a/velox/exec/trace/QueryDataWriter.cpp +++ b/velox/exec/trace/QueryDataWriter.cpp @@ -15,6 +15,8 @@ */ #include "velox/exec/trace/QueryDataWriter.h" + +#include #include "velox/common/base/SpillStats.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" @@ -23,10 +25,8 @@ namespace facebook::velox::exec::trace { -QueryDataWriter::QueryDataWriter( - const std::string& path, - memory::MemoryPool* pool) - : dirPath_(path), +QueryDataWriter::QueryDataWriter(std::string path, memory::MemoryPool* pool) + : dirPath_(std::move(path)), fs_(filesystems::getFileSystem(dirPath_, nullptr)), pool_(pool) { dataFile_ = fs_->openFileForWrite( diff --git a/velox/exec/trace/QueryDataWriter.h b/velox/exec/trace/QueryDataWriter.h index 435a0bccc00f9..031ecab50eba4 100644 --- a/velox/exec/trace/QueryDataWriter.h +++ b/velox/exec/trace/QueryDataWriter.h @@ -28,7 +28,7 @@ namespace facebook::velox::exec::trace { /// file. class QueryDataWriter { public: - explicit QueryDataWriter(const std::string& path, memory::MemoryPool* pool); + explicit QueryDataWriter(std::string path, memory::MemoryPool* pool); /// Serializes rows and writes out each batch. void write(const RowVectorPtr& rows); diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 171c7cea138a8..70dce4320d47a 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -361,6 +361,48 @@ TEST_F(QueryTracerTest, traceDir) { ASSERT_EQ(expectedDirs.count(dir), 1); } } + +TEST_F(QueryTracerTest, traceTableWriter) { + const auto rowType = generateTypes(5); + std::vector inputVectors; + constexpr auto numBatch = 3; + inputVectors.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + inputVectors.push_back(vectorFuzzer_.fuzzInputRow(rowType)); + } + + const auto outputDir = TempDirectoryPath::create(); + const auto dataPath = + fmt::format("{}/{}", outputDir->getPath(), "1/0/0/data"); + const auto fs = filesystems::getFileSystem(dataPath, nullptr); + ASSERT_FALSE(fs->exists(dataPath)); + const auto planNode = PlanBuilder() + .values(inputVectors) + .tableWrite(outputDir->getPath()) + .planNode(); + std::shared_ptr task; + AssertQueryBuilder(planNode) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, outputDir->getPath()) + .config(core::QueryConfig::kQueryTraceNodeIds, "1") + .copyResults(pool(), task); + + const auto reader = trace::QueryDataReader(dataPath, pool()); + RowVectorPtr actual; + size_t numOutputVectors{0}; + while (reader.read(actual)) { + const auto expected = inputVectors[numOutputVectors]; + const auto size = actual->size(); + ASSERT_EQ(size, expected->size()); + for (auto i = 0; i < size; ++i) { + actual->compare(expected.get(), i, i, {.nullsFirst = true}); + } + ++numOutputVectors; + } + ASSERT_EQ(numOutputVectors, inputVectors.size()); +} + } // namespace facebook::velox::exec::test // This main is needed for some tests on linux.