Skip to content

Commit

Permalink
Add query runner context and kRange frames
Browse files Browse the repository at this point in the history
Co-authored-by: Minhan Cao <mcao@ibm.com>
  • Loading branch information
pramodsatya and minhancao committed Jul 16, 2024
1 parent 3598307 commit 2779a99
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 175 deletions.
2 changes: 1 addition & 1 deletion velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ void AggregationFuzzer::go() {

auto partitionKeys = generateKeys("p", argNames, argTypes);
auto sortingKeys = generateSortingKeys("s", argNames, argTypes);
auto input = generateInputDataWithRowNumber(
auto input = generateInputDataForWindowFuzzer(
argNames, argTypes, partitionKeys, signature);

logVectors(input);
Expand Down
58 changes: 46 additions & 12 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,41 @@ std::vector<std::string> AggregationFuzzerBase::generateKeys(
std::vector<std::string> AggregationFuzzerBase::generateSortingKeys(
const std::string& prefix,
std::vector<std::string>& names,
std::vector<TypePtr>& types) {
std::vector<TypePtr>& types,
const bool hasRowNumberKey,
const bool rangeFrame) {
std::vector<std::string> keys;
auto numKeys = boost::random::uniform_int_distribution<uint32_t>(1, 5)(rng_);
vector_size_t numKeys;
vector_size_t maxDepth;
std::vector<TypePtr> sortingKeyTypes = kScalarTypes;

// If frame has kRange bound, only one sorting key should be present. If the
// row_number column is not present, generate this sorting key randomly; use
// the row_number column as sorting key otherwise.
if (rangeFrame) {
if (hasRowNumberKey) {
return keys;
} else {
numKeys = 1;
// Pick scalar type which supports '+', '-' binary operations.
sortingKeyTypes = {
TINYINT(),
SMALLINT(),
INTEGER(),
BIGINT(),
HUGEINT(),
REAL(),
DOUBLE()};
maxDepth = 0;
}
} else {
numKeys = randInt(1, 5);
maxDepth = 2;
}

for (auto i = 0; i < numKeys; ++i) {
keys.push_back(fmt::format("{}{}", prefix, i));

// Pick random, possibly complex, type.
types.push_back(vectorFuzzer_.randOrderableType(2));
types.push_back(vectorFuzzer_.randOrderableType(maxDepth, sortingKeyTypes));
names.push_back(keys.back());
}

Expand Down Expand Up @@ -296,13 +323,17 @@ std::vector<RowVectorPtr> AggregationFuzzerBase::generateInputData(
return input;
}

std::vector<RowVectorPtr> AggregationFuzzerBase::generateInputDataWithRowNumber(
std::vector<RowVectorPtr>
AggregationFuzzerBase::generateInputDataForWindowFuzzer(
std::vector<std::string> names,
std::vector<TypePtr> types,
const std::vector<std::string>& partitionKeys,
const CallableSignature& signature) {
names.push_back("row_number");
types.push_back(BIGINT());
const CallableSignature& signature,
const bool hasRowNumberKey) {
if (hasRowNumberKey) {
names.push_back("row_number");
types.push_back(BIGINT());
}

auto generator = findInputGenerator(signature);

Expand All @@ -329,7 +360,8 @@ std::vector<RowVectorPtr> AggregationFuzzerBase::generateInputDataWithRowNumber(
auto numPartitions = size ? randInt(1, size) : 1;
auto indices = vectorFuzzer_.fuzzIndices(size, numPartitions);
auto nulls = vectorFuzzer_.fuzzNulls(size);
for (auto i = children.size(); i < types.size() - 1; ++i) {
auto n = hasRowNumberKey ? types.size() - 1 : types.size();
for (auto i = children.size(); i < n; ++i) {
if (partitionKeySet.find(names[i]) != partitionKeySet.end()) {
// The partition keys are built with a dictionary over a smaller set of
// values. This is done to introduce some repetition of key values for
Expand All @@ -341,8 +373,10 @@ std::vector<RowVectorPtr> AggregationFuzzerBase::generateInputDataWithRowNumber(
children.push_back(vectorFuzzer_.fuzz(types[i], size));
}
}
children.push_back(vectorMaker.flatVector<int64_t>(
size, [&](auto /*row*/) { return rowNumber++; }));
if (hasRowNumberKey) {
children.push_back(vectorMaker.flatVector<int64_t>(
size, [&](auto /*row*/) { return rowNumber++; }));
}
input.push_back(vectorMaker.rowVector(names, children));
}

Expand Down
16 changes: 12 additions & 4 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,18 @@ class AggregationFuzzerBase {
std::vector<TypePtr>& types);

// Similar to generateKeys, but restricts types to orderable types (i.e. no
// maps).
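// maps). When the window frame is a RANGE frame with k PRECEDING/FOLLOWING
// bounds, rangeFrame must be set to true and only one sorting key is used:
// 1. If hasRowNumberKey is true, no sorting key is generated (an empty list
// is returned) so that the row_number column can be used as the sorting
// key. The row_number key is added for consistent result verification
// when the function is order dependent or when the frame is of ROWS type.
// 2. If hasRowNumberKey is false, a single sorting key of a randomly chosen
// numeric type (one supporting '+' and '-') is generated.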
std::vector<std::string> generateSortingKeys(
const std::string& prefix,
std::vector<std::string>& names,
std::vector<TypePtr>& types);
std::vector<TypePtr>& types,
const bool hasRowNumberKey = true,
const bool rangeFrame = false);

std::pair<CallableSignature, SignatureStats&> pickSignature();

Expand All @@ -202,11 +209,12 @@ class AggregationFuzzerBase {
// child named "row_number" of BIGINT row numbers that differentiates every
// row. Row numbers start from 0. This additional input vector is needed for
// result verification of window aggregations.
std::vector<RowVectorPtr> generateInputDataWithRowNumber(
std::vector<RowVectorPtr> generateInputDataForWindowFuzzer(
std::vector<std::string> names,
std::vector<TypePtr> types,
const std::vector<std::string>& partitionKeys,
const CallableSignature& signature);
const CallableSignature& signature,
const bool hasRowNumberKey = true);

std::pair<std::optional<MaterializedRowMultiset>, ReferenceQueryErrorCode>
computeReferenceResults(
Expand Down
55 changes: 4 additions & 51 deletions velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ PrestoQueryRunner::PrestoQueryRunner(
user_{std::move(user)},
timeout_(timeout) {
eventBaseThread_.start("PrestoQueryRunner");
queryRunnerContext_ = std::make_shared<QueryRunnerContext>();
}

std::optional<std::string> PrestoQueryRunner::toSql(
Expand Down Expand Up @@ -431,56 +432,6 @@ std::optional<std::string> PrestoQueryRunner::toSql(
return sql.str();
}

namespace {

/// Writes the frame clause of a window function call to 'sql' in the form
/// " <RANGE|ROWS> BETWEEN <start bound> AND <end bound>".
///
/// @param frame Window frame whose type and start/end bounds are rendered.
/// @param sql Output stream the clause is appended to.
void appendWindowFrame(
    const core::WindowNode::Frame& frame,
    std::stringstream& sql) {
  using WindowType = core::WindowNode::WindowType;
  using BoundType = core::WindowNode::BoundType;

  // Renders a single frame bound. 'offset' is only dereferenced for the
  // k PRECEDING / k FOLLOWING bound types.
  const auto writeBound = [&sql](
                              const BoundType& boundType,
                              const core::TypedExprPtr& offset) {
    if (boundType == BoundType::kUnboundedPreceding) {
      sql << " UNBOUNDED PRECEDING";
    } else if (boundType == BoundType::kUnboundedFollowing) {
      sql << " UNBOUNDED FOLLOWING";
    } else if (boundType == BoundType::kCurrentRow) {
      sql << " CURRENT ROW";
    } else if (boundType == BoundType::kPreceding) {
      sql << " " << offset->toString() << " PRECEDING";
    } else if (boundType == BoundType::kFollowing) {
      sql << " " << offset->toString() << " FOLLOWING";
    } else {
      VELOX_UNREACHABLE();
    }
  };

  // TODO: Add support for k Range Frames by retrieving the original range
  // bound from WindowNode.
  if (frame.type == WindowType::kRange) {
    sql << " RANGE";
  } else if (frame.type == WindowType::kRows) {
    sql << " ROWS";
  } else {
    VELOX_UNREACHABLE();
  }

  sql << " BETWEEN";
  writeBound(frame.startType, frame.startValue);
  sql << " AND";
  writeBound(frame.endType, frame.endValue);
}

} // namespace

std::optional<std::string> PrestoQueryRunner::toSql(
const std::shared_ptr<const core::WindowNode>& windowNode) {
if (!isSupportedDwrfType(windowNode->sources()[0]->outputType())) {
Expand Down Expand Up @@ -525,7 +476,9 @@ std::optional<std::string> PrestoQueryRunner::toSql(
}
}

appendWindowFrame(functions[i].frame, sql);
auto frameClause =
queryRunnerContext_->windowFrames_.at(windowNode->id()).back();
sql << frameClause;
sql << ")";
}

Expand Down
8 changes: 7 additions & 1 deletion velox/exec/fuzzer/ReferenceQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

namespace facebook::velox::exec::test {

/// Side-channel state held by a reference query runner (see the
/// 'queryRunnerContext_' member of ReferenceQueryRunner) and consulted while
/// translating plan nodes to SQL.
class QueryRunnerContext {
public:
// Strings keyed by plan node id. PrestoQueryRunner::toSql for a WindowNode
// looks up the entry for the node's id with at() and writes the last string
// of that entry into the SQL as the window frame clause, so an entry is
// expected to exist and be non-empty for such nodes.
// NOTE(review): the code that populates this map is not visible here;
// presumably the window fuzzer records the frame clauses when it builds the
// plan -- confirm against the caller.
std::unordered_map<core::PlanNodeId, std::vector<std::string>> windowFrames_;
};

/// Query runner that uses reference database, i.e. DuckDB, Presto, Spark.
class ReferenceQueryRunner {
public:
Expand Down Expand Up @@ -78,6 +83,7 @@ class ReferenceQueryRunner {
const std::string& sessionProperty) {
VELOX_UNSUPPORTED();
}
};

std::shared_ptr<QueryRunnerContext> queryRunnerContext_;
};
} // namespace facebook::velox::exec::test
Loading

0 comments on commit 2779a99

Please sign in to comment.