Skip to content

Commit

Permalink
Add native endpoint for Velox plan conversion
Browse files Browse the repository at this point in the history
This adds an endpoint to the native Presto server that will convert
a Presto plan fragment to Velox. If the conversion is successful,
the server will send an ok response. If it fails, the server will
send an error response with a 422 status code as unprocessable.
The error message will contain an ExecutionFailureInfo struct
with error type, code and message.

See also #23649
RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0008-plan-checker.md
  • Loading branch information
BryanCutler committed Nov 12, 2024
1 parent 3cc5ecb commit 2090c3b
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 31 deletions.
64 changes: 64 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Exception.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand Down Expand Up @@ -1454,6 +1455,14 @@ void PrestoServer::registerSidecarEndpoints() {
proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, getFunctionsMetadata());
});
httpServer_->registerPost(
"/v1/velox/plan",
[server = this](
proxygen::HTTPMessage* message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
server->convertToVeloxPlan(message, downstream, body);
});
}

protocol::NodeStatus PrestoServer::fetchNodeStatus() {
Expand Down Expand Up @@ -1484,4 +1493,59 @@ protocol::NodeStatus PrestoServer::fetchNodeStatus() {
return nodeStatus;
}

void PrestoServer::convertToVeloxPlan(
proxygen::HTTPMessage* message,
proxygen::ResponseHandler* downstream,
const std::vector<std::unique_ptr<folly::IOBuf>>& body) {
protocol::ExecutionFailureInfo failure;
try {
auto headers = message->getHeaders();

std::string planFragmentJson = util::extractMessageBody(body);
protocol::PlanFragment planFragment = json::parse(planFragmentJson);

auto queryCtx = core::QueryCtx::create();
VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool_.get());

// Create static taskId and empty TableWriteInfo needed for plan conversion
protocol::TaskId taskId = "velox-plan-conversion.0.0.0";
auto tableWriteInfo = std::make_shared<protocol::TableWriteInfo>();

// Attempt to convert the plan fragment to a Velox plan
if (auto writeNode =
std::dynamic_pointer_cast<const protocol::TableWriterNode>(
planFragment.root)) {
// TableWriteInfo is not yet built at the planning stage, so we can not
// fully convert a TableWriteNode and skip that node of the fragment.
auto writeSourceNode =
converter.toVeloxQueryPlan(writeNode->source, tableWriteInfo, taskId);
getVeloxPlanValidator()->validatePlanFragment(
core::PlanFragment(writeSourceNode));
} else {
auto veloxPlan =
converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId);
getVeloxPlanValidator()->validatePlanFragment(veloxPlan);
}
} catch (const VeloxException& e) {
failure = VeloxToPrestoExceptionTranslator::translate(e);
} catch (const std::exception& e) {
failure = VeloxToPrestoExceptionTranslator::translate(e);
}

// Return ok status if conversion succeeded or error if failed
protocol::PlanConversionResponse response;
if (failure.errorCode.code == 0) {
http::sendOkResponse(downstream, json(response));
} else {
protocol::PlanConversionFailureInfo planConversionFailureInfo;
planConversionFailureInfo.type = failure.type;
planConversionFailureInfo.message = failure.message;
planConversionFailureInfo.stack = failure.stack;
planConversionFailureInfo.errorCode = failure.errorCode;
response.failures.emplace_back(planConversionFailureInfo);
http::sendErrorJsonResponse(
downstream, json(response), http::kHttpUnprocessableContent);
}
}

} // namespace facebook::presto
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ class PrestoServer {

void registerSidecarEndpoints();

void convertToVeloxPlan(
proxygen::HTTPMessage* message,
proxygen::ResponseHandler* downstream,
const std::vector<std::unique_ptr<folly::IOBuf>>& body);

std::unique_ptr<velox::cache::SsdCache> setupSsdCache();

const std::string configDirectoryPath_;
Expand Down
8 changes: 1 addition & 7 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
httpSrvCpuExecutor_,
[this, &body, taskId, createOrUpdateFunc]() {
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();

// TODO Avoid copy
std::ostringstream oss;
for (auto& buf : body) {
oss << std::string((const char*)buf->data(), buf->length());
}
std::string updateJson = oss.str();
std::string updateJson = util::extractMessageBody(body);

std::unique_ptr<protocol::TaskInfo> taskInfo;
try {
Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ void installSignalHandler() {
#endif // __APPLE__
}

std::string extractMessageBody(
const std::vector<std::unique_ptr<folly::IOBuf>>& body) {
// TODO Avoid copy
std::ostringstream oss;
for (auto& buf : body) {
oss << std::string((const char*)buf->data(), buf->length());
}
return oss.str();
}
} // namespace facebook::presto::util
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ long getProcessCpuTimeNs();
/// context such as the queryId.
void installSignalHandler();

std::string extractMessageBody(
const std::vector<std::unique_ptr<folly::IOBuf>>& body);

} // namespace facebook::presto::util
7 changes: 2 additions & 5 deletions presto-native-execution/presto_cpp/main/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <folly/synchronization/Latch.h>
#include <velox/common/base/Exceptions.h>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/HttpClient.h"

namespace facebook::presto::http {
Expand Down Expand Up @@ -169,11 +170,7 @@ HttpResponse::nextAllocationSize(uint64_t dataLength) const {
std::string HttpResponse::dumpBodyChain() const {
std::string responseBody;
if (!bodyChain_.empty()) {
std::ostringstream oss;
for (const auto& buf : bodyChain_) {
oss << std::string((const char*)buf->data(), buf->length());
}
responseBody = oss.str();
responseBody = util::extractMessageBody(bodyChain_);
}
return responseBody;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const uint16_t kHttpAccepted = 202;
const uint16_t kHttpNoContent = 204;
const uint16_t kHttpUnauthorized = 401;
const uint16_t kHttpNotFound = 404;
const uint16_t kHttpUnprocessableContent = 422;
const uint16_t kHttpInternalServerError = 500;

const char kMimeTypeApplicationJson[] = "application/json";
Expand Down
25 changes: 19 additions & 6 deletions presto-native-execution/presto_cpp/main/http/HttpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@

namespace facebook::presto::http {

void sendOkResponse(proxygen::ResponseHandler* downstream) {
proxygen::ResponseBuilder(downstream).status(http::kHttpOk, "").sendWithEOM();
}

void sendOkResponse(proxygen::ResponseHandler* downstream, const json& body) {
std::string createJsonBody(const json& body) {
// nlohmann::json throws when it finds invalid UTF-8 characters. In that case
// the server will crash. We handle such situation here and generate body
// replacing the faulty UTF-8 sequences.
Expand All @@ -39,8 +35,15 @@ void sendOkResponse(proxygen::ResponseHandler* downstream, const json& body) {
"Json Dump:\n"
<< messageBody;
}
return messageBody;
}

void sendOkResponse(proxygen::ResponseHandler* downstream) {
proxygen::ResponseBuilder(downstream).status(http::kHttpOk, "").sendWithEOM();
}

sendOkResponse(downstream, messageBody);
void sendOkResponse(proxygen::ResponseHandler* downstream, const json& body) {
sendOkResponse(downstream, createJsonBody(body));
}

void sendOkResponse(
Expand Down Expand Up @@ -75,6 +78,16 @@ void sendErrorResponse(
.sendWithEOM();
}

void sendErrorJsonResponse(
proxygen::ResponseHandler* downstream,
const json& body,
uint16_t status) {
proxygen::ResponseBuilder(downstream)
.status(status, "")
.body(createJsonBody(body))
.sendWithEOM();
}

HttpConfig::HttpConfig(const folly::SocketAddress& address, bool reusePort)
: address_(address), reusePort_(reusePort) {}

Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/http/HttpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ void sendErrorResponse(
const std::string& error = "",
uint16_t status = http::kHttpInternalServerError);

void sendErrorJsonResponse(
proxygen::ResponseHandler* downstream,
const json& body,
uint16_t status = http::kHttpInternalServerError);

class AbstractRequestHandler : public proxygen::RequestHandler {
public:
void onRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <velox/common/base/tests/GTestUtils.h>
#include <velox/common/memory/Memory.h>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/HttpClient.h"
#include "presto_cpp/main/http/HttpServer.h"
#include "velox/common/base/StatsReporter.h"
Expand Down Expand Up @@ -157,14 +158,6 @@ std::string bodyAsString(
return oss.str();
}

std::string toString(std::vector<std::unique_ptr<folly::IOBuf>>& bufs) {
std::ostringstream oss;
for (auto& buf : bufs) {
oss << std::string((const char*)buf->data(), buf->length());
}
return oss.str();
}

void echo(
proxygen::HTTPMessage* message,
std::vector<std::unique_ptr<folly::IOBuf>>& body,
Expand All @@ -181,7 +174,7 @@ void echo(
proxygen::ResponseBuilder(downstream)
.status(facebook::presto::http::kHttpOk, "")
.header(proxygen::HTTP_HEADER_CONTENT_TYPE, "text/plain")
.body(toString(body))
.body(facebook::presto::util::extractMessageBody(body))
.sendWithEOM();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,9 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
case protocol::SystemPartitioning::FIXED: {
switch (systemPartitioningHandle->function) {
case protocol::SystemPartitionFunction::ROUND_ROBIN: {
auto numPartitions = partitioningScheme.bucketToPartition->size();
auto numPartitions = partitioningScheme.bucketToPartition
? partitioningScheme.bucketToPartition->size()
: 1;

if (numPartitions == 1) {
planFragment.planNode = core::PartitionedOutputNode::single(
Expand All @@ -1870,7 +1872,9 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
return planFragment;
}
case protocol::SystemPartitionFunction::HASH: {
auto numPartitions = partitioningScheme.bucketToPartition->size();
auto numPartitions = partitioningScheme.bucketToPartition
? partitioningScheme.bucketToPartition->size()
: 1;

if (numPartitions == 1) {
planFragment.planNode = core::PartitionedOutputNode::single(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7351,6 +7351,111 @@ void from_json(const json& j, PipelineStats& p) {
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {

void to_json(json& j, const PlanConversionFailureInfo& p) {
j = json::object();
to_json_key(j, "type", p.type, "PlanConversionFailureInfo", "String", "type");
to_json_key(
j,
"message",
p.message,
"PlanConversionFailureInfo",
"String",
"message");
to_json_key(
j,
"cause",
p.cause,
"PlanConversionFailureInfo",
"PlanConversionFailureInfo",
"cause");
to_json_key(
j,
"suppressed",
p.suppressed,
"PlanConversionFailureInfo",
"List<PlanConversionFailureInfo>",
"suppressed");
to_json_key(
j,
"stack",
p.stack,
"PlanConversionFailureInfo",
"List<String>",
"stack");
to_json_key(
j,
"errorCode",
p.errorCode,
"PlanConversionFailureInfo",
"ErrorCode",
"errorCode");
}

void from_json(const json& j, PlanConversionFailureInfo& p) {
from_json_key(
j, "type", p.type, "PlanConversionFailureInfo", "String", "type");
from_json_key(
j,
"message",
p.message,
"PlanConversionFailureInfo",
"String",
"message");
from_json_key(
j,
"cause",
p.cause,
"PlanConversionFailureInfo",
"PlanConversionFailureInfo",
"cause");
from_json_key(
j,
"suppressed",
p.suppressed,
"PlanConversionFailureInfo",
"List<PlanConversionFailureInfo>",
"suppressed");
from_json_key(
j,
"stack",
p.stack,
"PlanConversionFailureInfo",
"List<String>",
"stack");
from_json_key(
j,
"errorCode",
p.errorCode,
"PlanConversionFailureInfo",
"ErrorCode",
"errorCode");
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {

void to_json(json& j, const PlanConversionResponse& p) {
j = json::object();
to_json_key(
j,
"failures",
p.failures,
"PlanConversionResponse",
"List<PlanConversionFailureInfo>",
"failures");
}

void from_json(const json& j, PlanConversionResponse& p) {
from_json_key(
j,
"failures",
p.failures,
"PlanConversionResponse",
"List<PlanConversionFailureInfo>",
"failures");
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {

void to_json(json& j, const PlanCostEstimate& p) {
j = json::object();
to_json_key(j, "cpuCost", p.cpuCost, "PlanCostEstimate", "double", "cpuCost");
Expand Down
Loading

0 comments on commit 2090c3b

Please sign in to comment.