Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[native] Change shuffle maxWait unit #22102

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ bool PrestoExchangeSource::shouldRequestLocked() {

folly::SemiFuture<PrestoExchangeSource::Response> PrestoExchangeSource::request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) {
std::chrono::microseconds maxWait) {
aditi-pandit marked this conversation as resolved.
Show resolved Hide resolved
// Before calling 'request', the caller should have called
// 'shouldRequestLocked' and received 'true' response. Hence, we expect
// requestPending_ == true, atEnd_ == false.
Expand All @@ -158,15 +158,15 @@ folly::SemiFuture<PrestoExchangeSource::Response> PrestoExchangeSource::request(
RetryState(std::chrono::duration_cast<std::chrono::milliseconds>(
SystemConfig::instance()->exchangeMaxErrorDuration())
.count());
doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds);
doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWait);

return future;
}

void PrestoExchangeSource::doRequest(
int64_t delayMs,
uint32_t maxBytes,
uint32_t maxWaitSeconds) {
std::chrono::microseconds maxWait) {
if (closed_.load()) {
queue_->setError("PrestoExchangeSource closed");
return;
Expand All @@ -191,11 +191,11 @@ void PrestoExchangeSource::doRequest(
protocol::DataSize(maxBytes, protocol::DataUnit::BYTE).toString())
.header(
protocol::PRESTO_MAX_WAIT_HTTP_HEADER,
protocol::Duration(maxWaitSeconds, protocol::TimeUnit::SECONDS)
protocol::Duration(maxWait.count(), protocol::TimeUnit::MICROSECONDS)
.toString())
.send(httpClient_.get(), "", delayMs)
.via(driverExecutor_)
.thenValue([path, maxBytes, maxWaitSeconds, self](
.thenValue([path, maxBytes, maxWait, self](
std::unique_ptr<http::HttpResponse> response) {
velox::common::testutil::TestValue::adjust(
"facebook::presto::PrestoExchangeSource::doRequest", self.get());
Expand All @@ -209,7 +209,7 @@ void PrestoExchangeSource::doRequest(
self->processDataError(
path,
maxBytes,
maxWaitSeconds,
maxWait,
fmt::format(
"Received HTTP {} {} {}",
headers->getStatusCode(),
Expand All @@ -219,16 +219,15 @@ void PrestoExchangeSource::doRequest(
self->immediateBufferTransfer_ ? self->pool_.get()
: nullptr)));
} else if (response->hasError()) {
self->processDataError(
path, maxBytes, maxWaitSeconds, response->error());
self->processDataError(path, maxBytes, maxWait, response->error());
} else {
self->processDataResponse(std::move(response));
}
})
.thenError(
folly::tag_t<std::exception>{},
[path, maxBytes, maxWaitSeconds, self](const std::exception& e) {
self->processDataError(path, maxBytes, maxWaitSeconds, e.what());
[path, maxBytes, maxWait, self](const std::exception& e) {
self->processDataError(path, maxBytes, maxWait, e.what());
});
};

Expand Down Expand Up @@ -363,15 +362,15 @@ void PrestoExchangeSource::processDataResponse(
void PrestoExchangeSource::processDataError(
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
std::chrono::microseconds maxWait,
const std::string& error) {
++failedAttempts_;
if (!dataRequestRetryState_.isExhausted()) {
VLOG(1) << "Failed to fetch data from " << host_ << ":" << port_ << " "
<< path << ", duration: " << dataRequestRetryState_.durationMs()
<< "ms - Retrying: " << error;

doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds);
doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWait);
return;
}

Expand Down
13 changes: 8 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
/// should not hold a lock over queue's mutex when making this call.
folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override {
return request(0, maxWaitSeconds);
std::chrono::microseconds maxWait) override {
return request(0, maxWait);
}

// Create an exchange source using pooled connections.
Expand Down Expand Up @@ -198,7 +198,10 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
static void testingClearMemoryUsage();

private:
void doRequest(int64_t delayMs, uint32_t maxBytes, uint32_t maxWaitSeconds);
void doRequest(
int64_t delayMs,
uint32_t maxBytes,
std::chrono::microseconds maxWait);

/// Handles successful, possibly empty, response. Adds received data to the
/// queue. If received an end marker, notifies the queue by adding null page.
Expand All @@ -218,7 +221,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
void processDataError(
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
std::chrono::microseconds maxWait,
const std::string& error);

void acknowledgeResults(int64_t ackSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ std::optional<std::string> getBroadcastInfo(folly::Uri& uri) {
folly::SemiFuture<BroadcastExchangeSource::Response>
BroadcastExchangeSource::request(
uint32_t /*maxBytes*/,
uint32_t /*maxWaitSeconds*/) {
std::chrono::microseconds /*maxWait*/) {
if (atEnd_) {
return folly::makeFuture(Response{0, true});
}
Expand Down Expand Up @@ -69,7 +69,8 @@ folly::F14FastMap<std::string, int64_t> BroadcastExchangeSource::stats() const {
}

folly::SemiFuture<BroadcastExchangeSource::Response>
BroadcastExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) {
BroadcastExchangeSource::requestDataSizes(
std::chrono::microseconds /*maxWait*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
// Use default value of ExchangeClient::getAveragePageSize() for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class BroadcastExchangeSource : public velox::exec::ExchangeSource {

folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

void close() override {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace facebook::presto::operators {
folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::request(
uint32_t /*maxBytes*/,
uint32_t /*maxWaitSeconds*/) {
std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this]() {
return std::move(shuffle_->next())
.deferValue([this](velox::BufferPtr buffer) {
Expand Down Expand Up @@ -74,7 +74,8 @@ UnsafeRowExchangeSource::request(
}

folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) {
UnsafeRowExchangeSource::requestDataSizes(
std::chrono::microseconds /*maxWait*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
// Use default value of ExchangeClient::getAveragePageSize() for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {

folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

void close() override {
shuffle_->noMoreData(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam<Params> {
std::lock_guard<std::mutex> l(queue->mutex());
ASSERT_TRUE(exchangeSource->shouldRequestLocked());
}
exchangeSource->request(1 << 20, 2);
exchangeSource->request(1 << 20, std::chrono::seconds(2));
}

std::shared_ptr<memory::MemoryPool> pool_;
Expand Down Expand Up @@ -999,7 +999,7 @@ TEST_P(PrestoExchangeSourceTest, closeRaceCondition) {
std::lock_guard<std::mutex> l(queue->mutex());
ASSERT_TRUE(exchangeSource->shouldRequestLocked());
}
auto future = exchangeSource->request(1 << 20, 2);
auto future = exchangeSource->request(1 << 20, std::chrono::seconds(2));
ASSERT_TRUE(future.isReady());
auto response = std::move(future).get();
ASSERT_EQ(response.bytes, 0);
Expand Down
Loading