Skip to content

Commit

Permalink
Add trace replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Sep 18, 2024
1 parent 079ed31 commit 0b12aec
Show file tree
Hide file tree
Showing 24 changed files with 656 additions and 32 deletions.
4 changes: 4 additions & 0 deletions velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ endif()
if(${VELOX_ENABLE_SUBSTRAIT})
add_subdirectory(substrait)
endif()

if(${VELOX_BUILD_TESTING})
add_subdirectory(tool)
endif()
17 changes: 17 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,13 @@ class QueryConfig {
/// Empty string if only want to trace the query metadata.
static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids";

/// The max trace bytes limit, if it is zero, then there is no limit.
static constexpr const char* kQueryTraceMaxBytes = "query_trace_max_bytes";

/// The regexp of traced task id.
static constexpr const char* kQueryTraceTaskRegExp =
"query_trace_task_reg_exp";

/// Disable optimization in expression evaluation to peel common dictionary
/// layer from inputs.
static constexpr const char* kDebugDisableExpressionWithPeeling =
Expand Down Expand Up @@ -689,6 +696,16 @@ class QueryConfig {
return get<std::string>(kQueryTraceNodeIds, "");
}

uint64_t queryTraceMaxBytes() const {
// The default query trace bytes, 0 by default.
return get<uint64_t>(kQueryTraceMaxBytes, 0);
}

std::string queryTraceTaskRegExp() const {
// The default query trace task regexp, empty by default.
return get<std::string>(kQueryTraceTaskRegExp, "");
}

bool prestoArrayAggIgnoreNulls() const {
return get<bool>(kPrestoArrayAggIgnoreNulls, false);
}
Expand Down
16 changes: 16 additions & 0 deletions velox/core/QueryCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ void QueryCtx::updateSpilledBytesAndCheckLimit(uint64_t bytes) {
}
}

bool QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) {
if (queryConfig_.queryTraceMaxBytes() == 0) {
return false;
}

if (numTracedBytes_.fetch_add(bytes) + bytes <
queryConfig_.queryTraceMaxBytes()) {
return false;
}

numTracedBytes_.fetch_sub(bytes);
LOG(WARNING) << "Query exceeded trace limit of "
<< succinctBytes(queryConfig_.queryTraceMaxBytes());
return true;
}

std::unique_ptr<memory::MemoryReclaimer> QueryCtx::MemoryReclaimer::create(
QueryCtx* queryCtx,
memory::MemoryPool* pool) {
Expand Down
9 changes: 7 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
/// the memory arbiration finishes.
bool checkUnderArbitration(ContinueFuture* future);

/// Updates the aggregated spill bytes of this query, and and throws if
/// exceeds the max spill bytes limit.
/// Updates the aggregated spill bytes of this query, and throws if exceeds
/// the max spill bytes limit.
void updateSpilledBytesAndCheckLimit(uint64_t bytes);

/// Updates the aggregated trace bytes of this query, and return true if
/// exceeds the max query trace bytes limit.
bool updateTracedBytesAndCheckLimit(uint64_t bytes);

void testingOverrideMemoryPool(std::shared_ptr<memory::MemoryPool> pool) {
pool_ = std::move(pool);
}
Expand Down Expand Up @@ -216,6 +220,7 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
std::shared_ptr<memory::MemoryPool> pool_;
QueryConfig queryConfig_;
std::atomic<uint64_t> numSpilledBytes_{0};
std::atomic<uint64_t> numTracedBytes_{0};

mutable std::mutex mutex_;
// Indicates if this query is under memory arbitration or not.
Expand Down
8 changes: 8 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,11 @@ Tracing
-
- A comma-separated list of plan node ids whose input data will be trace. If it is empty, then we only trace the
query metadata which includes the query plan and configs etc.
* - query_trace_task_reg_exp
- string
-
- The regexp of traced task id.
* - query_trace_max_bytes
- integer
- 0
- The max trace bytes limit, if it is zero, then there is no limit.
1 change: 1 addition & 0 deletions velox/docs/develop/debugging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ Debugging Tools
debugging/print-expr-with-stats
debugging/vector-saver
debugging/metrics
debugging/tracing.rst
146 changes: 146 additions & 0 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
=======
Tracing
=======


Background
----------

The query trace tool helps analyze and debug query performance and correctness
issues. It helps prevent interference from the test noises in a production
environment (such as storage, network etc) by allowing replay of a part of the
query plan and data set in an isolated environment such as a local machine.
This is much more efficient for query performance debugging as we don't have to
replay the whole query in a production environment.

How Tracing Tool Works
----------------------

The tracing process consists of two phases: tracing and replaying.

**Tracing Phase**

- When the query starts, the task records the metadata including query plan fragment,
query configuration, and connector properties.
- During the query running, each traced operator records the input vectors and saves
in the specified storage location.
- The metadata are serialized using json format and operator data inputs are serialized
using `Presto serializer <https://prestodb.io/docs/current/develop/serialized-page.html>`_.

**Replaying Phase**

- Read and deserialize the recorded query plan in tracing phase, extract the target plan
node, and assemble a plan fragment with a customized source and sink nodes. The source
node reads the input from the serialized operator inputs on storage and sink operator
prints or logs out the execution stats.
- Build a query/task with the assembled plan fragment in step 1.
- Apply the recorded query configuration and connector properties to replay the query/task
with the same input and configuration setup as in production.

**NOTE**: the presto serialization might lose the input vector encoding such as lazy vector
and nested dictionary encoding etc which affects the operator’s execution. So it might not
always be the exactly the same as in production.

.. image:: ../images/trace-arch.png
:width: 600
:align: center

Tracing Framework
-----------------

The tracing framework consists of three components:

1. **Query Trace Writer**: metadata writer and the data writer.
2. **Query Trace Reader**: metadata reader and the data reader.
3. **Query Trace Tool**: display query summaries or replay the
execution of the target operator.

Query Trace Writer
^^^^^^^^^^^^^^^^^^

**QueryMetadataWriter** records the query metadata during task creation,
serializes, and writes them into a file in JSON format. There are two kinds
of metadata:

- Query configurations and connector properties are specified by the user per query.
They can be serialized as JSON map objects (key-value pairs).
- Plan fragment of the task (also known as a plan node tree). It can be serialized
as a JSON object, which is already supported in Velox.

**QueryDataWriter** records the input vectors from the target operator, which are
serialized and written as a binary file.

Query Trace Reader
^^^^^^^^^^^^^^^^^^

**QueryMetadataReader** can load the query metadata JSON file, and extract the query
configurations, connector properties, and the plan fragment.

**QueryDataReader** can read and deserialize the input vectors of the target operator.
It is used as the utility to replay the input data as a source operator in the target
operator replay.

**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches,
allowing it to replay the input process using the same sequence of batches.

Query Trace Util
^^^^^^^^^^^^^^^^

- Create tracing directories.
- Get query summaries.
- Provide utilities to extract the target plan node, and assemble a plan fragment with
customized trace source and sink node with it.

Query Trace Tool
----------------

The query trace tool leverages the trace reader to display query summaries and replay the
execution of specific operators (TBD).

Tracing tools Usage
^^^^^^^^^^^^^^^^^^^

.. code-block:: c++

query_trace_tool --root $root_dir --summary --pretty


It would show something as the follows

.. code-block:: c++

++++++Query trace summary++++++
Number of tasks: 1
Task ids: task-1
++++++Query configs and plan:++++++
{
"planNode":{
"filter":{},
"outputType":{...},
"nullAware":false,
"sources":[{...}, {...}],
"leftKeys":[],
"joinType":"INNER",
"id":"5",
"name":"HashJoinNode"
},
"connectorProperties":{...},
"queryConfig":{...}
}


Here is a full list of supported command line arguments.

* ``--usage``: Show the usage.
* ``--root``: Root dir of the query tracing.
* ``--summary``: Show the summary of the tracing including number of tasks and task ids.
It also print the query metadata including query configs, connectors properties, and query plan in JSON format.
* ``--short_summary``: Only show number of tasks and task ids.
* ``--pretty``: Show the summary of the tracing in pretty JSON.
* ``--task_id``: Specify the target task id, if empty, show the summary of all the traced query tasks.


Future Work
-----------

https://github.com/facebookincubator/velox/issues/9668
Binary file added velox/docs/develop/images/trace-arch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 17 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2847,6 +2847,14 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
!queryConfig.queryTraceDir().empty(),
"Query trace enabled but the trace dir is not set");

VELOX_USER_CHECK(
!queryConfig.queryTraceTaskRegExp().empty(),
"Query trace enabled but the trace task regexp is not set");

if (!RE2::FullMatch(taskId_, queryConfig.queryTraceTaskRegExp())) {
return std::nullopt;
}

const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
return trace::QueryTraceConfig(queryConfig.queryTraceDir());
Expand All @@ -2857,8 +2865,16 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;

trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB =
[this](uint64_t bytes) {
return queryCtx_->updateTracedBytesAndCheckLimit(bytes);
};
return trace::QueryTraceConfig(
std::move(nodeSet), queryConfig.queryTraceDir());
std::move(nodeSet),
queryConfig.queryTraceDir(),
updateAndCheckTraceLimitCB,
queryConfig.queryTraceTaskRegExp());
}

void Task::maybeInitQueryTrace() {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ velox_link_libraries(
velox_presto_serializer)

velox_add_library(velox_query_trace_retrieve QueryDataReader.cpp
QueryMetadataReader.cpp)
QueryMetadataReader.cpp QueryTraceUtil.cpp)

velox_link_libraries(
velox_query_trace_retrieve
Expand Down
33 changes: 26 additions & 7 deletions velox/exec/trace/QueryDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/exec/trace/QueryDataWriter.h"

#include <utility>
#include "velox/common/base/SpillStats.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
Expand All @@ -24,17 +26,23 @@
namespace facebook::velox::exec::trace {

QueryDataWriter::QueryDataWriter(
const std::string& path,
memory::MemoryPool* pool)
: dirPath_(path),
std::string path,
memory::MemoryPool* pool,
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB)
: dirPath_(std::move(path)),
fs_(filesystems::getFileSystem(dirPath_, nullptr)),
pool_(pool) {
pool_(pool),
updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) {
dataFile_ = fs_->openFileForWrite(
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName));
VELOX_CHECK_NOT_NULL(dataFile_);
}

void QueryDataWriter::write(const RowVectorPtr& rows) {
if (FOLLY_UNLIKELY(finished_)) {
return;
}

if (batch_ == nullptr) {
batch_ = std::make_unique<VectorStreamGroup>(pool_);
batch_->createStreamTree(
Expand All @@ -51,24 +59,35 @@ void QueryDataWriter::write(const RowVectorPtr& rows) {
batch_->flush(&out);
batch_->clear();
auto iobuf = out.getIOBuf();
if (FOLLY_UNLIKELY(
updateAndCheckTraceLimitCB_(iobuf->computeChainDataLength()))) {
finish(true);
return;
}
dataFile_->append(std::move(iobuf));
}

void QueryDataWriter::finish() {
void QueryDataWriter::finish(bool limitExceeded) {
if (finished_) {
return;
}

VELOX_CHECK_NOT_NULL(
dataFile_, "The query data writer has already been finished");
dataFile_->close();
dataFile_.reset();
batch_.reset();
writeSummary();
writeSummary(limitExceeded);
finished_ = true;
}

void QueryDataWriter::writeSummary() const {
void QueryDataWriter::writeSummary(bool limitExceeded) const {
const auto summaryFilePath =
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize();
obj[QueryTraceTraits::kTraceLimitExceededKey] = limitExceeded;
file->append(folly::toJson(obj));
file->close();
}
Expand Down
Loading

0 comments on commit 0b12aec

Please sign in to comment.