From ad9429ddd15c76245be9cb51793b4b56bc631698 Mon Sep 17 00:00:00 2001 From: Zuyu ZHANG Date: Wed, 27 Nov 2024 12:23:59 -0800 Subject: [PATCH] Revert "[native] Advance Velox and apply TaskState changes" This reverts commit 0ee4687d038c2f4dd8b513a1375a39ffd8bda1eb. --- .../presto_cpp/main/PeriodicTaskManager.cpp | 14 +++++-------- .../presto_cpp/main/PrestoTask.cpp | 10 +++++----- .../presto_cpp/main/TaskManager.cpp | 20 +++++++++---------- presto-native-execution/velox | 2 +- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp index 0b1b800e5e248..2e8de3a5eccde 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp @@ -333,20 +333,16 @@ void PeriodicTaskManager::updateTaskStats() { RECORD_METRIC_VALUE( kCounterNumTasksBytesProcessed, taskManager_->getBytesProcessed()); RECORD_METRIC_VALUE( - kCounterNumTasksRunning, - taskNumbers[static_cast(velox::exec::TaskState::kRunning)]); + kCounterNumTasksRunning, taskNumbers[velox::exec::TaskState::kRunning]); RECORD_METRIC_VALUE( - kCounterNumTasksFinished, - taskNumbers[static_cast(velox::exec::TaskState::kFinished)]); + kCounterNumTasksFinished, taskNumbers[velox::exec::TaskState::kFinished]); RECORD_METRIC_VALUE( kCounterNumTasksCancelled, - taskNumbers[static_cast(velox::exec::TaskState::kCanceled)]); + taskNumbers[velox::exec::TaskState::kCanceled]); RECORD_METRIC_VALUE( - kCounterNumTasksAborted, - taskNumbers[static_cast(velox::exec::TaskState::kAborted)]); + kCounterNumTasksAborted, taskNumbers[velox::exec::TaskState::kAborted]); RECORD_METRIC_VALUE( - kCounterNumTasksFailed, - taskNumbers[static_cast(velox::exec::TaskState::kFailed)]); + kCounterNumTasksFailed, taskNumbers[velox::exec::TaskState::kFailed]); const auto driverCounts = taskManager_->getDriverCounts(); RECORD_METRIC_VALUE(kCounterNumQueuedDrivers, driverCounts.numQueuedDrivers); diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.cpp b/presto-native-execution/presto_cpp/main/PrestoTask.cpp index dd970bf9fe43a..50850cdeb7634 100644 --- a/presto-native-execution/presto_cpp/main/PrestoTask.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoTask.cpp @@ -40,15 +40,15 @@ namespace { protocol::TaskState toPrestoTaskState(exec::TaskState state) { switch (state) { - case exec::TaskState::kRunning: + case exec::kRunning: return protocol::TaskState::RUNNING; - case exec::TaskState::kFinished: + case exec::kFinished: return protocol::TaskState::FINISHED; - case exec::TaskState::kCanceled: + case exec::kCanceled: return protocol::TaskState::CANCELED; - case exec::TaskState::kFailed: + case exec::kFailed: return protocol::TaskState::FAILED; - case exec::TaskState::kAborted: + case exec::kAborted: [[fallthrough]]; default: return protocol::TaskState::ABORTED; diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 7dd3fed2eea67..afab868c95a26 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -629,7 +629,7 @@ std::unique_ptr TaskManager::deleteTask( auto execTask = prestoTask->task; if (execTask) { auto state = execTask->state(); - if (state == exec::TaskState::kRunning) { + if (state == exec::kRunning) { execTask->requestAbort(); } prestoTask->info.stats.endTime = @@ -881,13 +881,13 @@ folly::Future> TaskManager::getResults( for (;;) { if (prestoTask->taskStarted) { // If the task has finished, then send completion result. - if (prestoTask->task->state() == exec::TaskState::kFinished) { + if (prestoTask->task->state() == exec::kFinished) { promiseHolder->promise.setValue(createCompleteResult(token)); return std::move(future).via(httpSrvCpuExecutor_); } // If task is not running let the request timeout. The task may have // failed at creation time and the coordinator hasn't yet caught up. - if (prestoTask->task->state() == exec::TaskState::kRunning) { + if (prestoTask->task->state() == exec::kRunning) { getData( promiseHolder, folly::to_weak_ptr(state), @@ -1166,11 +1166,11 @@ int32_t TaskManager::yieldTasks( std::array TaskManager::getTaskNumbers(size_t& numTasks) const { std::array res{0}; - const auto taskMap = *taskMap_.rlock(); + auto taskMap = taskMap_.rlock(); numTasks = 0; - for (const auto& [_, task] : taskMap) { - if (task->task) { - ++res[static_cast(task->task->state())]; + for (const auto& pair : *taskMap) { + if (pair.second->task != nullptr) { + ++res[pair.second->task->state()]; ++numTasks; } } @@ -1180,8 +1180,8 @@ std::array TaskManager::getTaskNumbers(size_t& numTasks) const { int64_t TaskManager::getBytesProcessed() const { const auto taskMap = *taskMap_.rlock(); int64_t totalCount = 0; - for (const auto& [_, task] : taskMap) { - totalCount += task->info.stats.processedInputDataSizeInBytes; + for (const auto& pair : taskMap) { + totalCount += pair.second->info.stats.processedInputDataSizeInBytes; } return totalCount; } @@ -1190,7 +1190,7 @@ void TaskManager::shutdown() { size_t numTasks; auto taskNumbers = getTaskNumbers(numTasks); size_t seconds = 0; - while (taskNumbers[static_cast(exec::TaskState::kRunning)] > 0) { + while (taskNumbers[velox::exec::TaskState::kRunning] > 0) { PRESTO_SHUTDOWN_LOG(INFO) << "Waited (" << seconds << " seconds so far) for 'Running' tasks to complete. " << numTasks diff --git a/presto-native-execution/velox b/presto-native-execution/velox index e80bf12e3f5db..2b5e9f1a5c690 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit e80bf12e3f5db0af81793986b0707477027c4e80 +Subproject commit 2b5e9f1a5c6907d5b18b47d5aadbe4062bc42edf