Skip to content

Commit

Permalink
[native] Prepare for the exchange protocol upgrade
Browse files Browse the repository at this point in the history
This is the change needed on presto_cpp side to implement the new exchange
protocol.  Implement `ExchangeSource::getDataSizes` and use it when the header
`X-Presto-Get-Data-Size` exists (will be removed in the future once coordinator
support the `HEAD` calls), or the endpoint is called with HTTP `HEAD` method.

All changes here are backward-compatible.  The new protocol will not be used
until we start calling `getDataSizes` on velox side in `ExchangeClient`.

See #21926 for details about design of
the new protocol.

Also added new runtime stats `numTopOutputBuffers`.
  • Loading branch information
Yuhta authored and mbasmanova committed Feb 27, 2024
1 parent 5c8fc86 commit be66148
Show file tree
Hide file tree
Showing 15 changed files with 12,582 additions and 12,497 deletions.
24 changes: 20 additions & 4 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,17 @@ void PrestoExchangeSource::doRequest(
auto path = fmt::format("{}/{}", basePath_, sequence_);
VLOG(1) << "Fetching data from " << host_ << ":" << port_ << " " << path;
auto self = getSelfPtr();
http::RequestBuilder()
.method(proxygen::HTTPMethod::GET)
.url(path)
auto requestBuilder =
http::RequestBuilder().method(proxygen::HTTPMethod::GET).url(path);

if (maxBytes == 0) {
requestBuilder.header(protocol::PRESTO_GET_DATA_SIZE_HEADER, "true");
// Coordinator ignores the header and always sends back data. There is only
// one coordinator to fetch data from, so a limit of 1MB is enough.
maxBytes = 1 << 20;
}

requestBuilder
.header(
protocol::PRESTO_MAX_SIZE_HTTP_HEADER,
protocol::DataSize(maxBytes, protocol::DataUnit::BYTE).toString())
Expand Down Expand Up @@ -252,6 +260,13 @@ void PrestoExchangeSource::processDataResponse(
<< sequence_;
}

std::vector<int64_t> remainingBytes;
auto remainingBytesString = headers->getHeaders().getSingleOrEmpty(
protocol::PRESTO_BUFFER_REMAINING_BYTES_HEADER);
if (!remainingBytesString.empty()) {
folly::split(',', remainingBytesString, remainingBytes);
}

int64_t ackSequence =
atol(headers->getHeaders()
.getSingleOrEmpty(protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER)
Expand Down Expand Up @@ -325,7 +340,8 @@ void PrestoExchangeSource::processDataResponse(
}

if (requestPromise.valid() && !requestPromise.isFulfilled()) {
requestPromise.setValue(Response{pageSize, complete});
requestPromise.setValue(
Response{pageSize, complete, std::move(remainingBytes)});
} else {
// The source must have been closed.
VELOX_CHECK(closed_.load());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override {
return request(0, maxWaitSeconds);
}

// Create an exchange source using pooled connections.
static std::shared_ptr<PrestoExchangeSource> create(
const std::string& url,
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ protocol::TaskInfo PrestoTask::updateInfoLocked() {
taskRuntimeStats.insert(
{"averageOutputBufferWallNanos",
fromMillis(outputBufferStats.averageBufferTimeMs)});
taskRuntimeStats["numTopOutputBuffers"].addValue(
outputBufferStats.numTopBuffers);
}

if (taskStats.memoryReclaimCount > 0) {
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct Result {
int64_t nextSequence;
std::unique_ptr<folly::IOBuf> data;
bool complete;
std::vector<int64_t> remainingBytes;
};

struct ResultRequest {
Expand Down
6 changes: 4 additions & 2 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ void getData(
[taskId = taskId, bufferId = destination, promiseHolder, startMs](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t sequence,
std::vector<int64_t> /*remainingBytes*/) mutable {
bool complete = pages.empty();
std::vector<int64_t> remainingBytes) mutable {
bool complete = false;
int64_t nextSequence = sequence;
std::unique_ptr<folly::IOBuf> iobuf;
int64_t bytes = 0;
Expand All @@ -143,13 +143,15 @@ void getData(
VLOG(1) << "Task " << taskId << ", buffer " << bufferId << ", sequence "
<< sequence << " Results size: " << bytes
<< ", page count: " << pages.size()
<< ", remaining: " << folly::join(',', remainingBytes)
<< ", complete: " << std::boolalpha << complete;

auto result = std::make_unique<Result>();
result->sequence = sequence;
result->nextSequence = nextSequence;
result->complete = complete;
result->data = std::move(iobuf);
result->remainingBytes = std::move(remainingBytes);

promiseHolder->promise.setValue(std::move(result));

Expand Down
40 changes: 26 additions & 14 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,18 @@ void TaskResource::registerUris(http::HttpServer& server) {
return getTaskStatus(message, pathMatch);
});

server.registerGet(
R"(/v1/task/async/(.+)/results/([0-9]+)/([0-9]+))",
server.registerHead(
R"(/v1/task/(.+)/results/([0-9]+)/([0-9]+))",
[&](proxygen::HTTPMessage* message,
const std::vector<std::string>& pathMatch) {
return getResults(message, pathMatch);
return getResults(message, pathMatch, true);
});

server.registerGet(
R"(/v1/task/(.+)/results/([0-9]+)/([0-9]+))",
[&](proxygen::HTTPMessage* message,
const std::vector<std::string>& pathMatch) {
return getResults(message, pathMatch);
return getResults(message, pathMatch, false);
});

server.registerGet(
Expand Down Expand Up @@ -405,18 +405,25 @@ proxygen::RequestHandler* TaskResource::deleteTask(

proxygen::RequestHandler* TaskResource::getResults(
proxygen::HTTPMessage* message,
const std::vector<std::string>& pathMatch) {
const std::vector<std::string>& pathMatch,
bool getDataSize) {
protocol::TaskId taskId = pathMatch[1];
long bufferId = folly::to<long>(pathMatch[2]);
long token = folly::to<long>(pathMatch[3]);

auto& headers = message->getHeaders();
auto maxSize = protocol::DataSize(
headers.exists(protocol::PRESTO_MAX_SIZE_HTTP_HEADER)
? headers.getSingleOrEmpty(protocol::PRESTO_MAX_SIZE_HTTP_HEADER)
: protocol::PRESTO_MAX_SIZE_DEFAULT);
auto maxWait = getMaxWait(message).value_or(
protocol::Duration(protocol::PRESTO_MAX_WAIT_DEFAULT));
protocol::DataSize maxSize;
if (getDataSize || headers.exists(protocol::PRESTO_GET_DATA_SIZE_HEADER)) {
maxSize = protocol::DataSize(0, protocol::DataUnit::BYTE);
} else {
maxSize = protocol::DataSize(
headers.exists(protocol::PRESTO_MAX_SIZE_HTTP_HEADER)
? headers.getSingleOrEmpty(protocol::PRESTO_MAX_SIZE_HTTP_HEADER)
: protocol::PRESTO_MAX_SIZE_DEFAULT);
}

return new http::CallbackRequestHandler(
[this, taskId, bufferId, token, maxSize, maxWait](
proxygen::HTTPMessage* /*message*/,
Expand Down Expand Up @@ -447,8 +454,9 @@ proxygen::RequestHandler* TaskResource::getResults(
auto status = result->data && result->data->length() == 0
? http::kHttpNoContent
: http::kHttpOk;
proxygen::ResponseBuilder(downstream)
.status(status, "")

proxygen::ResponseBuilder builder(downstream);
builder.status(status, "")
.header(
proxygen::HTTP_HEADER_CONTENT_TYPE,
protocol::PRESTO_PAGES_MIME_TYPE)
Expand All @@ -462,9 +470,13 @@ proxygen::RequestHandler* TaskResource::getResults(
std::to_string(result->nextSequence))
.header(
protocol::PRESTO_BUFFER_COMPLETE_HEADER,
result->complete ? "true" : "false")
.body(std::move(result->data))
.sendWithEOM();
result->complete ? "true" : "false");
if (!result->remainingBytes.empty()) {
builder.header(
protocol::PRESTO_BUFFER_REMAINING_BYTES_HEADER,
folly::join(',', result->remainingBytes));
}
builder.body(std::move(result->data)).sendWithEOM();
})
.thenError(
folly::tag_t<velox::VeloxException>{},
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/TaskResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class TaskResource {

proxygen::RequestHandler* getResults(
proxygen::HTTPMessage* message,
const std::vector<std::string>& pathMatch);
const std::vector<std::string>& pathMatch,
bool getDataSize);

proxygen::RequestHandler* getTaskStatus(
proxygen::HTTPMessage* message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ folly::F14FastMap<std::string, int64_t> BroadcastExchangeSource::stats() const {
return reader_->stats();
}

folly::SemiFuture<BroadcastExchangeSource::Response>
BroadcastExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
// Use default value of ExchangeClient::getAveragePageSize() for now.
//
// TODO: Change BroadcastFileReader to return the next batch size.
remainingBytes.push_back(1 << 20);
}
return folly::makeSemiFuture(Response{0, atEnd_, std::move(remainingBytes)});
}

// static
std::shared_ptr<exec::ExchangeSource>
BroadcastExchangeSource::createExchangeSource(
Expand Down Expand Up @@ -103,4 +115,4 @@ BroadcastExchangeSource::createExchangeSource(
fileSystemBroadcast.createReader(std::move(broadcastFileInfo), pool),
pool);
}
}; // namespace facebook::presto::operators
}; // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class BroadcastExchangeSource : public velox::exec::ExchangeSource {
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override;

void close() override {}

folly::F14FastMap<std::string, int64_t> stats() const override;
Expand All @@ -56,4 +59,4 @@ class BroadcastExchangeSource : public velox::exec::ExchangeSource {
private:
const std::shared_ptr<BroadcastFileReader> reader_;
};
} // namespace facebook::presto::operators
} // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ UnsafeRowExchangeSource::request(
CALL_SHUFFLE(return nextBatch(), "next");
}

folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
// Use default value of ExchangeClient::getAveragePageSize() for now.
//
// TODO: Change ShuffleReader to return the next batch size.
remainingBytes.push_back(1 << 20);
}
return folly::makeSemiFuture(Response{0, atEnd_, std::move(remainingBytes)});
}

folly::F14FastMap<std::string, int64_t> UnsafeRowExchangeSource::stats() const {
return shuffle_->stats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override;

void close() override {
shuffle_->noMoreData(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const char* const PRESTO_TASK_INSTANCE_ID_HEADER = "X-Presto-Task-Instance-Id";
const char* const PRESTO_PAGE_TOKEN_HEADER = "X-Presto-Page-Sequence-Id";
const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER = "X-Presto-Page-End-Sequence-Id";
const char* const PRESTO_BUFFER_COMPLETE_HEADER = "X-Presto-Buffer-Complete";
const char* const PRESTO_GET_DATA_SIZE_HEADER = "X-Presto-Get-Data-Size";
const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER = "X-Presto-Buffer-Remaining-Bytes";
const char* const PRESTO_MAX_WAIT_DEFAULT = "2s";
const char* const PRESTO_MAX_SIZE_DEFAULT = "4096 B";
Expand Down Expand Up @@ -94,14 +96,14 @@ namespace facebook::presto::protocol {
//Loosly copied this here from NLOHMANN_JSON_SERIALIZE_ENUM()
// NOLINTNEXTLINE: cppcoreguidelines-avoid-c-arrays
static const std::pair<{{&class_name}}, json>
static const std::pair<{{&class_name}}, json>
{{&class_name}}_enum_table[] = { // NOLINT: cert-err58-cpp
{{#elements}}
{ {{&class_name}}::{{&element}}, "{{&element}}" }{{^_last}},{{/_last}}
{{/elements}}
};
void to_json(json& j, const {{&class_name}}& e)
{
void to_json(json& j, const {{&class_name}}& e)
{
static_assert(std::is_enum<{{&class_name}}>::value, "{{&class_name}} must be an enum!");
const auto* it = std::find_if(std::begin({{&class_name}}_enum_table), std::end({{&class_name}}_enum_table),
[e](const std::pair<{{&class_name}}, json>& ej_pair) -> bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ extern const char* const PRESTO_TASK_INSTANCE_ID_HEADER;
extern const char* const PRESTO_PAGE_TOKEN_HEADER;
extern const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER;
extern const char* const PRESTO_BUFFER_COMPLETE_HEADER;
extern const char* const PRESTO_GET_DATA_SIZE_HEADER;
extern const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER;
extern const char* const PRESTO_MAX_WAIT_DEFAULT;
extern const char* const PRESTO_MAX_SIZE_DEFAULT;
Expand Down
Loading

0 comments on commit be66148

Please sign in to comment.