diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index b1750b07c19b7..ffdeaece9c1d4 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -384,9 +384,6 @@ void PrestoExchangeSource::processDataResponse( if (complete) { abortResults(); - } else if (!empty) { - // Acknowledge results for non-empty content. - acknowledgeResults(ackSequence); } } @@ -422,6 +419,15 @@ void PrestoExchangeSource::processDataError( } } +void PrestoExchangeSource::pause() { + int64_t ackSequence; + { + std::lock_guard l(queue_->mutex()); + ackSequence = sequence_; + } + acknowledgeResults(ackSequence); +} + void PrestoExchangeSource::acknowledgeResults(int64_t ackSequence) { auto ackPath = fmt::format("{}/{}/acknowledge", basePath_, ackSequence); VLOG(1) << "Sending ack " << ackPath; diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h index 69b4187f4a73c..4955f53be9999 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h @@ -106,6 +106,8 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { return request(0, maxWait); } + void pause() override; + // Create an exchange source using pooled connections. static std::shared_ptr create( const std::string& url, diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 97f197b7bb951..7bb89736f5c0d 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -216,7 +216,7 @@ SystemConfig::SystemConfig() { NUM_PROP(kAnnouncementMaxFrequencyMs, 30'000), // 30s NUM_PROP(kHeartbeatFrequencyMs, 0), STR_PROP(kExchangeMaxErrorDuration, "3m"), - STR_PROP(kExchangeRequestTimeout, "10s"), + STR_PROP(kExchangeRequestTimeout, "20s"), STR_PROP(kExchangeConnectTimeout, "20s"), BOOL_PROP(kExchangeEnableConnectionPool, true), BOOL_PROP(kExchangeEnableBufferCopy, true), diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 29704a6585891..cf655ff01eb3d 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 29704a65858915a2fefeb6422d1b80a7e4bd06d2 +Subproject commit cf655ff01eb3d509f0760eff609a99f340167465