Skip to content

Commit

Permalink
Add reference query runner context to window fuzzer
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Jun 2, 2024
1 parent 91383db commit 8b23836
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 21 deletions.
5 changes: 4 additions & 1 deletion velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,11 +826,14 @@ std::unique_ptr<ReferenceQueryRunner> setupReferenceQueryRunner(
LOG(INFO) << "Using DuckDB as the reference DB.";
return duckQueryRunner;
} else {
return std::make_unique<PrestoQueryRunner>(
auto prestoQueryRunner = std::make_unique<PrestoQueryRunner>(
prestoUrl,
runnerName,
static_cast<std::chrono::milliseconds>(reqTimeoutMs));
prestoQueryRunner->queryRunnerContext_ =
std::make_shared<QueryRunnerContext>();
LOG(INFO) << "Using Presto as the reference DB.";
return prestoQueryRunner;
}
}

Expand Down
50 changes: 35 additions & 15 deletions velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,10 @@ std::optional<std::string> PrestoQueryRunner::toSql(
return sql.str();
}

namespace {

void appendWindowFrame(
void PrestoQueryRunner::appendWindowFrame(
const core::WindowNode::Frame& frame,
std::stringstream& sql) {
// TODO: Add support for k Range Frames by retrieving the original range bound
// from WindowNode.
std::stringstream& sql,
const core::PlanNodeId& planNodeId) {
switch (frame.type) {
case core::WindowNode::WindowType::kRange:
sql << " RANGE";
Expand All @@ -421,9 +418,10 @@ void appendWindowFrame(
}
sql << " BETWEEN";

auto appendBound = [&sql](
auto appendBound = [&sql, &frame, &planNodeId, this](
const core::WindowNode::BoundType& bound,
const core::TypedExprPtr& value) {
const core::TypedExprPtr& value,
const bool isStartBound) {
switch (bound) {
case core::WindowNode::BoundType::kUnboundedPreceding:
sql << " UNBOUNDED PRECEDING";
Expand All @@ -435,23 +433,45 @@ void appendWindowFrame(
sql << " CURRENT ROW";
break;
case core::WindowNode::BoundType::kPreceding:
sql << " " << value->toString() << " PRECEDING";
if (frame.type == core::WindowNode::WindowType::kRange) {
if (isStartBound) {
sql << " "
<< queryRunnerContext_->windowFrames_.at(planNodeId)[0].first
<< " PRECEDING";
} else {
sql << " "
<< queryRunnerContext_->windowFrames_.at(planNodeId)[0].second
<< " PRECEDING";
}
} else {
sql << " " << value->toString() << " PRECEDING";
}
break;
case core::WindowNode::BoundType::kFollowing:
sql << " " << value->toString() << " FOLLOWING";
if (frame.type == core::WindowNode::WindowType::kRange) {
if (isStartBound) {
sql << " "
<< queryRunnerContext_->windowFrames_.at(planNodeId)[0].first
<< " FOLLOWING";
} else {
sql << " "
<< queryRunnerContext_->windowFrames_.at(planNodeId)[0].second
<< " FOLLOWING";
}
} else {
sql << " " << value->toString() << " FOLLOWING";
}
break;
default:
VELOX_UNREACHABLE();
}
};

appendBound(frame.startType, frame.startValue);
appendBound(frame.startType, frame.startValue, true);
sql << " AND";
appendBound(frame.endType, frame.endValue);
appendBound(frame.endType, frame.endValue, false);
}

} // namespace

std::optional<std::string> PrestoQueryRunner::toSql(
const std::shared_ptr<const core::WindowNode>& windowNode) {
if (!isSupportedDwrfType(windowNode->sources()[0]->outputType())) {
Expand Down Expand Up @@ -496,7 +516,7 @@ std::optional<std::string> PrestoQueryRunner::toSql(
}
}

appendWindowFrame(functions[i].frame, sql);
appendWindowFrame(functions[i].frame, sql, windowNode->id());
sql << ")";
}

Expand Down
5 changes: 5 additions & 0 deletions velox/exec/fuzzer/PrestoQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner {

std::string fetchNext(const std::string& nextUri);

void appendWindowFrame(
const core::WindowNode::Frame& frame,
std::stringstream& sql,
const core::PlanNodeId& planNodeId);

const std::string coordinatorUri_;
const std::string user_;
const std::chrono::milliseconds timeout_;
Expand Down
10 changes: 10 additions & 0 deletions velox/exec/fuzzer/ReferenceQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

namespace facebook::velox::exec::test {

/// Context object attached to a ReferenceQueryRunner. It carries extra
/// information that the caller records for a plan and that the runner looks up
/// by plan node id. Currently the only such information is window frame bounds.
class QueryRunnerContext {
public:
// Window frame bounds keyed by the id of the window plan node. Each pair
// stores the frame's start bound (first) and end bound (second) as strings.
// Presumably the vector holds one pair per window function in the node; the
// Presto runner currently reads only element [0]. Readers that look up a
// node id with at() get std::out_of_range if the id was never registered.
std::unordered_map<
core::PlanNodeId,
std::vector<std::pair<std::string, std::string>>>
windowFrames_;
};

/// Query runner that uses reference database, i.e. DuckDB, Presto, Spark.
class ReferenceQueryRunner {
public:
Expand Down Expand Up @@ -50,6 +58,8 @@ class ReferenceQueryRunner {
const RowTypePtr& resultType) {
VELOX_UNSUPPORTED();
}

std::shared_ptr<QueryRunnerContext> queryRunnerContext_;
};

} // namespace facebook::velox::exec::test
27 changes: 23 additions & 4 deletions velox/exec/fuzzer/WindowFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ T WindowFuzzer::genOffsetAtIdx(
}

template <typename T>
void WindowFuzzer::addKRangeFrameColumnToInput(
std::optional<std::string> WindowFuzzer::addKRangeFrameColumnToInput(
std::vector<RowVectorPtr>& input,
core::WindowNode::BoundType frameBoundType,
std::string& columnName,
SortingKeyAndOrder& orderByKey) {
T offsetValue;
if constexpr (!(std::is_same_v<T, StringView> ||
std::is_same_v<T, Timestamp> ||
std::is_same_v<T, UnknownValue>)) {
Expand All @@ -146,7 +147,7 @@ void WindowFuzzer::addKRangeFrameColumnToInput(

auto type = CppToType<T>::create();
VectorPtr fuzzOffset = vectorFuzzer_.fuzzConstant(type, 1);
T offsetValue = fuzzOffset->as<ConstantVector<T>>()->valueAt(0);
offsetValue = fuzzOffset->as<ConstantVector<T>>()->valueAt(0);

BufferPtr values = AlignedBuffer::allocate<T>(size, pool_.get());
auto valuesPtr = values->asMutableRange<T>();
Expand All @@ -172,11 +173,14 @@ void WindowFuzzer::addKRangeFrameColumnToInput(
newChildren.push_back(column);
input[j] = vectorMaker.rowVector(newNames, newChildren);
}
return std::to_string(offsetValue);
} else {
VELOX_USER_FAIL(
"Invalid type {} for orderBy column",
CppToType<T>::create()->toString());
}

return std::nullopt;
}

std::string WindowFuzzer::generateFrameClause(
Expand Down Expand Up @@ -290,16 +294,27 @@ void WindowFuzzer::addOffsetColumnsToInput(

using TCpp = typename TypeTraits<TKind>::NativeType;
std::string colName;
std::pair<std::string, std::string> prestoFrame;
if (isFrameStartKBound) {
colName = "k0";
addKRangeFrameColumnToInput<TCpp>(
auto startFrame = addKRangeFrameColumnToInput<TCpp>(
input, startBoundType, colName, orderByKey);
VELOX_USER_CHECK(startFrame.has_value());
prestoFrame.first = startFrame.value();
}
if (isFrameEndKBound) {
colName = "k1";
addKRangeFrameColumnToInput<TCpp>(
auto endFrame = addKRangeFrameColumnToInput<TCpp>(
input, startBoundType, colName, orderByKey);
VELOX_USER_CHECK(endFrame.has_value());
prestoFrame.second = endFrame.value();
}

// Currently only one window function is called by the fuzzer in a window
// plan node.
prestoFrames_.clear();
prestoFrames_.reserve(1);
prestoFrames_.push_back(prestoFrame);
}

void WindowFuzzer::go() {
Expand Down Expand Up @@ -520,10 +535,12 @@ bool WindowFuzzer::verifyWindow(
}
};

core::PlanNodeId windowNodeId;
auto frame = getFrame(partitionKeys, sortingKeysAndOrders, frameClause);
auto plan = PlanBuilder()
.values(input)
.window({fmt::format("{} over ({})", functionCall, frame)})
.capturePlanNodeId(windowNodeId)
.planNode();

if (persistAndRunOnce_) {
Expand All @@ -539,6 +556,8 @@ bool WindowFuzzer::verifyWindow(

if (!customVerification) {
if (resultOrError.result && enableWindowVerification) {
referenceQueryRunner_->queryRunnerContext_
->windowFrames_[windowNodeId] = prestoFrames_;
auto referenceResult = computeReferenceResults(plan, input);
stats_.updateReferenceQueryStats(referenceResult.second);
if (auto expectedResult = referenceResult.first) {
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/fuzzer/WindowFuzzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class WindowFuzzer : public AggregationFuzzerBase {
// Add offset column to input data for k-range frames. Returns the value of K
// as a string.
template <typename T>
void addKRangeFrameColumnToInput(
std::optional<std::string> addKRangeFrameColumnToInput(
std::vector<RowVectorPtr>& input,
core::WindowNode::BoundType frameBoundType,
std::string& columnName,
Expand Down Expand Up @@ -165,6 +165,11 @@ class WindowFuzzer : public AggregationFuzzerBase {

void print(size_t numIterations) const;
} stats_;

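// Start and end bound values (as strings) of k-range frames, one pair per
// window function. They are handed to the reference query runner so the
// bounds can be written into the query that runs on Presto when Presto is
// the reference DB used for verification.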
std::vector<std::pair<std::string, std::string>> prestoFrames_;
};

/// Runs the window fuzzer.
Expand Down

0 comments on commit 8b23836

Please sign in to comment.