diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index 509447f7e88b8..30e494a4efb8c 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -132,7 +132,7 @@ bool PrestoExchangeSource::shouldRequestLocked() { folly::SemiFuture PrestoExchangeSource::request( uint32_t maxBytes, - uint32_t maxWaitSeconds) { + std::chrono::microseconds maxWait) { // Before calling 'request', the caller should have called // 'shouldRequestLocked' and received 'true' response. Hence, we expect // requestPending_ == true, atEnd_ == false. @@ -158,7 +158,7 @@ folly::SemiFuture PrestoExchangeSource::request( RetryState(std::chrono::duration_cast( SystemConfig::instance()->exchangeMaxErrorDuration()) .count()); - doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds); + doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWait); return future; } @@ -166,7 +166,7 @@ folly::SemiFuture PrestoExchangeSource::request( void PrestoExchangeSource::doRequest( int64_t delayMs, uint32_t maxBytes, - uint32_t maxWaitSeconds) { + std::chrono::microseconds maxWait) { if (closed_.load()) { queue_->setError("PrestoExchangeSource closed"); return; @@ -191,11 +191,11 @@ void PrestoExchangeSource::doRequest( protocol::DataSize(maxBytes, protocol::DataUnit::BYTE).toString()) .header( protocol::PRESTO_MAX_WAIT_HTTP_HEADER, - protocol::Duration(maxWaitSeconds, protocol::TimeUnit::SECONDS) + protocol::Duration(maxWait.count(), protocol::TimeUnit::MICROSECONDS) .toString()) .send(httpClient_.get(), "", delayMs) .via(driverExecutor_) - .thenValue([path, maxBytes, maxWaitSeconds, self]( + .thenValue([path, maxBytes, maxWait, self]( std::unique_ptr response) { velox::common::testutil::TestValue::adjust( "facebook::presto::PrestoExchangeSource::doRequest", self.get()); @@ -209,7 +209,7 @@ void PrestoExchangeSource::doRequest( self->processDataError( path, maxBytes, - maxWaitSeconds, + maxWait, fmt::format( "Received HTTP {} {} {}", headers->getStatusCode(), @@ -219,16 +219,15 @@ void PrestoExchangeSource::doRequest( self->immediateBufferTransfer_ ? self->pool_.get() : nullptr))); } else if (response->hasError()) { - self->processDataError( - path, maxBytes, maxWaitSeconds, response->error()); + self->processDataError(path, maxBytes, maxWait, response->error()); } else { self->processDataResponse(std::move(response)); } }) .thenError( folly::tag_t{}, - [path, maxBytes, maxWaitSeconds, self](const std::exception& e) { - self->processDataError(path, maxBytes, maxWaitSeconds, e.what()); + [path, maxBytes, maxWait, self](const std::exception& e) { + self->processDataError(path, maxBytes, maxWait, e.what()); }); }; @@ -363,7 +362,7 @@ void PrestoExchangeSource::processDataResponse( void PrestoExchangeSource::processDataError( const std::string& path, uint32_t maxBytes, - uint32_t maxWaitSeconds, + std::chrono::microseconds maxWait, const std::string& error) { ++failedAttempts_; if (!dataRequestRetryState_.isExhausted()) { @@ -371,7 +370,7 @@ void PrestoExchangeSource::processDataError( << path << ", duration: " << dataRequestRetryState_.durationMs() << "ms - Retrying: " << error; - doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds); + doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWait); return; } diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h index b3d45374344cc..33109f4f28715 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h @@ -130,11 +130,11 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { /// should not hold a lock over queue's mutex when making this call. folly::SemiFuture request( uint32_t maxBytes, - uint32_t maxWaitSeconds) override; + std::chrono::microseconds maxWait) override; folly::SemiFuture requestDataSizes( - uint32_t maxWaitSeconds) override { - return request(0, maxWaitSeconds); + std::chrono::microseconds maxWait) override { + return request(0, maxWait); } // Create an exchange source using pooled connections. @@ -198,7 +198,10 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { static void testingClearMemoryUsage(); private: - void doRequest(int64_t delayMs, uint32_t maxBytes, uint32_t maxWaitSeconds); + void doRequest( + int64_t delayMs, + uint32_t maxBytes, + std::chrono::microseconds maxWait); /// Handles successful, possibly empty, response. Adds received data to the /// queue. If received an end marker, notifies the queue by adding null page. @@ -218,7 +221,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { void processDataError( const std::string& path, uint32_t maxBytes, - uint32_t maxWaitSeconds, + std::chrono::microseconds maxWait, const std::string& error); void acknowledgeResults(int64_t ackSequence); diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp index 30539c7b7a46e..5f8cfc3242dee 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp @@ -33,7 +33,7 @@ std::optional getBroadcastInfo(folly::Uri& uri) { folly::SemiFuture BroadcastExchangeSource::request( uint32_t /*maxBytes*/, - uint32_t /*maxWaitSeconds*/) { + std::chrono::microseconds /*maxWait*/) { if (atEnd_) { return folly::makeFuture(Response{0, true}); } @@ -69,7 +69,8 @@ folly::F14FastMap BroadcastExchangeSource::stats() const { } folly::SemiFuture -BroadcastExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) { +BroadcastExchangeSource::requestDataSizes( + std::chrono::microseconds /*maxWait*/) { std::vector remainingBytes; if (!atEnd_) { // Use default value of ExchangeClient::getAveragePageSize() for now. diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h index ed6e5c32aee00..75ae8e0cbbcf6 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h @@ -39,10 +39,10 @@ class BroadcastExchangeSource : public velox::exec::ExchangeSource { folly::SemiFuture request( uint32_t maxBytes, - uint32_t maxWaitSeconds) override; + std::chrono::microseconds maxWait) override; folly::SemiFuture requestDataSizes( - uint32_t maxWaitSeconds) override; + std::chrono::microseconds maxWait) override; void close() override {} diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp index b40a7f647022e..9fb3a54ff7a3a 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp @@ -31,7 +31,7 @@ namespace facebook::presto::operators { folly::SemiFuture UnsafeRowExchangeSource::request( uint32_t /*maxBytes*/, - uint32_t /*maxWaitSeconds*/) { + std::chrono::microseconds /*maxWait*/) { auto nextBatch = [this]() { return std::move(shuffle_->next()) .deferValue([this](velox::BufferPtr buffer) { @@ -74,7 +74,8 @@ UnsafeRowExchangeSource::request( } folly::SemiFuture -UnsafeRowExchangeSource::requestDataSizes(uint32_t /*maxWaitSeconds*/) { +UnsafeRowExchangeSource::requestDataSizes( + std::chrono::microseconds /*maxWait*/) { std::vector remainingBytes; if (!atEnd_) { // Use default value of ExchangeClient::getAveragePageSize() for now. diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h index 91105d45032f1..d4c89d53b9dc4 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h @@ -36,10 +36,10 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource { folly::SemiFuture request( uint32_t maxBytes, - uint32_t maxWaitSeconds) override; + std::chrono::microseconds maxWait) override; folly::SemiFuture requestDataSizes( - uint32_t maxWaitSeconds) override; + std::chrono::microseconds maxWait) override; void close() override { shuffle_->noMoreData(true); diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index bffb70deec439..5967dd09ccb56 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -453,7 +453,7 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam { std::lock_guard l(queue->mutex()); ASSERT_TRUE(exchangeSource->shouldRequestLocked()); } - exchangeSource->request(1 << 20, 2); + exchangeSource->request(1 << 20, std::chrono::seconds(2)); } std::shared_ptr pool_; @@ -999,7 +999,7 @@ TEST_P(PrestoExchangeSourceTest, closeRaceCondition) { std::lock_guard l(queue->mutex()); ASSERT_TRUE(exchangeSource->shouldRequestLocked()); } - auto future = exchangeSource->request(1 << 20, 2); + auto future = exchangeSource->request(1 << 20, std::chrono::seconds(2)); ASSERT_TRUE(future.isReady()); auto response = std::move(future).get(); ASSERT_EQ(response.bytes, 0);