Skip to content

Commit

Permalink
Add tracer in TableWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Sep 1, 2024
1 parent ecebbbf commit 2363373
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 8 deletions.
4 changes: 4 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
const core::PlanNodeId& planNodeId,
const std::string& operatorType) {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>
#include <folly/portability/SysSyscall.h>
Expand All @@ -29,6 +31,7 @@
#include "velox/core/PlanNode.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/Spiller.h"
#include "velox/exec/trace/QueryTraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -287,6 +290,7 @@ struct DriverCtx {
uint32_t _partitionId);

const core::QueryConfig& queryConfig() const;
const std::optional<trace::QueryTraceConfig>& traceConfig() const;

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
Expand Down
35 changes: 35 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"
#include "velox/exec/trace/QueryTraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -101,6 +102,39 @@ void Operator::maybeSetReclaimer() {
Operator::MemoryReclaimer::create(operatorCtx_->driverCtx(), this));
}

void Operator::maybeSetQueryDataWriter() {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!queryTraceConfig.has_value()) {
return;
}

if (queryTraceConfig->queryNodes.count(planNodeId()) == 0) {
return;
}

const auto opTraceDirPath = fmt::format(
"{}/{}/{}/{}/data",
queryTraceConfig->queryTraceDir,
planNodeId(),
operatorCtx_->driverCtx()->pipelineId,
operatorCtx_->driverCtx()->driverId);
trace::createTraceDirectory(opTraceDirPath);
queryDataWriter_ = std::make_unique<trace::QueryDataWriter>(
opTraceDirPath, memory::traceMemoryPool());
}

void Operator::traceInput(RowVectorPtr input) {
if (FOLLY_UNLIKELY(queryDataWriter_ != nullptr)) {
queryDataWriter_->write(input);
}
}

void Operator::finishTrace() {
if (queryDataWriter_ != nullptr) {
queryDataWriter_->finish();
}
}

std::vector<std::unique_ptr<Operator::PlanNodeTranslator>>&
Operator::translators() {
static std::vector<std::unique_ptr<PlanNodeTranslator>> translators;
Expand Down Expand Up @@ -150,6 +184,7 @@ void Operator::initialize() {
pool()->name());
initialized_ = true;
maybeSetReclaimer();
maybeSetQueryDataWriter();
}

// static
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/
#pragma once

#include "velox/exec/trace/QueryDataWriter.h"

#include <folly/Synchronized.h>
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/time/CpuWallTimer.h"
Expand Down Expand Up @@ -396,6 +399,7 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand All @@ -420,6 +424,12 @@ class Operator : public BaseRuntimeStatWriter {
/// build side is empty.
virtual bool isFinished() = 0;

/// Traces input batch of the operator.
virtual void traceInput(RowVectorPtr input);

/// Finishes tracing of the operator.
virtual void finishTrace();

/// Returns single-column dynamically generated filters to be pushed down to
/// upstream operators. Used to push down filters on join keys from broadcast
/// hash join into probe-side table scan. Can also be used to push down TopN
Expand Down Expand Up @@ -718,6 +728,10 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to setup query data writer for this operator if the associated
/// query plan node is configured to collect trace.
void maybeSetQueryDataWriter();

/// Creates output vector from 'input_' and 'results' according to
/// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to
/// nullptr, the children of the output vector will be identical to their
Expand Down Expand Up @@ -755,6 +769,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> queryDataWriter_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ void TableWriter::addInput(RowVectorPtr input) {
if (input->size() == 0) {
return;
}

traceInput(input);
std::vector<VectorPtr> mappedChildren;
mappedChildren.reserve(inputMapping_.size());
for (auto i : inputMapping_) {
for (const auto i : inputMapping_) {
mappedChildren.emplace_back(input->childAt(i));
}

auto mappedInput = std::make_shared<RowVector>(
const auto mappedInput = std::make_shared<RowVector>(
input->pool(),
mappedType_,
input->nulls(),
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ class Task : public std::enable_shared_from_this<Task> {
return pool_.get();
}

/// Returns QueryTraceConfig specified in the constructor.
const std::optional<trace::QueryTraceConfig>& queryTraceConfig() const {
return traceConfig_;
}

/// Returns ConsumerSupplier passed in the constructor.
ConsumerSupplier consumerSupplier() const {
return consumerSupplier_;
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/trace/QueryDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/exec/trace/QueryDataWriter.h"

#include <utility>
#include "velox/common/base/SpillStats.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
Expand All @@ -23,10 +25,8 @@

namespace facebook::velox::exec::trace {

QueryDataWriter::QueryDataWriter(
const std::string& path,
memory::MemoryPool* pool)
: dirPath_(path),
QueryDataWriter::QueryDataWriter(std::string path, memory::MemoryPool* pool)
: dirPath_(std::move(path)),
fs_(filesystems::getFileSystem(dirPath_, nullptr)),
pool_(pool) {
dataFile_ = fs_->openFileForWrite(
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/trace/QueryDataWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace facebook::velox::exec::trace {
/// file.
class QueryDataWriter {
public:
explicit QueryDataWriter(const std::string& path, memory::MemoryPool* pool);
explicit QueryDataWriter(std::string path, memory::MemoryPool* pool);

/// Serializes rows and writes out each batch.
void write(const RowVectorPtr& rows);
Expand Down
42 changes: 42 additions & 0 deletions velox/exec/trace/test/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,48 @@ TEST_F(QueryTracerTest, traceDir) {
ASSERT_EQ(expectedDirs.count(dir), 1);
}
}

TEST_F(QueryTracerTest, traceTableWriter) {
const auto rowType = generateTypes(5);
std::vector<RowVectorPtr> inputVectors;
constexpr auto numBatch = 3;
inputVectors.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
inputVectors.push_back(vectorFuzzer_.fuzzInputRow(rowType));
}

const auto outputDir = TempDirectoryPath::create();
const auto dataPath =
fmt::format("{}/{}", outputDir->getPath(), "1/0/0/data");
const auto fs = filesystems::getFileSystem(dataPath, nullptr);
ASSERT_FALSE(fs->exists(dataPath));
const auto planNode = PlanBuilder()
.values(inputVectors)
.tableWrite(outputDir->getPath())
.planNode();
std::shared_ptr<Task> task;
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, outputDir->getPath())
.config(core::QueryConfig::kQueryTraceNodeIds, "1")
.copyResults(pool(), task);

const auto reader = trace::QueryDataReader(dataPath, pool());
RowVectorPtr actual;
size_t numOutputVectors{0};
while (reader.read(actual)) {
const auto expected = inputVectors[numOutputVectors];
const auto size = actual->size();
ASSERT_EQ(size, expected->size());
for (auto i = 0; i < size; ++i) {
actual->compare(expected.get(), i, i, {.nullsFirst = true});
}
++numOutputVectors;
}
ASSERT_EQ(numOutputVectors, inputVectors.size());
}

} // namespace facebook::velox::exec::test

// This main is needed for some tests on linux.
Expand Down

0 comments on commit 2363373

Please sign in to comment.