Skip to content

Commit

Permalink
Add session property for sorted table write fuzzer (#10299)
Browse files Browse the repository at this point in the history
Summary:
When reading back the data from each split of a sorted table from Presto,
set the session property task_concurrency to 1 to force Presto to return the result in sorted order.

Pull Request resolved: #10299

Reviewed By: xiaoxmeng

Differential Revision: D58990171

Pulled By: kewang1024

fbshipit-source-id: 7da616ddefd83a4ac16994ec12f3c3a4edea493b
  • Loading branch information
kewang1024 authored and facebook-github-bot committed Jun 25, 2024
1 parent 4ed96b5 commit 28238cd
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
24 changes: 16 additions & 8 deletions velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,14 @@ std::vector<velox::RowVectorPtr> PrestoQueryRunner::executeVector(
}

/// Runs 'sql' through the two-argument overload, supplying an empty session
/// property string.
std::vector<RowVectorPtr> PrestoQueryRunner::execute(const std::string& sql) {
  const std::string emptySessionProperty;
  return execute(sql, emptySessionProperty);
}

std::vector<RowVectorPtr> PrestoQueryRunner::execute(
const std::string& sql,
const std::string& sessionProperty) {
LOG(INFO) << "Execute presto sql: " << sql;
auto response = ServerResponse(startQuery(sql));
auto response = ServerResponse(startQuery(sql, sessionProperty));
response.throwIfFailed();

std::vector<RowVectorPtr> queryResults;
Expand All @@ -888,16 +894,18 @@ std::vector<RowVectorPtr> PrestoQueryRunner::execute(const std::string& sql) {
return queryResults;
}

std::string PrestoQueryRunner::startQuery(const std::string& sql) {
std::string PrestoQueryRunner::startQuery(
const std::string& sql,
const std::string& sessionProperty) {
auto uri = fmt::format("{}/v1/statement?binaryResults=true", coordinatorUri_);
cpr::Url url{uri};
cpr::Body body{sql};
cpr::Header header({
{"X-Presto-User", user_},
{"X-Presto-Catalog", "hive"},
{"X-Presto-Schema", "tpch"},
{"Content-Type", "text/plain"},
});
cpr::Header header(
{{"X-Presto-User", user_},
{"X-Presto-Catalog", "hive"},
{"X-Presto-Schema", "tpch"},
{"Content-Type", "text/plain"},
{"X-Presto-Session", sessionProperty}});
cpr::Timeout timeout{timeout_};
cpr::Response response = cpr::Post(url, body, header, timeout);
VELOX_CHECK_EQ(
Expand Down
9 changes: 8 additions & 1 deletion velox/exec/fuzzer/PrestoQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner {
/// the query must already exist.
std::vector<velox::RowVectorPtr> execute(const std::string& sql) override;

/// Executes Presto SQL query with extra presto session property.
/// @param sql Text of the SQL query to run.
/// @param sessionProperty Session property string that is sent as the value
/// of the "X-Presto-Session" request header, e.g. "task_concurrency=1". The
/// single-argument overload passes an empty string here.
std::vector<velox::RowVectorPtr> execute(
const std::string& sql,
const std::string& sessionProperty) override;

bool supportsVeloxVectorResults() const override;

std::vector<RowVectorPtr> executeVector(
Expand Down Expand Up @@ -117,7 +122,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner {
std::optional<std::string> toSql(
const std::shared_ptr<const core::NestedLoopJoinNode>& joinNode);

std::string startQuery(const std::string& sql);
/// Submits 'sql' as the body of a POST request to the coordinator's
/// "/v1/statement" endpoint.
/// @param sql Text of the SQL query to submit.
/// @param sessionProperty Value sent in the "X-Presto-Session" request
/// header; defaults to an empty string.
/// @return A string that execute() wraps in a ServerResponse; presumably the
/// body of the HTTP response (TODO confirm against the definition).
std::string startQuery(
const std::string& sql,
const std::string& sessionProperty = "");

std::string fetchNext(const std::string& nextUri);

Expand Down
6 changes: 6 additions & 0 deletions velox/exec/fuzzer/ReferenceQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class ReferenceQueryRunner {
virtual std::vector<velox::RowVectorPtr> execute(const std::string& sql) {
// Base-class fallback: only invokes VELOX_UNSUPPORTED(). Query runners that
// can execute raw SQL (e.g. PrestoQueryRunner) override this overload.
VELOX_UNSUPPORTED();
}

/// Executes 'sql' with an additional session property string, e.g.
/// "task_concurrency=1".
/// This base implementation only invokes VELOX_UNSUPPORTED(); query runners
/// that accept session properties (e.g. PrestoQueryRunner) override it.
virtual std::vector<velox::RowVectorPtr> execute(
const std::string& sql,
const std::string& sessionProperty) {
VELOX_UNSUPPORTED();
}
};

} // namespace facebook::velox::exec::test
6 changes: 3 additions & 3 deletions velox/exec/fuzzer/WriterFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,8 @@ void WriterFuzzer::verifyWriter(
partitionKeys,
sortBy);

const auto referenceResult =
referenceQueryRunner_->execute(singleSplitReferenceSql);
const auto referenceResult = referenceQueryRunner_->execute(
singleSplitReferenceSql, "task_concurrency=1");
const auto& referenceData = referenceResult.at(0);
for (int i = 1; i < referenceResult.size(); ++i) {
referenceData->append(referenceResult.at(i).get());
Expand Down Expand Up @@ -738,7 +738,7 @@ std::string WriterFuzzer::partitionToSql(
const TypePtr& type,
std::string partitionValue) {
if (type->isVarchar()) {
RE2::Replace(&partitionValue, "'", "''");
RE2::GlobalReplace(&partitionValue, "'", "''");
return "'" + partitionValue + "'";
}
return partitionValue;
Expand Down

0 comments on commit 28238cd

Please sign in to comment.