From 169d270d9f8f5d4060990bd58ff1e864eb88cec9 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Thu, 27 Jun 2024 11:28:10 -0700 Subject: [PATCH 1/3] [Native] Advance Velox --- presto-native-execution/velox | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 29704a6585891..cf655ff01eb3d 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 29704a65858915a2fefeb6422d1b80a7e4bd06d2 +Subproject commit cf655ff01eb3d509f0760eff609a99f340167465 From 22cf57748f6b46b505c5e4e7dc8bdef52b9cbda9 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Thu, 27 Jun 2024 11:44:57 -0700 Subject: [PATCH 2/3] [native] Send explicit ack only when PrestoExchangeSource is paused No need to send an explicit ack when PrestoExchangeSource is active. A subsequent getData request is expected to acknowledge previously received data automatically --- .../presto_cpp/main/PrestoExchangeSource.cpp | 12 +++++++++--- .../presto_cpp/main/PrestoExchangeSource.h | 2 ++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index b1750b07c19b7..ffdeaece9c1d4 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -384,9 +384,6 @@ void PrestoExchangeSource::processDataResponse( if (complete) { abortResults(); - } else if (!empty) { - // Acknowledge results for non-empty content. - acknowledgeResults(ackSequence); } } @@ -422,6 +419,15 @@ void PrestoExchangeSource::processDataError( } } +void PrestoExchangeSource::pause() { + int64_t ackSequence; + { + std::lock_guard l(queue_->mutex()); + ackSequence = sequence_; + } + acknowledgeResults(ackSequence); +} + void PrestoExchangeSource::acknowledgeResults(int64_t ackSequence) { auto ackPath = fmt::format("{}/{}/acknowledge", basePath_, ackSequence); VLOG(1) << "Sending ack " << ackPath; diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h index 69b4187f4a73c..4955f53be9999 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h @@ -106,6 +106,8 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { return request(0, maxWait); } + void pause() override; + // Create an exchange source using pooled connections. static std::shared_ptr create( const std::string& url, From efbb4ba3c553e684ae61315e8694b2db3fa6084a Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Thu, 27 Jun 2024 11:48:56 -0700 Subject: [PATCH 3/3] [native] Increase default request timeout for Exchange To accommodate long pool delay increase in Velox (2s -> 10s). The request timeout has to be higher than the long pool delay to let the remote server properly respond before terminating a connection. --- presto-native-execution/presto_cpp/main/common/Configs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 97f197b7bb951..7bb89736f5c0d 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -216,7 +216,7 @@ SystemConfig::SystemConfig() { NUM_PROP(kAnnouncementMaxFrequencyMs, 30'000), // 30s NUM_PROP(kHeartbeatFrequencyMs, 0), STR_PROP(kExchangeMaxErrorDuration, "3m"), - STR_PROP(kExchangeRequestTimeout, "10s"), + STR_PROP(kExchangeRequestTimeout, "20s"), STR_PROP(kExchangeConnectTimeout, "20s"), BOOL_PROP(kExchangeEnableConnectionPool, true), BOOL_PROP(kExchangeEnableBufferCopy, true),