Skip to content

Commit

Permalink
[native] Change shuffle maxWait unit
Browse files Browse the repository at this point in the history
We want to use 100ms for data fetch max wait, so need to change the unit here.
All changes are functionally equivalent to previous version.
  • Loading branch information
Yuhta authored and aditi-pandit committed Mar 6, 2024
1 parent d80e49a commit 53d79f8
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 27 deletions.
23 changes: 11 additions & 12 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ bool PrestoExchangeSource::shouldRequestLocked() {

folly::SemiFuture<PrestoExchangeSource::Response> PrestoExchangeSource::request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) {
std::chrono::microseconds maxWait) {
// Before calling 'request', the caller should have called
// 'shouldRequestLocked' and received 'true' response. Hence, we expect
// requestPending_ == true, atEnd_ == false.
Expand All @@ -158,15 +158,15 @@ folly::SemiFuture<PrestoExchangeSource::Response> PrestoExchangeSource::request(
RetryState(std::chrono::duration_cast<std::chrono::milliseconds>(
SystemConfig::instance()->exchangeMaxErrorDuration())
.count());
doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds);
doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWait);

return future;
}

void PrestoExchangeSource::doRequest(
int64_t delayMs,
uint32_t maxBytes,
uint32_t maxWaitSeconds) {
std::chrono::microseconds maxWait) {
if (closed_.load()) {
queue_->setError("PrestoExchangeSource closed");
return;
Expand All @@ -191,11 +191,11 @@ void PrestoExchangeSource::doRequest(
protocol::DataSize(maxBytes, protocol::DataUnit::BYTE).toString())
.header(
protocol::PRESTO_MAX_WAIT_HTTP_HEADER,
protocol::Duration(maxWaitSeconds, protocol::TimeUnit::SECONDS)
protocol::Duration(maxWait.count(), protocol::TimeUnit::MICROSECONDS)
.toString())
.send(httpClient_.get(), "", delayMs)
.via(driverExecutor_)
.thenValue([path, maxBytes, maxWaitSeconds, self](
.thenValue([path, maxBytes, maxWait, self](
std::unique_ptr<http::HttpResponse> response) {
velox::common::testutil::TestValue::adjust(
"facebook::presto::PrestoExchangeSource::doRequest", self.get());
Expand All @@ -209,7 +209,7 @@ void PrestoExchangeSource::doRequest(
self->processDataError(
path,
maxBytes,
maxWaitSeconds,
maxWait,
fmt::format(
"Received HTTP {} {} {}",
headers->getStatusCode(),
Expand All @@ -219,16 +219,15 @@ void PrestoExchangeSource::doRequest(
self->immediateBufferTransfer_ ? self->pool_.get()
: nullptr)));
} else if (response->hasError()) {
self->processDataError(
path, maxBytes, maxWaitSeconds, response->error());
self->processDataError(path, maxBytes, maxWait, response->error());
} else {
self->processDataResponse(std::move(response));
}
})
.thenError(
folly::tag_t<std::exception>{},
[path, maxBytes, maxWaitSeconds, self](const std::exception& e) {
self->processDataError(path, maxBytes, maxWaitSeconds, e.what());
[path, maxBytes, maxWait, self](const std::exception& e) {
self->processDataError(path, maxBytes, maxWait, e.what());
});
};

Expand Down Expand Up @@ -363,15 +362,15 @@ void PrestoExchangeSource::processDataResponse(
void PrestoExchangeSource::processDataError(
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
std::chrono::microseconds maxWait,
const std::string& error) {
++failedAttempts_;
if (!dataRequestRetryState_.isExhausted()) {
VLOG(1) << "Failed to fetch data from " << host_ << ":" << port_ << " "
<< path << ", duration: " << dataRequestRetryState_.durationMs()
<< "ms - Retrying: " << error;

doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds);
doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWait);
return;
}

Expand Down
13 changes: 8 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
/// should not hold a lock over queue's mutex when making this call.
folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override {
return request(0, maxWaitSeconds);
std::chrono::microseconds maxWait) override {
return request(0, maxWait);
}

// Create an exchange source using pooled connections.
Expand Down Expand Up @@ -198,7 +198,10 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
static void testingClearMemoryUsage();

private:
void doRequest(int64_t delayMs, uint32_t maxBytes, uint32_t maxWaitSeconds);
void doRequest(
int64_t delayMs,
uint32_t maxBytes,
std::chrono::microseconds maxWait);

/// Handles successful, possibly empty, response. Adds received data to the
/// queue. If received an end marker, notifies the queue by adding null page.
Expand All @@ -218,7 +221,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
void processDataError(
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
std::chrono::microseconds maxWait,
const std::string& error);

void acknowledgeResults(int64_t ackSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ std::optional<std::string> getBroadcastInfo(folly::Uri& uri) {
folly::SemiFuture<BroadcastExchangeSource::Response>
BroadcastExchangeSource::request(
uint32_t /*maxBytes*/,
uint32_t /*maxWaitSeconds*/) {
std::chrono::microseconds /*maxWait*/) {
if (atEnd_) {
return folly::makeFuture(Response{0, true});
}
Expand Down Expand Up @@ -69,7 +69,8 @@ folly::F14FastMap<std::string, int64_t> BroadcastExchangeSource::stats() const {
}

folly::SemiFuture<BroadcastExchangeSource::Response>
BroadcastExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) {
BroadcastExchangeSource::requestDataSizes(
std::chrono::microseconds /*maxWait*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
// Use default value of ExchangeClient::getAveragePageSize() for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class BroadcastExchangeSource : public velox::exec::ExchangeSource {

folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

void close() override {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace facebook::presto::operators {
folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::request(
uint32_t /*maxBytes*/,
uint32_t /*maxWaitSeconds*/) {
std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this]() {
return std::move(shuffle_->next())
.deferValue([this](velox::BufferPtr buffer) {
Expand Down Expand Up @@ -74,7 +74,8 @@ UnsafeRowExchangeSource::request(
}

folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) {
UnsafeRowExchangeSource::requestDataSizes(
std::chrono::microseconds /*maxWait*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
// Use default value of ExchangeClient::getAveragePageSize() for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {

folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override;
std::chrono::microseconds maxWait) override;

void close() override {
shuffle_->noMoreData(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam<Params> {
std::lock_guard<std::mutex> l(queue->mutex());
ASSERT_TRUE(exchangeSource->shouldRequestLocked());
}
exchangeSource->request(1 << 20, 2);
exchangeSource->request(1 << 20, std::chrono::seconds(2));
}

std::shared_ptr<memory::MemoryPool> pool_;
Expand Down Expand Up @@ -999,7 +999,7 @@ TEST_P(PrestoExchangeSourceTest, closeRaceCondition) {
std::lock_guard<std::mutex> l(queue->mutex());
ASSERT_TRUE(exchangeSource->shouldRequestLocked());
}
auto future = exchangeSource->request(1 << 20, 2);
auto future = exchangeSource->request(1 << 20, std::chrono::seconds(2));
ASSERT_TRUE(future.isReady());
auto response = std::move(future).get();
ASSERT_EQ(response.bytes, 0);
Expand Down

0 comments on commit 53d79f8

Please sign in to comment.