diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp index 2e8de3a5eccde..0b1b800e5e248 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp @@ -333,16 +333,20 @@ void PeriodicTaskManager::updateTaskStats() { RECORD_METRIC_VALUE( kCounterNumTasksBytesProcessed, taskManager_->getBytesProcessed()); RECORD_METRIC_VALUE( - kCounterNumTasksRunning, taskNumbers[velox::exec::TaskState::kRunning]); + kCounterNumTasksRunning, + taskNumbers[static_cast(velox::exec::TaskState::kRunning)]); RECORD_METRIC_VALUE( - kCounterNumTasksFinished, taskNumbers[velox::exec::TaskState::kFinished]); + kCounterNumTasksFinished, + taskNumbers[static_cast(velox::exec::TaskState::kFinished)]); RECORD_METRIC_VALUE( kCounterNumTasksCancelled, - taskNumbers[velox::exec::TaskState::kCanceled]); + taskNumbers[static_cast(velox::exec::TaskState::kCanceled)]); RECORD_METRIC_VALUE( - kCounterNumTasksAborted, taskNumbers[velox::exec::TaskState::kAborted]); + kCounterNumTasksAborted, + taskNumbers[static_cast(velox::exec::TaskState::kAborted)]); RECORD_METRIC_VALUE( - kCounterNumTasksFailed, taskNumbers[velox::exec::TaskState::kFailed]); + kCounterNumTasksFailed, + taskNumbers[static_cast(velox::exec::TaskState::kFailed)]); const auto driverCounts = taskManager_->getDriverCounts(); RECORD_METRIC_VALUE(kCounterNumQueuedDrivers, driverCounts.numQueuedDrivers); diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.cpp b/presto-native-execution/presto_cpp/main/PrestoTask.cpp index 50850cdeb7634..dd970bf9fe43a 100644 --- a/presto-native-execution/presto_cpp/main/PrestoTask.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoTask.cpp @@ -40,15 +40,15 @@ namespace { protocol::TaskState toPrestoTaskState(exec::TaskState state) { switch (state) { - case exec::kRunning: + case exec::TaskState::kRunning: return protocol::TaskState::RUNNING; - case exec::kFinished: + case exec::TaskState::kFinished: return protocol::TaskState::FINISHED; - case exec::kCanceled: + case exec::TaskState::kCanceled: return protocol::TaskState::CANCELED; - case exec::kFailed: + case exec::TaskState::kFailed: return protocol::TaskState::FAILED; - case exec::kAborted: + case exec::TaskState::kAborted: [[fallthrough]]; default: return protocol::TaskState::ABORTED; diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index afab868c95a26..7dd3fed2eea67 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -629,7 +629,7 @@ std::unique_ptr TaskManager::deleteTask( auto execTask = prestoTask->task; if (execTask) { auto state = execTask->state(); - if (state == exec::kRunning) { + if (state == exec::TaskState::kRunning) { execTask->requestAbort(); } prestoTask->info.stats.endTime = @@ -881,13 +881,13 @@ folly::Future> TaskManager::getResults( for (;;) { if (prestoTask->taskStarted) { // If the task has finished, then send completion result. - if (prestoTask->task->state() == exec::kFinished) { + if (prestoTask->task->state() == exec::TaskState::kFinished) { promiseHolder->promise.setValue(createCompleteResult(token)); return std::move(future).via(httpSrvCpuExecutor_); } // If task is not running let the request timeout. The task may have // failed at creation time and the coordinator hasn't yet caught up. - if (prestoTask->task->state() == exec::kRunning) { + if (prestoTask->task->state() == exec::TaskState::kRunning) { getData( promiseHolder, folly::to_weak_ptr(state), @@ -1166,11 +1166,11 @@ int32_t TaskManager::yieldTasks( std::array TaskManager::getTaskNumbers(size_t& numTasks) const { std::array res{0}; - auto taskMap = taskMap_.rlock(); + const auto taskMap = *taskMap_.rlock(); numTasks = 0; - for (const auto& pair : *taskMap) { - if (pair.second->task != nullptr) { - ++res[pair.second->task->state()]; + for (const auto& [_, task] : taskMap) { + if (task->task) { + ++res[static_cast(task->task->state())]; ++numTasks; } } @@ -1180,8 +1180,8 @@ std::array TaskManager::getTaskNumbers(size_t& numTasks) const { int64_t TaskManager::getBytesProcessed() const { const auto taskMap = *taskMap_.rlock(); int64_t totalCount = 0; - for (const auto& pair : taskMap) { - totalCount += pair.second->info.stats.processedInputDataSizeInBytes; + for (const auto& [_, task] : taskMap) { + totalCount += task->info.stats.processedInputDataSizeInBytes; } return totalCount; } @@ -1190,7 +1190,7 @@ void TaskManager::shutdown() { size_t numTasks; auto taskNumbers = getTaskNumbers(numTasks); size_t seconds = 0; - while (taskNumbers[velox::exec::TaskState::kRunning] > 0) { + while (taskNumbers[static_cast(exec::TaskState::kRunning)] > 0) { PRESTO_SHUTDOWN_LOG(INFO) << "Waited (" << seconds << " seconds so far) for 'Running' tasks to complete. " << numTasks diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 2b5e9f1a5c690..e80bf12e3f5db 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 2b5e9f1a5c6907d5b18b47d5aadbe4062bc42edf +Subproject commit e80bf12e3f5db0af81793986b0707477027c4e80